From 28278447310cdf9ab0a5810769ce49155f7abca6 Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Tue, 10 Sep 2024 09:49:36 +0200 Subject: [PATCH] [NU-1789] Catalog configuration (#6741) --- ...DeploymentApiHttpServiceBusinessSpec.scala | 4 +- ...ManagementResourcesBatchBusinessSpec.scala | 4 +- docs/Changelog.md | 6 +- .../nussknacker/BatchDataGenerationSpec.scala | 2 +- .../org.apache.flink.table.factories.Factory | 1 + .../definition/MockableCatalogFactory.scala | 30 +++++ .../SqlFromFileReaderTest.scala | 2 +- .../TablesDefinitionDiscoveryTest.scala | 81 +++++++++---- .../flink/table/sink/TableFileSinkTest.scala | 53 ++++++--- .../table/sink/TableSinkParametersTest.scala | 16 +-- .../TableSourceDataGenerationTest.scala | 17 ++- .../flink/table/source/TableSourceTest.scala | 79 +++++++++++++ .../table/FlinkTableComponentProvider.scala | 22 +++- .../engine/flink/table/TableDefinition.scala | 4 +- .../definition/FlinkDataDefinition.scala | 106 ++++++++++++++++++ .../SqlStatementReader.scala | 2 +- .../TablesDefinitionDiscovery.scala | 57 ++++++++++ .../extractor/TablesDefinitionDiscovery.scala | 93 --------------- .../engine/flink/table/sink/TableSink.scala | 11 +- .../flink/table/sink/TableSinkFactory.scala | 13 +-- .../FlinkMiniClusterTableOperations.scala | 20 ++-- .../flink/table/source/TableSource.scala | 25 ++--- .../table/source/TableSourceFactory.scala | 10 +- .../table/utils/TableComponentFactory.scala | 9 +- .../devmodel/TableKafkaPingPongTest.scala | 10 +- 25 files changed, 472 insertions(+), 205 deletions(-) create mode 100644 engine/flink/components/base-tests/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/MockableCatalogFactory.scala rename engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/{extractor => definition}/SqlFromFileReaderTest.scala (94%) rename engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/{extractor => definition}/TablesDefinitionDiscoveryTest.scala (54%) create mode 100644 engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceTest.scala create mode 100644 engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/definition/FlinkDataDefinition.scala rename engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/{extractor => definition}/SqlStatementReader.scala (86%) create mode 100644 engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/definition/TablesDefinitionDiscovery.scala delete mode 100644 engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/extractor/TablesDefinitionDiscovery.scala diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/BaseDeploymentApiHttpServiceBusinessSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/BaseDeploymentApiHttpServiceBusinessSpec.scala index c4629a65d5b..f16b1557767 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/BaseDeploymentApiHttpServiceBusinessSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/BaseDeploymentApiHttpServiceBusinessSpec.scala @@ -42,7 +42,7 @@ trait BaseDeploymentApiHttpServiceBusinessSpec extends WithFlinkContainersDeploy protected val scenario: CanonicalProcess = ScenarioBuilder .streaming(scenarioName) - .source(sourceNodeId, 
"table", "Table" -> Expression.spel("'transactions'")) + .source(sourceNodeId, "table", "Table" -> Expression.spel("'`default_catalog`.`default_database`.`transactions`'")) .customNode( id = "aggregate", outputVar = "agg", @@ -60,7 +60,7 @@ trait BaseDeploymentApiHttpServiceBusinessSpec extends WithFlinkContainersDeploy .emptySink( id = "sink", typ = "table", - "Table" -> Expression.spel("'transactions_summary'"), + "Table" -> Expression.spel("'`default_catalog`.`default_database`.`transactions_summary`'"), "Raw editor" -> Expression.spel("false"), "client_id" -> Expression.spel("#keyValues[0]"), "date" -> Expression.spel("#keyValues[1]"), diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesBatchBusinessSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesBatchBusinessSpec.scala index 4d4639d2ffb..dfe6527f4c8 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesBatchBusinessSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ManagementResourcesBatchBusinessSpec.scala @@ -76,11 +76,11 @@ class ManagementResourcesBatchBusinessSpec private lazy val exampleScenario = ScenarioBuilder .streaming(exampleScenarioName) - .source("sourceId", "table", "Table" -> Expression.spel("'transactions'")) + .source("sourceId", "table", "Table" -> Expression.spel("'`default_catalog`.`default_database`.`transactions`'")) .emptySink( "sinkId", "table", - "Table" -> Expression.spel("'transactions'"), + "Table" -> Expression.spel("'`default_catalog`.`default_database`.`transactions_summary`'"), "Raw editor" -> Expression.spel("true"), "Value" -> Expression.spel("#input") ) diff --git a/docs/Changelog.md b/docs/Changelog.md index 95c27ccff5f..794ffdf7a51 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -17,6 +17,10 @@ * [#6734](https://github.com/TouK/nussknacker/pull/6734) Tables from external catalogs are now refreshed automatically when entering into node form. Please be aware that changes in `tableDefinition.sql` are not refreshed. To do this, use `/app/processingtype/reload` API + * [#6741](https://github.com/TouK/nussknacker/pull/6741) Added `catalogConfiguration` configuration option allowing to set up catalog + directly in config instead of by `tableDefinition.sql` + * [#6741](https://github.com/TouK/nussknacker/pull/6741) (Breaking change) Fully qualified table paths are used instead of table names + in table source and sink components in `Table` parameter * [#6716](https://github.com/TouK/nussknacker/pull/6716) Fix type hints for #COLLECTION.merge function. * [#6695](https://github.com/TouK/nussknacker/pull/6695) From now on, arrays on UI are visible as lists but on a background they are stored as it is and SpeL converts them to lists in a runtime. @@ -74,7 +78,6 @@ * [#6445](https://github.com/TouK/nussknacker/pull/6445) [#6499](https://github.com/TouK/nussknacker/pull/6499) Add support to seconds in a duration editor * [#6436](https://github.com/TouK/nussknacker/pull/6436) Typed SpEL list expressions will now infer their compile-time known values, instead of only the supertype of its elements. These values can be used in custom components or validators. 
* NOTE: selection (`.?`), projection (`.!`) or operations from the `#COLLECTIONS` helper cause the typed list to lose its elements' values -* [#6445](https://github.com/TouK/nussknacker/pull/6445) [#6499](https://github.com/TouK/nussknacker/pull/6499) Add support to seconds in a duration editor * [#6570](https://github.com/TouK/nussknacker/pull/6570) Generic parameters of collection types are better typed now: e.g. `List[Integer]` can be passed to `List[Number]` but not the other way * Batch processing mode related improvements: * [#6353](https://github.com/TouK/nussknacker/pull/6353) [#6467](https://github.com/TouK/nussknacker/pull/6467) Added `join` component @@ -97,7 +100,6 @@ * [#6546](https://github.com/TouK/nussknacker/pull/6546) Error handling during saving data to table sink * [#6567](https://github.com/TouK/nussknacker/pull/6567) Flink's [execution mode](https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/datastream/execution_mode) can now be set for Flink-based scenario types under `modelConfig.executionMode`. -* [#6570](https://github.com/TouK/nussknacker/pull/6570) Generic parameters of collection types are better typed now: e.g. `List[Integer]` can be passed to `List[Number]` but not the other way * [#6615](https://github.com/TouK/nussknacker/pull/6615) Add encode/decode support for typed SpEL values of types: `java.time.LocalDateTime`, `java.time.LocalDate`, `java.time.LocalTime`, `java.time.Duration`, `java.time.Period` * [#6591](https://github.com/TouK/nussknacker/pull/6591) The whole model can be reloaded with `POST /api/app/processingtype/reload` now - you can use this endpoint to reconfigure Nu processing types without need to restart the app * [#6623](https://github.com/TouK/nussknacker/pull/6623) Added `sortedAscBy` and `reverse` functions to `#COLLECTION` helper diff --git a/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala b/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala index 3017ec0afcc..4a04ecc3dc9 100644 --- a/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala +++ b/e2e-tests/src/test/scala/pl/touk/nussknacker/BatchDataGenerationSpec.scala @@ -22,7 +22,7 @@ class BatchDataGenerationSpec private val simpleBatchTableScenario = ScenarioBuilder .streaming("SumTransactions") - .source("sourceId", "table", "Table" -> "'transactions'".spel) + .source("sourceId", "table", "Table" -> "'`default_catalog`.`default_database`.`transactions`'".spel) .emptySink("end", "dead-end") private val designerServiceUrl = "http://localhost:8080" diff --git a/engine/flink/components/base-tests/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/engine/flink/components/base-tests/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 00000000000..670172264d6 --- /dev/null +++ b/engine/flink/components/base-tests/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1 @@ +pl.touk.nussknacker.engine.flink.table.definition.MockableCatalogFactory diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/MockableCatalogFactory.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/MockableCatalogFactory.scala new file mode 100644 index 00000000000..91d7b897d44 --- /dev/null +++ 
b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/MockableCatalogFactory.scala @@ -0,0 +1,30 @@ +package pl.touk.nussknacker.engine.flink.table.definition + +import org.apache.flink.table.catalog.{Catalog, GenericInMemoryCatalog} +import org.apache.flink.table.factories.CatalogFactory + +class MockableCatalogFactory extends CatalogFactory { + + override def factoryIdentifier(): String = MockableCatalogFactory.catalogName + + override def createCatalog(context: CatalogFactory.Context): Catalog = MockableCatalogFactory.catalog + +} + +// Warning: this implementation can't be used by concurrent threads +object MockableCatalogFactory { + + private val catalogName = "mockable" + + @volatile + var catalog: GenericInMemoryCatalog = createCatalog + + private def createCatalog = { + new GenericInMemoryCatalog(catalogName) + } + + def resetCatalog(): Unit = { + catalog = createCatalog + } + +} diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/extractor/SqlFromFileReaderTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/SqlFromFileReaderTest.scala similarity index 94% rename from engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/extractor/SqlFromFileReaderTest.scala rename to engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/SqlFromFileReaderTest.scala index 16fcacc91ea..df0aa90fd5f 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/extractor/SqlFromFileReaderTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/SqlFromFileReaderTest.scala @@ -1,4 +1,4 @@ -package pl.touk.nussknacker.engine.flink.table.extractor +package pl.touk.nussknacker.engine.flink.table.definition import org.scalatest.Inside.inside import org.scalatest.funsuite.AnyFunSuite diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/extractor/TablesDefinitionDiscoveryTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/TablesDefinitionDiscoveryTest.scala similarity index 54% rename from engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/extractor/TablesDefinitionDiscoveryTest.scala rename to engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/TablesDefinitionDiscoveryTest.scala index 5feba21decb..5095302c62b 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/extractor/TablesDefinitionDiscoveryTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/definition/TablesDefinitionDiscoveryTest.scala @@ -1,16 +1,17 @@ -package pl.touk.nussknacker.engine.flink.table.extractor +package pl.touk.nussknacker.engine.flink.table.definition -import org.apache.flink.table.api.DataTypes -import org.apache.flink.table.catalog.{Column, ResolvedSchema} +import cats.data.Validated.Invalid +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.api.{DataTypes, Schema} +import org.apache.flink.table.catalog._ import org.scalatest.LoneElement import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import org.scalatest.prop.TableDrivenPropertyChecks 
-import pl.touk.nussknacker.engine.flink.table.utils.DataTypesExtensions._ import pl.touk.nussknacker.engine.flink.table.TableTestCases.SimpleTable -import pl.touk.nussknacker.engine.flink.table._ -import pl.touk.nussknacker.engine.flink.table.extractor.TablesDefinitionDiscoveryTest.invalidSqlStatements -import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage +import pl.touk.nussknacker.engine.flink.table.definition.TablesDefinitionDiscoveryTest.invalidSqlStatements +import pl.touk.nussknacker.engine.flink.table.utils.DataTypesExtensions._ +import pl.touk.nussknacker.test.{PatientScalaFutures, ValidatedValuesDetailedMessage} import scala.jdk.CollectionConverters._ @@ -19,11 +20,25 @@ class TablesDefinitionDiscoveryTest with Matchers with LoneElement with ValidatedValuesDetailedMessage - with TableDrivenPropertyChecks { + with TableDrivenPropertyChecks + with PatientScalaFutures { + + test("return error for empty flink data definition") { + FlinkDataDefinition.create( + sqlStatements = None, + catalogConfigurationOpt = None, + ) should matchPattern { case Invalid(EmptyDataDefinition) => + } + } test("extracts configuration from valid sql statement") { - val statements = SqlStatementReader.readSql(SimpleTable.sqlStatement) - val discovery = TablesDefinitionDiscovery.prepareDiscovery(statements).validValue + val flinkDataDefinition = FlinkDataDefinition + .create( + sqlStatements = Some(SqlStatementReader.readSql(SimpleTable.sqlStatement)), + catalogConfigurationOpt = None, + ) + .validValue + val discovery = TablesDefinitionDiscovery.prepareDiscovery(flinkDataDefinition).validValue val tablesDefinitions = discovery.listTables val tableDefinition = tablesDefinitions.loneElement val sourceRowType = tableDefinition.sourceRowDataType.toLogicalRowTypeUnsafe @@ -64,25 +79,53 @@ class TablesDefinitionDiscoveryTest | 'connector' = 'datagen' |);""".stripMargin - val statements = SqlStatementReader.readSql(statementsStr) - val discovery = TablesDefinitionDiscovery.prepareDiscovery(statements).validValue - val tablesDefinitions = discovery.listTables + val flinkDataDefinition = FlinkDataDefinition + .create( + sqlStatements = Some(SqlStatementReader.readSql(statementsStr)), + catalogConfigurationOpt = None, + ) + .validValue + val discovery = TablesDefinitionDiscovery.prepareDiscovery(flinkDataDefinition).validValue + val tablesDefinition = discovery.listTables.loneElement - tablesDefinitions.loneElement shouldBe TableDefinition( - tableName, - ResolvedSchema.of(Column.physical("someString", DataTypes.STRING())) - ) + tablesDefinition.tableId.toString shouldBe "`someCatalog`.`someDatabase`.`testTable2`" + tablesDefinition.schema shouldBe ResolvedSchema.of(Column.physical("someString", DataTypes.STRING())) } test("returns errors for statements that cannot be executed") { invalidSqlStatements.foreach { invalidStatement => - val parsedStatement = SqlStatementReader.readSql(invalidStatement) - val sqlStatementExecutionErrors = TablesDefinitionDiscovery.prepareDiscovery(parsedStatement).invalidValue + val flinkDataDefinition = FlinkDataDefinition + .create( + sqlStatements = Some(SqlStatementReader.readSql(invalidStatement)), + catalogConfigurationOpt = None, + ) + .validValue + val sqlStatementExecutionErrors = TablesDefinitionDiscovery.prepareDiscovery(flinkDataDefinition).invalidValue sqlStatementExecutionErrors.size shouldBe 1 } } + test("use catalog configuration in data definition") { + val catalogConfiguration = Configuration.fromMap(Map("type" -> "mockable").asJava) + val catalogTable = 
CatalogTable.of( + Schema.newBuilder().column("fooColumn", DataTypes.STRING()).build(), + null, + List.empty[String].asJava, + Map.empty[String, String].asJava + ) + MockableCatalogFactory.resetCatalog() + MockableCatalogFactory.catalog.createTable(ObjectPath.fromString("default.fooTable"), catalogTable, false) + val flinkDataDefinition = FlinkDataDefinition.create(None, Some(catalogConfiguration)).validValue + + val discovery = TablesDefinitionDiscovery.prepareDiscovery(flinkDataDefinition).validValue + + val tableDefinition = discovery.listTables.loneElement + + tableDefinition.tableId.toString shouldBe s"`${FlinkDataDefinition.internalCatalogName}`.`default`.`fooTable`" + tableDefinition.schema shouldBe ResolvedSchema.of(Column.physical("fooColumn", DataTypes.STRING())) + } + } object TablesDefinitionDiscoveryTest { diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/sink/TableFileSinkTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/sink/TableFileSinkTest.scala index 61ea8b3aff8..2e001094bca 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/sink/TableFileSinkTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/sink/TableFileSinkTest.scala @@ -209,6 +209,15 @@ class TableFileSinkTest | 'path' = 'file:///$datetimeExpressionOutputDirectory', | 'format' = 'json' |) LIKE `$datetimePingPongInputTableName`; + | + |CREATE DATABASE testdb; + | + |CREATE TABLE testdb.tablewithqualifiedname ( + | `quantity` INT + |) WITH ( + | 'connector' = 'datagen', + | 'number-of-rows' = '1' + |); |""".stripMargin private lazy val sqlTablesDefinitionFilePath = { @@ -253,7 +262,11 @@ class TableFileSinkTest test("should do file-to-file ping-pong for all basic types") { val scenario = ScenarioBuilder .streaming("test") - .source("start", "table", "Table" -> s"'$basicPingPongInputTableName'".spel) + .source( + "start", + "table", + "Table" -> s"'`default_catalog`.`default_database`.`$basicPingPongInputTableName`'".spel + ) .buildVariable( "example-transformations", "out", @@ -270,7 +283,7 @@ class TableFileSinkTest .emptySink( "end", "table", - "Table" -> s"'$basicPingPongOutputTableName'".spel, + "Table" -> s"'`default_catalog`.`default_database`.`$basicPingPongOutputTableName`'".spel, "Raw editor" -> "true".spel, "Value" -> "#input".spel ) @@ -305,11 +318,15 @@ class TableFileSinkTest test("should be able to access virtual columns in input table") { val scenario = ScenarioBuilder .streaming("test") - .source("start", "table", "Table" -> s"'$virtualColumnInputTableName'".spel) + .source( + "start", + "table", + "Table" -> s"'`default_catalog`.`default_database`.`$virtualColumnInputTableName`'".spel + ) .emptySink( "end", "table", - "Table" -> s"'$virtualColumnOutputTableName'".spel, + "Table" -> s"'`default_catalog`.`default_database`.`$virtualColumnOutputTableName`'".spel, "Raw editor" -> "true".spel, "Value" -> "#input".spel ) @@ -356,7 +373,7 @@ class TableFileSinkTest .emptySink( "end", "table", - "Table" -> s"'$basicExpressionOutputTableName'".spel, + "Table" -> s"'`default_catalog`.`default_database`.`$basicExpressionOutputTableName`'".spel, "Raw editor" -> "true".spel, "Value" -> basicTypesExpression ) @@ -383,7 +400,7 @@ class TableFileSinkTest .emptySink( "end", "table", - "Table" -> s"'$oneColumnOutputTableName'".spel, + "Table" -> 
s"'`default_catalog`.`default_database`.`$oneColumnOutputTableName`'".spel, "Raw editor" -> "true".spel, "Value" -> valueExpression ) @@ -397,7 +414,11 @@ class TableFileSinkTest test("should do file-to-file ping-pong for advanced types") { val scenario = ScenarioBuilder .streaming("test") - .source("start", "table", "Table" -> s"'$advancedPingPongInputTableName'".spel) + .source( + "start", + "table", + "Table" -> s"'`default_catalog`.`default_database`.`$advancedPingPongInputTableName`'".spel + ) .buildVariable( "example-transformations", "out", @@ -409,7 +430,7 @@ class TableFileSinkTest .emptySink( "end", "table", - "Table" -> s"'$advancedPingPongOutputTableName'".spel, + "Table" -> s"'`default_catalog`.`default_database`.`$advancedPingPongOutputTableName`'".spel, "Raw editor" -> "true".spel, "Value" -> "#input".spel ) @@ -487,7 +508,7 @@ class TableFileSinkTest .emptySink( "end", "table", - "Table" -> s"'$advancedExpressionOutputTableName'".spel, + "Table" -> s"'`default_catalog`.`default_database`.`$advancedExpressionOutputTableName`'".spel, "Raw editor" -> "true".spel, "Value" -> valueExpression ) @@ -526,7 +547,11 @@ class TableFileSinkTest test("should do file-to-file ping-pong for datetime types") { val scenario = ScenarioBuilder .streaming("test") - .source("start", "table", "Table" -> s"'$datetimePingPongInputTableName'".spel) + .source( + "start", + "table", + "Table" -> s"'`default_catalog`.`default_database`.`$datetimePingPongInputTableName`'".spel + ) .buildVariable( "example-transformations", "out", @@ -538,7 +563,7 @@ class TableFileSinkTest .emptySink( "end", "table", - "Table" -> s"'$datetimePingPongOutputTableName'".spel, + "Table" -> s"'`default_catalog`.`default_database`.`$datetimePingPongOutputTableName`'".spel, "Raw editor" -> "true".spel, "Value" -> "#input".spel ) @@ -581,7 +606,7 @@ class TableFileSinkTest .emptySink( "end", "table", - "Table" -> s"'$datetimeExpressionOutputTableName'".spel, + "Table" -> s"'`default_catalog`.`default_database`.`$datetimeExpressionOutputTableName`'".spel, "Raw editor" -> "false".spel, "date" -> "T(java.time.LocalDate).parse('2024-01-01')".spel, "time" -> "T(java.time.LocalTime).parse('12:01:02.000000003')".spel, @@ -623,7 +648,7 @@ class TableFileSinkTest .emptySink( "end", "table", - "Table" -> s"'$genericsOutputTableName'".spel, + "Table" -> s"'`default_catalog`.`default_database`.`$genericsOutputTableName`'".spel, "Raw editor" -> "true".spel, "Value" -> valueExpression ) @@ -654,7 +679,7 @@ class TableFileSinkTest .emptySink( "end", "table", - "Table" -> s"'$oneColumnOutputTableName'".spel, + "Table" -> s"'`default_catalog`.`default_database`.`$oneColumnOutputTableName`'".spel, "Raw editor" -> "true".spel, "Value" -> valueExpression ) diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSinkParametersTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSinkParametersTest.scala index 37cd98e2609..d562d63e193 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSinkParametersTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSinkParametersTest.scala @@ -114,11 +114,11 @@ class TableSinkParametersTest extends AnyFunSuite with FlinkSpec with Matchers w test("should take parameters per column in non-raw mode") { val scenario = ScenarioBuilder .streaming("test") - .source("start", "table", 
"Table" -> s"'$inputTableName'".spel) + .source("start", "table", "Table" -> s"'`default_catalog`.`default_database`.`$inputTableName`'".spel) .emptySink( "end", "table", - "Table" -> s"'$outputTableName'".spel, + "Table" -> s"'`default_catalog`.`default_database`.`$outputTableName`'".spel, "Raw editor" -> "false".spel, "client_id" -> "''".spel, "amount" -> "1".spel, @@ -134,11 +134,11 @@ class TableSinkParametersTest extends AnyFunSuite with FlinkSpec with Matchers w test("should skip virtual columns in listed parameters in non-raw mode") { val scenario = ScenarioBuilder .streaming("test") - .source("start", "table", "Table" -> s"'$inputTableName'".spel) + .source("start", "table", "Table" -> s"'`default_catalog`.`default_database`.`$inputTableName`'".spel) .emptySink( "end", "table", - "Table" -> s"'$virtualColumnOutputTableName'".spel, + "Table" -> s"'`default_catalog`.`default_database`.`$virtualColumnOutputTableName`'".spel, "Raw editor" -> "false".spel, "quantity" -> "2".spel, "price" -> "1.5".spel, @@ -154,11 +154,11 @@ class TableSinkParametersTest extends AnyFunSuite with FlinkSpec with Matchers w test("should return errors for type errors in non-raw mode value parameters") { val scenario = ScenarioBuilder .streaming("test") - .source("start", "table", "Table" -> s"'$inputTableName'".spel) + .source("start", "table", "Table" -> s"'`default_catalog`.`default_database`.`$inputTableName`'".spel) .emptySink( "end", "table", - "Table" -> s"'$outputTableName'".spel, + "Table" -> s"'`default_catalog`.`default_database`.`$outputTableName`'".spel, "Raw editor" -> "false".spel, "client_id" -> "123.11".spel, "amount" -> "''".spel, @@ -190,11 +190,11 @@ class TableSinkParametersTest extends AnyFunSuite with FlinkSpec with Matchers w test("should return errors for illegally named columns for non-raw mode") { val scenario = ScenarioBuilder .streaming("test") - .source("start", "table", "Table" -> s"'$inputTableName'".spel) + .source("start", "table", "Table" -> s"'`default_catalog`.`default_database`.`$inputTableName`'".spel) .emptySink( "end", "table", - "Table" -> s"'$outputTableNameWithInvalidCols'".spel, + "Table" -> s"'`default_catalog`.`default_database`.`$outputTableNameWithInvalidCols`'".spel, "Raw editor" -> "false".spel ) diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceDataGenerationTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceDataGenerationTest.scala index 9e0a40782d8..668cf63145a 100644 --- a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceDataGenerationTest.scala +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceDataGenerationTest.scala @@ -5,7 +5,11 @@ import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers import pl.touk.nussknacker.engine.flink.table.TableComponentProviderConfig.TestDataGenerationMode import pl.touk.nussknacker.engine.flink.table.TableTestCases.SimpleTable -import pl.touk.nussknacker.engine.flink.table.extractor.{SqlStatementReader, TablesDefinitionDiscovery} +import pl.touk.nussknacker.engine.flink.table.definition.{ + FlinkDataDefinition, + SqlStatementReader, + TablesDefinitionDiscovery +} import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage import scala.jdk.CollectionConverters._ @@ -16,13 +20,18 @@ class TableSourceDataGenerationTest with LoneElement 
with ValidatedValuesDetailedMessage { - private val statements = SqlStatementReader.readSql(SimpleTable.sqlStatement) + private val flinkDataDefinition = FlinkDataDefinition + .create( + sqlStatements = Some(SqlStatementReader.readSql(SimpleTable.sqlStatement)), + catalogConfigurationOpt = None, + ) + .validValue - private val discovery = TablesDefinitionDiscovery.prepareDiscovery(statements).validValue + private val discovery = TablesDefinitionDiscovery.prepareDiscovery(flinkDataDefinition).validValue private val tableSource = new TableSource( tableDefinition = discovery.listTables.loneElement, - sqlStatements = statements, + flinkDataDefinition = flinkDataDefinition, testDataGenerationMode = TestDataGenerationMode.Random ) diff --git a/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceTest.scala b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceTest.scala new file mode 100644 index 00000000000..8a9339808e3 --- /dev/null +++ b/engine/flink/components/base-tests/src/test/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceTest.scala @@ -0,0 +1,79 @@ +package pl.touk.nussknacker.engine.flink.table.source + +import com.typesafe.config.{Config, ConfigFactory} +import org.apache.commons.io.FileUtils +import org.apache.flink.types.Row +import org.scalatest.LoneElement +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.component.ComponentDefinition +import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies +import pl.touk.nussknacker.engine.build.ScenarioBuilder +import pl.touk.nussknacker.engine.flink.table.FlinkTableComponentProvider +import pl.touk.nussknacker.engine.flink.test.FlinkSpec +import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner +import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode +import pl.touk.nussknacker.engine.util.test.TestScenarioRunner +import pl.touk.nussknacker.test.{PatientScalaFutures, ValidatedValuesDetailedMessage} + +import java.io.File +import java.nio.charset.StandardCharsets + +class TableSourceTest + extends AnyFunSuite + with FlinkSpec + with Matchers + with PatientScalaFutures + with LoneElement + with ValidatedValuesDetailedMessage { + + import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ + import pl.touk.nussknacker.engine.spel.SpelExtension._ + + private lazy val tablesDefinition = + s"""CREATE DATABASE testdb; + | + |CREATE TABLE testdb.tablewithqualifiedname ( + | `quantity` INT + |) WITH ( + | 'connector' = 'datagen', + | 'number-of-rows' = '1' + |); + |""".stripMargin + + private lazy val sqlTablesDefinitionFilePath = { + val tempFile = File.createTempFile("tables-definition", ".sql") + tempFile.deleteOnExit() + FileUtils.writeStringToFile(tempFile, tablesDefinition, StandardCharsets.UTF_8) + tempFile.toPath + } + + private lazy val tableComponentsConfig: Config = ConfigFactory.parseString( + s"""{ + | tableDefinitionFilePath: $sqlTablesDefinitionFilePath + |}""".stripMargin + ) + + private lazy val tableComponents: List[ComponentDefinition] = new FlinkTableComponentProvider().create( + tableComponentsConfig, + ProcessObjectDependencies.withConfig(tableComponentsConfig) + ) + + private lazy val runner: FlinkTestScenarioRunner = TestScenarioRunner + .flinkBased(ConfigFactory.empty(), flinkMiniCluster) + .withExecutionMode(ExecutionMode.Batch) + 
.withExtraComponents(tableComponents)
+    .build()
+
+  test("be possible to use a table declared inside a database other than the default one") {
+    val scenario = ScenarioBuilder
+      .streaming("test")
+      .source("start", "table", "Table" -> s"'`default_catalog`.`testdb`.`tablewithqualifiedname`'".spel)
+      .emptySink(s"end", TestScenarioRunner.testResultSink, "value" -> "#input".spel)
+
+    val result = runner.runWithoutData[Row](scenario).validValue
+    result.errors shouldBe empty
+    result.successes.loneElement
+  }
+
+}
diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/FlinkTableComponentProvider.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/FlinkTableComponentProvider.scala
index b0d429b9240..b5c2f3805a7 100644
--- a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/FlinkTableComponentProvider.scala
+++ b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/FlinkTableComponentProvider.scala
@@ -2,19 +2,21 @@ package pl.touk.nussknacker.engine.flink.table
 
 import com.typesafe.config.Config
 import com.typesafe.scalalogging.LazyLogging
+import org.apache.flink.configuration.Configuration
 import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, ComponentProvider, NussknackerVersion}
 import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
 import pl.touk.nussknacker.engine.flink.table.FlinkTableComponentProvider.configIndependentComponents
 import pl.touk.nussknacker.engine.flink.table.TableComponentProviderConfig.TestDataGenerationMode
 import pl.touk.nussknacker.engine.flink.table.TableComponentProviderConfig.TestDataGenerationMode.TestDataGenerationMode
 import pl.touk.nussknacker.engine.flink.table.aggregate.TableAggregationFactory
-import pl.touk.nussknacker.engine.flink.table.extractor.SqlStatementReader
+import pl.touk.nussknacker.engine.flink.table.definition.{FlinkDataDefinition, SqlStatementReader}
 import pl.touk.nussknacker.engine.flink.table.join.TableJoinComponent
 import pl.touk.nussknacker.engine.flink.table.sink.TableSinkFactory
 import pl.touk.nussknacker.engine.flink.table.source.TableSourceFactory
 import pl.touk.nussknacker.engine.util.ResourceLoader
 
 import java.nio.file.{Path, Paths}
+import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Success, Try}
 
 /**
@@ -37,17 +39,26 @@ class FlinkTableComponentProvider extends ComponentProvider with LazyLogging {
 
   override def create(config: Config, dependencies: ProcessObjectDependencies): List[ComponentDefinition] = {
     val parsedConfig = TableComponentProviderConfig.parse(config)
     val testDataGenerationModeOrDefault = parsedConfig.testDataGenerationMode.getOrElse(TestDataGenerationMode.default)
-    val sqlStatements = readSqlFromFile(Paths.get(parsedConfig.tableDefinitionFilePath))
+    val sqlStatements = parsedConfig.tableDefinitionFilePath.map(Paths.get(_)).map(readSqlFromFile)
+    val catalogConfigurationOpt = parsedConfig.catalogConfiguration.map(_.asJava).map(Configuration.fromMap)
+    val flinkDataDefinition =
+      FlinkDataDefinition
+        .create(sqlStatements, catalogConfigurationOpt)
+        .valueOr(_ =>
+          throw new IllegalArgumentException(
+            "Empty data definition configuration. 
At least one of either tableDefinitionFilePath or catalogConfiguration should be configured" + ) + ) ComponentDefinition( tableComponentName, new TableSourceFactory( - sqlStatements, + flinkDataDefinition, testDataGenerationModeOrDefault ) ) :: ComponentDefinition( tableComponentName, - new TableSinkFactory(sqlStatements) + new TableSinkFactory(flinkDataDefinition) ) :: configIndependentComponents } @@ -83,7 +94,8 @@ object FlinkTableComponentProvider { } final case class TableComponentProviderConfig( - tableDefinitionFilePath: String, + tableDefinitionFilePath: Option[String], + catalogConfiguration: Option[Map[String, String]], testDataGenerationMode: Option[TestDataGenerationMode] ) diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/TableDefinition.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/TableDefinition.scala index b8879bb093d..fcc60450612 100644 --- a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/TableDefinition.scala +++ b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/TableDefinition.scala @@ -1,12 +1,12 @@ package pl.touk.nussknacker.engine.flink.table -import org.apache.flink.table.catalog.ResolvedSchema +import org.apache.flink.table.catalog.{ObjectIdentifier, ResolvedSchema} import org.apache.flink.table.types.DataType // We need to use ResolvedSchema instead of (unresolved) Schema because we need to know the type // of computed columns in sources. UnresolvedComputedColumn holds unresolved Expression which unknown type. // After expression resolution, the type is determined. -final case class TableDefinition(tableName: String, schema: ResolvedSchema) { +final case class TableDefinition(tableId: ObjectIdentifier, schema: ResolvedSchema) { lazy val sourceRowDataType: DataType = schema.toSourceRowDataType diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/definition/FlinkDataDefinition.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/definition/FlinkDataDefinition.scala new file mode 100644 index 00000000000..a97c5ea019b --- /dev/null +++ b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/definition/FlinkDataDefinition.scala @@ -0,0 +1,106 @@ +package pl.touk.nussknacker.engine.flink.table.definition + +import cats.data.{Validated, ValidatedNel} +import cats.implicits.{toFunctorOps, toTraverseOps} +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.catalog.CatalogDescriptor +import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinition.internalCatalogName +import pl.touk.nussknacker.engine.flink.table.definition.SqlStatementReader.SqlStatement + +import scala.util.Try + +class FlinkDataDefinition private ( + sqlStatements: Option[List[String]], + catalogConfigurationOpt: Option[Configuration] +) extends Serializable { + + def registerIn(tableEnvironment: TableEnvironment): ValidatedNel[DataDefinitionRegistrationError, Unit] = { + val sqlStatementsExecutionResults = sqlStatements.toList.flatten + .map(s => + Validated + .fromTry(Try(tableEnvironment.executeSql(s))) + .leftMap(SqlStatementExecutionError(s, _): DataDefinitionRegistrationError) + .toValidatedNel + ) + val catalogRegistrationResult = catalogConfigurationOpt.map { catalogConfiguration => + Validated + .fromTry( + Try { + 
tableEnvironment
+              .createCatalog(internalCatalogName, CatalogDescriptor.of(internalCatalogName, catalogConfiguration))
+            tableEnvironment.useCatalog(internalCatalogName)
+          }
+        )
+        .leftMap(CatalogRegistrationError(catalogConfiguration, _): DataDefinitionRegistrationError)
+        .toValidatedNel
+    }
+    (sqlStatementsExecutionResults ::: catalogRegistrationResult.toList).sequence.void
+  }
+
+}
+
+object FlinkDataDefinition {
+
+  private[definition] val internalCatalogName = "$nuCatalog"
+
+  def create(
+      sqlStatements: Option[List[String]],
+      catalogConfigurationOpt: Option[Configuration]
+  ): Validated[EmptyDataDefinition.type, FlinkDataDefinition] = {
+    Validated.cond(
+      sqlStatements.isDefined || catalogConfigurationOpt.isDefined,
+      new FlinkDataDefinition(sqlStatements, catalogConfigurationOpt),
+      EmptyDataDefinition
+    )
+  }
+
+  implicit class DataDefinitionRegistrationResultExtension[T](
+      result: ValidatedNel[DataDefinitionRegistrationError, T]
+  ) {
+
+    def orFail: T = {
+      result.valueOr { errors =>
+        throw new IllegalStateException(
+          errors.toList
+            .map(_.message)
+            .mkString("Errors occurred during data definition registration in TableEnvironment: ", ", ", "")
+        )
+      }
+    }
+
+  }
+
+}
+
+object EmptyDataDefinition
+
+sealed trait DataDefinitionRegistrationError {
+  def message: String
+}
+
+final case class SqlStatementExecutionError(statement: SqlStatement, exception: Throwable)
+    extends DataDefinitionRegistrationError {
+
+  override def message: String =
+    s"""Could not execute sql statement. The statement may be malformed.
+       |Sql statement: $statement
+       |Caused by: $exception""".stripMargin
+
+}
+
+final case class CatalogRegistrationError(catalogConfiguration: Configuration, exception: Throwable)
+    extends DataDefinitionRegistrationError {
+
+  override def message: String =
+    s"Could not create catalog with configuration: $catalogConfiguration. Caused by: $exception"
+
+}
+
+final case class DefaultDatabaseSetupError(dbName: String, exception: Throwable)
+    extends DataDefinitionRegistrationError {
+
+  override def message: String =
+    s"Could not set default database to: $dbName. 
Caused by: $exception" + +} diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/extractor/SqlStatementReader.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/definition/SqlStatementReader.scala similarity index 86% rename from engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/extractor/SqlStatementReader.scala rename to engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/definition/SqlStatementReader.scala index 47a21dfd6c0..c6ed496adfb 100644 --- a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/extractor/SqlStatementReader.scala +++ b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/definition/SqlStatementReader.scala @@ -1,4 +1,4 @@ -package pl.touk.nussknacker.engine.flink.table.extractor +package pl.touk.nussknacker.engine.flink.table.definition import com.typesafe.scalalogging.LazyLogging diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/definition/TablesDefinitionDiscovery.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/definition/TablesDefinitionDiscovery.scala new file mode 100644 index 00000000000..c8f9824c508 --- /dev/null +++ b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/definition/TablesDefinitionDiscovery.scala @@ -0,0 +1,57 @@ +package pl.touk.nussknacker.engine.flink.table.definition + +import cats.data.ValidatedNel +import com.typesafe.scalalogging.LazyLogging +import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment} +import org.apache.flink.table.catalog.ObjectIdentifier +import pl.touk.nussknacker.engine.flink.table.TableDefinition + +import scala.jdk.OptionConverters.RichOptional +import scala.util.Try + +// TODO: Make this extractor more memory/cpu efficient and ensure closing of resources. 
For more details see +// https://github.com/TouK/nussknacker/pull/5627#discussion_r1512881038 +class TablesDefinitionDiscovery(tableEnv: TableEnvironment) extends LazyLogging { + + import scala.jdk.CollectionConverters._ + + def listTables: List[TableDefinition] = { + for { + catalogName <- tableEnv.listCatalogs().toList + catalog <- tableEnv.getCatalog(catalogName).toScala.toList + databaseName <- catalog.listDatabases.asScala.toList + tableName <- tableEnv.listTables(catalogName, databaseName).toList + tableId = ObjectIdentifier.of(catalogName, databaseName, tableName) + } yield extractTableDefinition(tableId) + } + + private def extractTableDefinition(tableId: ObjectIdentifier) = { + val table = Try(tableEnv.from(tableId.toString)).fold( + ex => throw new IllegalStateException(s"Table extractor could not locate a created table with id: $tableId", ex), + identity + ) + TableDefinition(tableId, table.getResolvedSchema) + } + +} + +object TablesDefinitionDiscovery { + + def prepareDiscovery( + flinkDataDefinition: FlinkDataDefinition + ): ValidatedNel[DataDefinitionRegistrationError, TablesDefinitionDiscovery] = { + val environmentSettings = EnvironmentSettings + .newInstance() + .build() + prepareDiscovery(flinkDataDefinition, environmentSettings) + } + + private[definition] def prepareDiscovery( + flinkDataDefinition: FlinkDataDefinition, + environmentSettings: EnvironmentSettings + ): ValidatedNel[DataDefinitionRegistrationError, TablesDefinitionDiscovery] = { + val tableEnv = TableEnvironment.create(environmentSettings) + flinkDataDefinition.registerIn(tableEnv).map(_ => new TablesDefinitionDiscovery(tableEnv)) + } + +} diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/extractor/TablesDefinitionDiscovery.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/extractor/TablesDefinitionDiscovery.scala deleted file mode 100644 index 4548aef749f..00000000000 --- a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/extractor/TablesDefinitionDiscovery.scala +++ /dev/null @@ -1,93 +0,0 @@ -package pl.touk.nussknacker.engine.flink.table.extractor - -import cats.data.{NonEmptyList, ValidatedNel} -import cats.implicits.catsSyntaxValidatedId -import com.typesafe.scalalogging.LazyLogging -import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment} -import org.apache.flink.table.catalog.ObjectIdentifier -import pl.touk.nussknacker.engine.flink.table.extractor.SqlStatementNotExecutedError.statementNotExecutedErrorDescription -import pl.touk.nussknacker.engine.flink.table.extractor.SqlStatementReader.SqlStatement -import pl.touk.nussknacker.engine.flink.table.{TableDefinition, extractor} - -import scala.jdk.OptionConverters.RichOptional -import scala.util.{Failure, Success, Try} - -// TODO: Make this extractor more memory/cpu efficient and ensure closing of resources. 
For more details see -// https://github.com/TouK/nussknacker/pull/5627#discussion_r1512881038 -class TablesDefinitionDiscovery(tableEnv: TableEnvironment) extends LazyLogging { - - import scala.jdk.CollectionConverters._ - - def listTables: List[TableDefinition] = { - for { - catalogName <- tableEnv.listCatalogs().toList - catalog <- tableEnv.getCatalog(catalogName).toScala.toList - databaseName <- catalog.listDatabases.asScala.toList - tableName <- tableEnv.listTables(catalogName, databaseName).toList - tableId = ObjectIdentifier.of(catalogName, databaseName, tableName) - } yield extractTableDefinition(tableId) - } - - private def extractTableDefinition(tableId: ObjectIdentifier) = { - val table = Try(tableEnv.from(tableId.toString)).getOrElse( - throw new IllegalStateException(s"Table extractor could not locate a created table with path: $tableId") - ) - TableDefinition(tableId.getObjectName, table.getResolvedSchema) - } - -} - -object TablesDefinitionDiscovery { - - def prepareDiscoveryUnsafe(sqlStatements: List[SqlStatement]): TablesDefinitionDiscovery = { - prepareDiscovery(sqlStatements).valueOr { errors => - throw new IllegalStateException( - errors.toList - .map(_.message) - .mkString("Errors occurred when parsing sql component configuration file: ", ", ", "") - ) - } - - } - - def prepareDiscovery( - sqlStatements: List[SqlStatement] - ): ValidatedNel[SqlStatementNotExecutedError, TablesDefinitionDiscovery] = { - val settings = EnvironmentSettings - .newInstance() - .build() - val tableEnv = TableEnvironment.create(settings) - - val sqlErrors = sqlStatements.flatMap(s => - Try(tableEnv.executeSql(s)) match { - case Failure(exception) => Some(SqlStatementNotExecutedError(s, exception)) - case Success(_) => None - } - ) - NonEmptyList - .fromList(sqlErrors) - .map(_.invalid[TablesDefinitionDiscovery]) - .getOrElse(new extractor.TablesDefinitionDiscovery(tableEnv).valid) - } - -} - -final case class SqlStatementNotExecutedError( - statement: SqlStatement, - exception: Throwable, -) { - - val message: String = { - val baseErrorMessage = s"$statementNotExecutedErrorDescription" - val sqlStatementMessage = s"Sql statement: $statement" - val exceptionMessage = s"Caused by: $exception" - s"$baseErrorMessage\n$sqlStatementMessage\n$exceptionMessage." - } - -} - -object SqlStatementNotExecutedError { - - private val statementNotExecutedErrorDescription = "Could not execute sql statement. The statement may be malformed." 
- -} diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSink.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSink.scala index b1bd3912132..032a10dad64 100644 --- a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSink.scala +++ b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSink.scala @@ -16,13 +16,13 @@ import pl.touk.nussknacker.engine.flink.api.exception.{ExceptionHandler, WithExc import pl.touk.nussknacker.engine.flink.api.process.{FlinkCustomNodeContext, FlinkSink} import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection import pl.touk.nussknacker.engine.flink.table.TableDefinition -import pl.touk.nussknacker.engine.flink.table.extractor.SqlStatementReader.SqlStatement -import pl.touk.nussknacker.engine.flink.table.utils.ToTableTypeSchemaBasedEncoder +import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinition import pl.touk.nussknacker.engine.flink.table.utils.DataTypesExtensions._ +import pl.touk.nussknacker.engine.flink.table.utils.ToTableTypeSchemaBasedEncoder class TableSink( tableDefinition: TableDefinition, - sqlStatements: List[SqlStatement], + flinkDataDefinition: FlinkDataDefinition, value: LazyParameter[AnyRef] ) extends FlinkSink { @@ -44,6 +44,7 @@ class TableSink( ): DataStreamSink[_] = { val env = dataStream.getExecutionEnvironment val tableEnv = StreamTableEnvironment.create(env) + flinkDataDefinition.registerIn(tableEnv) /* DataStream to Table transformation: @@ -57,10 +58,8 @@ class TableSink( val streamOfRows = dataStream.flatMap(EncodeAsTableTypeFunction(flinkNodeContext, value.returnType, sinkRowType)) val inputValueTable = tableEnv.fromDataStream(streamOfRows) - sqlStatements.foreach(tableEnv.executeSql) - val statementSet = tableEnv.createStatementSet() - statementSet.add(inputValueTable.insertInto(s"`${tableDefinition.tableName}`")) + statementSet.add(inputValueTable.insertInto(tableDefinition.tableId.toString)) statementSet.attachAsDataStream() /* diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSinkFactory.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSinkFactory.scala index c888992de3b..d0463699fde 100644 --- a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSinkFactory.scala +++ b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/sink/TableSinkFactory.scala @@ -16,13 +16,12 @@ import pl.touk.nussknacker.engine.api.parameter.ParameterName import pl.touk.nussknacker.engine.api.process.{Sink, SinkFactory} import pl.touk.nussknacker.engine.api.{NodeId, Params} import pl.touk.nussknacker.engine.flink.table.TableDefinition -import pl.touk.nussknacker.engine.flink.table.extractor.SqlStatementReader.SqlStatement -import pl.touk.nussknacker.engine.flink.table.extractor.TablesDefinitionDiscovery +import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinition._ +import pl.touk.nussknacker.engine.flink.table.definition.{FlinkDataDefinition, TablesDefinitionDiscovery} import pl.touk.nussknacker.engine.flink.table.sink.TableSinkFactory._ -import pl.touk.nussknacker.engine.flink.table.utils.TableComponentFactory._ import pl.touk.nussknacker.engine.flink.table.utils.DataTypesExtensions._ import 
pl.touk.nussknacker.engine.flink.table.utils.TableComponentFactory
-import pl.touk.nussknacker.engine.flink.table.utils.TableComponentFactory.getSelectedTableUnsafe
+import pl.touk.nussknacker.engine.flink.table.utils.TableComponentFactory._
 import pl.touk.nussknacker.engine.graph.expression.Expression
 import pl.touk.nussknacker.engine.util.parameters.{
   SchemaBasedParameter,
@@ -34,13 +33,13 @@ import pl.touk.nussknacker.engine.util.sinkvalue.SinkValue
 import scala.collection.immutable.ListMap
 import scala.jdk.CollectionConverters._
 
-class TableSinkFactory(sqlStatements: List[SqlStatement])
+class TableSinkFactory(flinkDataDefinition: FlinkDataDefinition)
     extends SingleInputDynamicComponent[Sink]
     with SinkFactory
     with BoundedStreamComponent {
 
   @transient
-  private lazy val tablesDiscovery = TablesDefinitionDiscovery.prepareDiscoveryUnsafe(sqlStatements)
+  private lazy val tablesDiscovery = TablesDefinitionDiscovery.prepareDiscovery(flinkDataDefinition).orFail
 
   override type State = TableSinkFactoryState
 
@@ -180,7 +179,7 @@ class TableSinkFactory(sqlStatements: List[SqlStatement])
 
     new TableSink(
       tableDefinition = finalState.tableDefinition,
-      sqlStatements = sqlStatements,
+      flinkDataDefinition = flinkDataDefinition,
       value = lazyValueParam
     )
   }
diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/FlinkMiniClusterTableOperations.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/FlinkMiniClusterTableOperations.scala
index e907a78751b..35f822b4578 100644
--- a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/FlinkMiniClusterTableOperations.scala
+++ b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/FlinkMiniClusterTableOperations.scala
@@ -8,10 +8,12 @@ import org.apache.flink.configuration.{Configuration, CoreOptions, PipelineOptio
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.Expressions.$
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
-import org.apache.flink.table.api.{EnvironmentSettings, Schema, Table, TableDescriptor, TableEnvironment}
+import org.apache.flink.table.api._
+import org.apache.flink.table.catalog.ObjectIdentifier
 import org.apache.flink.types.Row
 import pl.touk.nussknacker.engine.api.test.{TestData, TestRecord}
-import pl.touk.nussknacker.engine.flink.table.extractor.SqlStatementReader.SqlStatement
+import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinition
+import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinition._
 import pl.touk.nussknacker.engine.util.ThreadUtils
 
 import java.nio.charset.StandardCharsets
@@ -38,12 +40,12 @@
   def generateLiveTestData(
       limit: Int,
       schema: Schema,
-      sqlStatements: List[SqlStatement],
-      tableName: TableName
+      flinkDataDefinition: FlinkDataDefinition,
+      tableId: ObjectIdentifier
   ): TestData = generateTestData(
     limit = limit,
     schema = schema,
-    buildSourceTable = createLiveDataGeneratorTable(sqlStatements, tableName, schema)
+    buildSourceTable = createLiveDataGeneratorTable(flinkDataDefinition, tableId, schema)
   )
 
   def generateRandomTestData(amount: Int, schema: Schema): TestData = generateTestData(
@@ -119,12 +121,12 @@
   }
 
   private def createLiveDataGeneratorTable(
-      sqlStatements: List[SqlStatement],
-      tableName: TableName,
+      flinkDataDefinition: FlinkDataDefinition,
+      tableId: ObjectIdentifier,
       schema: Schema
   )(env: TableEnvironment): Table = {
-    TableSource.executeSqlDDL(sqlStatements, env)
-    env.from(s"`$tableName`").select(schema.getColumns.asScala.map(_.getName).map($).toList: _*)
+    flinkDataDefinition.registerIn(env).orFail
+    env.from(tableId.toString).select(schema.getColumns.asScala.map(_.getName).map($).toList: _*)
   }
 
   private def createTempFileTable(flinkTableSchema: Schema)(implicit env: TableEnvironment): (Path, TableName) = {
diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/TableSource.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/TableSource.scala
index 46be4cdbf75..5b826f57979 100644
--- a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/TableSource.scala
+++ b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/TableSource.scala
@@ -1,11 +1,9 @@
 package pl.touk.nussknacker.engine.flink.table.source;
 
-import org.apache.flink.api.common.RuntimeExecutionMode
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.Expressions.$
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
-import org.apache.flink.table.api.{DataTypes, Schema, TableEnvironment}
+import org.apache.flink.table.api.{DataTypes, Schema}
 import org.apache.flink.table.catalog.Column.{ComputedColumn, MetadataColumn, PhysicalColumn}
 import org.apache.flink.types.Row
 import pl.touk.nussknacker.engine.api.component.SqlFilteringExpression
@@ -27,7 +25,8 @@ import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermar
 import pl.touk.nussknacker.engine.flink.table.TableComponentProviderConfig.TestDataGenerationMode
 import pl.touk.nussknacker.engine.flink.table.TableComponentProviderConfig.TestDataGenerationMode.TestDataGenerationMode
 import pl.touk.nussknacker.engine.flink.table.TableDefinition
-import pl.touk.nussknacker.engine.flink.table.extractor.SqlStatementReader.SqlStatement
+import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinition
+import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinition._
 import pl.touk.nussknacker.engine.flink.table.source.TableSource._
 import pl.touk.nussknacker.engine.flink.table.utils.DataTypesExtensions._
 import pl.touk.nussknacker.engine.flink.table.utils.SchemaExtensions._
@@ -36,7 +35,7 @@ import scala.jdk.CollectionConverters._
 
 class TableSource(
     tableDefinition: TableDefinition,
-    sqlStatements: List[SqlStatement],
+    flinkDataDefinition: FlinkDataDefinition,
     testDataGenerationMode: TestDataGenerationMode
 ) extends StandardFlinkSource[Row]
     with TestWithParametersSupport[Row]
@@ -48,14 +47,14 @@ class TableSource(
       flinkNodeContext: FlinkCustomNodeContext
   ): DataStream[Row] = {
     val tableEnv = StreamTableEnvironment.create(env)
+    flinkDataDefinition.registerIn(tableEnv).orFail
 
-    executeSqlDDL(sqlStatements, tableEnv)
-    val selectQuery = tableEnv.from(s"`${tableDefinition.tableName}`")
+    val selectQuery = tableEnv.from(tableDefinition.tableId.toString)
 
     val finalQuery = flinkNodeContext.nodeDeploymentData
       .map { case SqlFilteringExpression(sqlExpression) =>
         tableEnv.executeSql(
-          s"CREATE TEMPORARY VIEW $filteringInternalViewName AS SELECT * FROM `${tableDefinition.tableName}` WHERE $sqlExpression"
+          s"CREATE TEMPORARY VIEW $filteringInternalViewName AS SELECT * FROM ${tableDefinition.tableId} WHERE $sqlExpression"
         )
         tableEnv
           .from(filteringInternalViewName)
@@ -114,8 +113,8 @@
         FlinkMiniClusterTableOperations.generateLiveTestData(
           limit = size,
           schema = generateDataSchema,
-          sqlStatements = sqlStatements,
-          tableName = tableDefinition.tableName
+          flinkDataDefinition = flinkDataDefinition,
+          tableId = tableDefinition.tableId
        )
    }
  }
@@ -128,10 +127,4 @@
 object TableSource {
 
   private val filteringInternalViewName = "filteringView"
-
-  private[source] def executeSqlDDL(
-      sqlStatements: List[SqlStatement],
-      tableEnv: TableEnvironment
-  ): Unit = sqlStatements.foreach(tableEnv.executeSql)
-
 }
diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceFactory.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceFactory.scala
index 3f3e2d7fbe3..c0ce0256ab4 100644
--- a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceFactory.scala
+++ b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/source/TableSourceFactory.scala
@@ -12,8 +12,8 @@ import pl.touk.nussknacker.engine.api.process.{BasicContextInitializer, Source,
 import pl.touk.nussknacker.engine.api.{NodeId, Params}
 import pl.touk.nussknacker.engine.flink.table.TableComponentProviderConfig.TestDataGenerationMode.TestDataGenerationMode
 import pl.touk.nussknacker.engine.flink.table.TableDefinition
-import pl.touk.nussknacker.engine.flink.table.extractor.SqlStatementReader.SqlStatement
-import pl.touk.nussknacker.engine.flink.table.extractor.TablesDefinitionDiscovery
+import pl.touk.nussknacker.engine.flink.table.definition.{FlinkDataDefinition, TablesDefinitionDiscovery}
+import pl.touk.nussknacker.engine.flink.table.definition.FlinkDataDefinition._
 import pl.touk.nussknacker.engine.flink.table.source.TableSourceFactory.{
   AvailableTables,
   SelectedTable,
@@ -24,14 +24,14 @@ import pl.touk.nussknacker.engine.flink.table.utils.TableComponentFactory
 import pl.touk.nussknacker.engine.flink.table.utils.TableComponentFactory._
 
 class TableSourceFactory(
-    sqlStatements: List[SqlStatement],
+    flinkDataDefinition: FlinkDataDefinition,
     testDataGenerationMode: TestDataGenerationMode
 ) extends SingleInputDynamicComponent[Source]
     with SourceFactory
     with BoundedStreamComponent {
 
   @transient
-  private lazy val tablesDiscovery = TablesDefinitionDiscovery.prepareDiscoveryUnsafe(sqlStatements)
+  private lazy val tablesDiscovery = TablesDefinitionDiscovery.prepareDiscovery(flinkDataDefinition).orFail
 
   override type State = TableSourceFactoryState
 
@@ -69,7 +69,7 @@ class TableSourceFactory(
         s"Unexpected final state determined during parameters validation: $finalStateOpt"
       )
     }
-    new TableSource(selectedTable, sqlStatements, testDataGenerationMode)
+    new TableSource(selectedTable, flinkDataDefinition, testDataGenerationMode)
   }
 
   override def nodeDependencies: List[NodeDependency] = List.empty
diff --git a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/utils/TableComponentFactory.scala b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/utils/TableComponentFactory.scala
index 9307c222749..e3653d47537 100644
--- a/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/utils/TableComponentFactory.scala
+++ b/engine/flink/components/table/src/main/scala/pl/touk/nussknacker/engine/flink/table/utils/TableComponentFactory.scala
@@ -11,7 +11,9 @@ object TableComponentFactory {
   def buildTableNameParam(
       defs: List[TableDefinition]
   ): ParameterExtractor[String] with ParameterCreatorWithNoDependency = {
-    val possibleTableParamValues = defs.map(c => FixedExpressionValue(s"'${c.tableName}'", c.tableName))
+    // TODO: We should use objectName in label only when a table is defined inside the default catalog and the default database
+    //       For other cases, we should use qualified paths
+    val possibleTableParamValues = defs.map(c => FixedExpressionValue(s"'${c.tableId}'", c.tableId.getObjectName))
     ParameterDeclaration
       .mandatory[String](tableNameParamName)
       .withCreator(
@@ -22,11 +24,12 @@
   }
 
   def getSelectedTableUnsafe(
-      tableName: String,
+      // There is no easy way to create ObjectIdentifier from String and we don't want to add this type to Nu supported types so we pass String instead
+      tableIdString: String,
       configs: List[TableDefinition]
   ): TableDefinition =
     configs
-      .find(_.tableName == tableName)
+      .find(_.tableId.toString == tableIdString)
      .getOrElse(throw new IllegalStateException("Table with selected name not found."))
 
 }
diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/devmodel/TableKafkaPingPongTest.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/devmodel/TableKafkaPingPongTest.scala
index 6e1328460ab..d60fcc2c07c 100644
--- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/devmodel/TableKafkaPingPongTest.scala
+++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/devmodel/TableKafkaPingPongTest.scala
@@ -138,7 +138,7 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
       .source(
         "start",
         tableComponentName,
-        TableComponentFactory.tableNameParamName.value -> s"'$sqlInputTableNameTest1'".spel
+        TableComponentFactory.tableNameParamName.value -> s"'`default_catalog`.`default_database`.`$sqlInputTableNameTest1`'".spel
       )
       .filter("filterId", "#input.someInt != 1".spel)
       .emptySink(
@@ -176,13 +176,13 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
       .source(
         sourceId,
         tableComponentName,
-        TableComponentFactory.tableNameParamName.value -> s"'$sqlInputTableNameTest2'".spel
+        TableComponentFactory.tableNameParamName.value -> s"'`default_catalog`.`default_database`.`$sqlInputTableNameTest2`'".spel
       )
       .filter("filterId", "#input.someInt != 1".spel)
       .emptySink(
         "end",
         tableComponentName,
-        "Table" -> s"'$sqlOutputTableNameTest2'".spel,
+        "Table" -> s"'`default_catalog`.`default_database`.`$sqlOutputTableNameTest2`'".spel,
         "Raw editor" -> "true".spel,
         "Value" -> "#input".spel
       )
@@ -209,12 +209,12 @@ class TableKafkaPingPongTest extends FlinkWithKafkaSuite {
       .source(
         "start",
         tableComponentName,
-        TableComponentFactory.tableNameParamName.value -> s"'$sqlInputTableNameTest3'".spel
+        TableComponentFactory.tableNameParamName.value -> s"'`default_catalog`.`default_database`.`$sqlInputTableNameTest3`'".spel
       )
       .emptySink(
         "end",
         tableComponentName,
-        "Table" -> s"'$sqlOutputTableNameTest3'".spel,
+        "Table" -> s"'`default_catalog`.`default_database`.`$sqlOutputTableNameTest3`'".spel,
         "Raw editor" -> "true".spel,
         "Value" -> "{someInt: 2, someString: 'BBB'}".spel
       )