Skip to content

Commit

Permalink
Merge branch 'staging' into windows-update
Browse files Browse the repository at this point in the history
  • Loading branch information
JulianWielga authored Sep 10, 2024
2 parents 7342333 + 2827844 commit a3694a1
Show file tree
Hide file tree
Showing 25 changed files with 472 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ trait BaseDeploymentApiHttpServiceBusinessSpec extends WithFlinkContainersDeploy

protected val scenario: CanonicalProcess = ScenarioBuilder
.streaming(scenarioName)
.source(sourceNodeId, "table", "Table" -> Expression.spel("'transactions'"))
.source(sourceNodeId, "table", "Table" -> Expression.spel("'`default_catalog`.`default_database`.`transactions`'"))
.customNode(
id = "aggregate",
outputVar = "agg",
Expand All @@ -60,7 +60,7 @@ trait BaseDeploymentApiHttpServiceBusinessSpec extends WithFlinkContainersDeploy
.emptySink(
id = "sink",
typ = "table",
"Table" -> Expression.spel("'transactions_summary'"),
"Table" -> Expression.spel("'`default_catalog`.`default_database`.`transactions_summary`'"),
"Raw editor" -> Expression.spel("false"),
"client_id" -> Expression.spel("#keyValues[0]"),
"date" -> Expression.spel("#keyValues[1]"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ class ManagementResourcesBatchBusinessSpec

private lazy val exampleScenario = ScenarioBuilder
.streaming(exampleScenarioName)
.source("sourceId", "table", "Table" -> Expression.spel("'transactions'"))
.source("sourceId", "table", "Table" -> Expression.spel("'`default_catalog`.`default_database`.`transactions`'"))
.emptySink(
"sinkId",
"table",
"Table" -> Expression.spel("'transactions'"),
"Table" -> Expression.spel("'`default_catalog`.`default_database`.`transactions_summary`'"),
"Raw editor" -> Expression.spel("true"),
"Value" -> Expression.spel("#input")
)
Expand Down
6 changes: 4 additions & 2 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
* [#6734](https://github.com/TouK/nussknacker/pull/6734) Tables from external catalogs are now refreshed automatically
when entering into node form. Please be aware that changes in `tableDefinition.sql` are not refreshed.
To do this, use `/app/processingtype/reload` API
* [#6741](https://github.com/TouK/nussknacker/pull/6741) Added `catalogConfiguration` configuration option allowing to set up catalog
directly in config instead of by `tableDefinition.sql`
* [#6741](https://github.com/TouK/nussknacker/pull/6741) (Breaking change) Fully qualified table paths are used instead of table names
in table source and sink components in `Table` parameter
* [#6716](https://github.com/TouK/nussknacker/pull/6716) Fix type hints for #COLLECTION.merge function.
* [#6695](https://github.com/TouK/nussknacker/pull/6695) From now on, arrays on UI are visible as lists but on a
background they are stored as it is and SpeL converts them to lists in a runtime.
Expand Down Expand Up @@ -74,7 +78,6 @@
* [#6445](https://github.com/TouK/nussknacker/pull/6445) [#6499](https://github.com/TouK/nussknacker/pull/6499) Add support to seconds in a duration editor
* [#6436](https://github.com/TouK/nussknacker/pull/6436) Typed SpEL list expressions will now infer their compile-time known values, instead of only the supertype of its elements. These values can be used in custom components or validators.
* NOTE: selection (`.?`), projection (`.!`) or operations from the `#COLLECTIONS` helper cause the typed list to lose its elements' values
* [#6445](https://github.com/TouK/nussknacker/pull/6445) [#6499](https://github.com/TouK/nussknacker/pull/6499) Add support to seconds in a duration editor
* [#6570](https://github.com/TouK/nussknacker/pull/6570) Generic parameters of collection types are better typed now: e.g. `List[Integer]` can be passed to `List[Number]` but not the other way
* Batch processing mode related improvements:
* [#6353](https://github.com/TouK/nussknacker/pull/6353) [#6467](https://github.com/TouK/nussknacker/pull/6467) Added `join` component
Expand All @@ -97,7 +100,6 @@
* [#6546](https://github.com/TouK/nussknacker/pull/6546) Error handling during saving data to table sink
* [#6567](https://github.com/TouK/nussknacker/pull/6567) Flink's [execution mode](https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/datastream/execution_mode)
can now be set for Flink-based scenario types under `modelConfig.executionMode`.
* [#6570](https://github.com/TouK/nussknacker/pull/6570) Generic parameters of collection types are better typed now: e.g. `List[Integer]` can be passed to `List[Number]` but not the other way
* [#6615](https://github.com/TouK/nussknacker/pull/6615) Add encode/decode support for typed SpEL values of types: `java.time.LocalDateTime`, `java.time.LocalDate`, `java.time.LocalTime`, `java.time.Duration`, `java.time.Period`
* [#6591](https://github.com/TouK/nussknacker/pull/6591) The whole model can be reloaded with `POST /api/app/processingtype/reload` now - you can use this endpoint to reconfigure Nu processing types without need to restart the app
* [#6623](https://github.com/TouK/nussknacker/pull/6623) Added `sortedAscBy` and `reverse` functions to `#COLLECTION` helper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class BatchDataGenerationSpec

private val simpleBatchTableScenario = ScenarioBuilder
.streaming("SumTransactions")
.source("sourceId", "table", "Table" -> "'transactions'".spel)
.source("sourceId", "table", "Table" -> "'`default_catalog`.`default_database`.`transactions`'".spel)
.emptySink("end", "dead-end")

private val designerServiceUrl = "http://localhost:8080"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pl.touk.nussknacker.engine.flink.table.definition.MockableCatalogFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package pl.touk.nussknacker.engine.flink.table.definition

import org.apache.flink.table.catalog.{Catalog, GenericInMemoryCatalog}
import org.apache.flink.table.factories.CatalogFactory

class MockableCatalogFactory extends CatalogFactory {

override def factoryIdentifier(): String = MockableCatalogFactory.catalogName

override def createCatalog(context: CatalogFactory.Context): Catalog = MockableCatalogFactory.catalog

}

// Warning: this implementation can't be used by concurrent threads
object MockableCatalogFactory {

private val catalogName = "mockable"

@volatile
var catalog: GenericInMemoryCatalog = createCatalog

private def createCatalog = {
new GenericInMemoryCatalog(catalogName)
}

def resetCatalog(): Unit = {
catalog = createCatalog
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pl.touk.nussknacker.engine.flink.table.extractor
package pl.touk.nussknacker.engine.flink.table.definition

import org.scalatest.Inside.inside
import org.scalatest.funsuite.AnyFunSuite
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package pl.touk.nussknacker.engine.flink.table.extractor
package pl.touk.nussknacker.engine.flink.table.definition

import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.{Column, ResolvedSchema}
import cats.data.Validated.Invalid
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.catalog._
import org.scalatest.LoneElement
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks
import pl.touk.nussknacker.engine.flink.table.utils.DataTypesExtensions._
import pl.touk.nussknacker.engine.flink.table.TableTestCases.SimpleTable
import pl.touk.nussknacker.engine.flink.table._
import pl.touk.nussknacker.engine.flink.table.extractor.TablesDefinitionDiscoveryTest.invalidSqlStatements
import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage
import pl.touk.nussknacker.engine.flink.table.definition.TablesDefinitionDiscoveryTest.invalidSqlStatements
import pl.touk.nussknacker.engine.flink.table.utils.DataTypesExtensions._
import pl.touk.nussknacker.test.{PatientScalaFutures, ValidatedValuesDetailedMessage}

import scala.jdk.CollectionConverters._

Expand All @@ -19,11 +20,25 @@ class TablesDefinitionDiscoveryTest
with Matchers
with LoneElement
with ValidatedValuesDetailedMessage
with TableDrivenPropertyChecks {
with TableDrivenPropertyChecks
with PatientScalaFutures {

test("return error for empty flink data definition") {
FlinkDataDefinition.create(
sqlStatements = None,
catalogConfigurationOpt = None,
) should matchPattern { case Invalid(EmptyDataDefinition) =>
}
}

test("extracts configuration from valid sql statement") {
val statements = SqlStatementReader.readSql(SimpleTable.sqlStatement)
val discovery = TablesDefinitionDiscovery.prepareDiscovery(statements).validValue
val flinkDataDefinition = FlinkDataDefinition
.create(
sqlStatements = Some(SqlStatementReader.readSql(SimpleTable.sqlStatement)),
catalogConfigurationOpt = None,
)
.validValue
val discovery = TablesDefinitionDiscovery.prepareDiscovery(flinkDataDefinition).validValue
val tablesDefinitions = discovery.listTables
val tableDefinition = tablesDefinitions.loneElement
val sourceRowType = tableDefinition.sourceRowDataType.toLogicalRowTypeUnsafe
Expand Down Expand Up @@ -64,25 +79,53 @@ class TablesDefinitionDiscoveryTest
| 'connector' = 'datagen'
|);""".stripMargin

val statements = SqlStatementReader.readSql(statementsStr)
val discovery = TablesDefinitionDiscovery.prepareDiscovery(statements).validValue
val tablesDefinitions = discovery.listTables
val flinkDataDefinition = FlinkDataDefinition
.create(
sqlStatements = Some(SqlStatementReader.readSql(statementsStr)),
catalogConfigurationOpt = None,
)
.validValue
val discovery = TablesDefinitionDiscovery.prepareDiscovery(flinkDataDefinition).validValue
val tablesDefinition = discovery.listTables.loneElement

tablesDefinitions.loneElement shouldBe TableDefinition(
tableName,
ResolvedSchema.of(Column.physical("someString", DataTypes.STRING()))
)
tablesDefinition.tableId.toString shouldBe "`someCatalog`.`someDatabase`.`testTable2`"
tablesDefinition.schema shouldBe ResolvedSchema.of(Column.physical("someString", DataTypes.STRING()))
}

test("returns errors for statements that cannot be executed") {
invalidSqlStatements.foreach { invalidStatement =>
val parsedStatement = SqlStatementReader.readSql(invalidStatement)
val sqlStatementExecutionErrors = TablesDefinitionDiscovery.prepareDiscovery(parsedStatement).invalidValue
val flinkDataDefinition = FlinkDataDefinition
.create(
sqlStatements = Some(SqlStatementReader.readSql(invalidStatement)),
catalogConfigurationOpt = None,
)
.validValue
val sqlStatementExecutionErrors = TablesDefinitionDiscovery.prepareDiscovery(flinkDataDefinition).invalidValue

sqlStatementExecutionErrors.size shouldBe 1
}
}

test("use catalog configuration in data definition") {
val catalogConfiguration = Configuration.fromMap(Map("type" -> "mockable").asJava)
val catalogTable = CatalogTable.of(
Schema.newBuilder().column("fooColumn", DataTypes.STRING()).build(),
null,
List.empty[String].asJava,
Map.empty[String, String].asJava
)
MockableCatalogFactory.resetCatalog()
MockableCatalogFactory.catalog.createTable(ObjectPath.fromString("default.fooTable"), catalogTable, false)
val flinkDataDefinition = FlinkDataDefinition.create(None, Some(catalogConfiguration)).validValue

val discovery = TablesDefinitionDiscovery.prepareDiscovery(flinkDataDefinition).validValue

val tableDefinition = discovery.listTables.loneElement

tableDefinition.tableId.toString shouldBe s"`${FlinkDataDefinition.internalCatalogName}`.`default`.`fooTable`"
tableDefinition.schema shouldBe ResolvedSchema.of(Column.physical("fooColumn", DataTypes.STRING()))
}

}

object TablesDefinitionDiscoveryTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,15 @@ class TableFileSinkTest
| 'path' = 'file:///$datetimeExpressionOutputDirectory',
| 'format' = 'json'
|) LIKE `$datetimePingPongInputTableName`;
|
|CREATE DATABASE testdb;
|
|CREATE TABLE testdb.tablewithqualifiedname (
| `quantity` INT
|) WITH (
| 'connector' = 'datagen',
| 'number-of-rows' = '1'
|);
|""".stripMargin

private lazy val sqlTablesDefinitionFilePath = {
Expand Down Expand Up @@ -253,7 +262,11 @@ class TableFileSinkTest
test("should do file-to-file ping-pong for all basic types") {
val scenario = ScenarioBuilder
.streaming("test")
.source("start", "table", "Table" -> s"'$basicPingPongInputTableName'".spel)
.source(
"start",
"table",
"Table" -> s"'`default_catalog`.`default_database`.`$basicPingPongInputTableName`'".spel
)
.buildVariable(
"example-transformations",
"out",
Expand All @@ -270,7 +283,7 @@ class TableFileSinkTest
.emptySink(
"end",
"table",
"Table" -> s"'$basicPingPongOutputTableName'".spel,
"Table" -> s"'`default_catalog`.`default_database`.`$basicPingPongOutputTableName`'".spel,
"Raw editor" -> "true".spel,
"Value" -> "#input".spel
)
Expand Down Expand Up @@ -305,11 +318,15 @@ class TableFileSinkTest
test("should be able to access virtual columns in input table") {
val scenario = ScenarioBuilder
.streaming("test")
.source("start", "table", "Table" -> s"'$virtualColumnInputTableName'".spel)
.source(
"start",
"table",
"Table" -> s"'`default_catalog`.`default_database`.`$virtualColumnInputTableName`'".spel
)
.emptySink(
"end",
"table",
"Table" -> s"'$virtualColumnOutputTableName'".spel,
"Table" -> s"'`default_catalog`.`default_database`.`$virtualColumnOutputTableName`'".spel,
"Raw editor" -> "true".spel,
"Value" -> "#input".spel
)
Expand Down Expand Up @@ -356,7 +373,7 @@ class TableFileSinkTest
.emptySink(
"end",
"table",
"Table" -> s"'$basicExpressionOutputTableName'".spel,
"Table" -> s"'`default_catalog`.`default_database`.`$basicExpressionOutputTableName`'".spel,
"Raw editor" -> "true".spel,
"Value" -> basicTypesExpression
)
Expand All @@ -383,7 +400,7 @@ class TableFileSinkTest
.emptySink(
"end",
"table",
"Table" -> s"'$oneColumnOutputTableName'".spel,
"Table" -> s"'`default_catalog`.`default_database`.`$oneColumnOutputTableName`'".spel,
"Raw editor" -> "true".spel,
"Value" -> valueExpression
)
Expand All @@ -397,7 +414,11 @@ class TableFileSinkTest
test("should do file-to-file ping-pong for advanced types") {
val scenario = ScenarioBuilder
.streaming("test")
.source("start", "table", "Table" -> s"'$advancedPingPongInputTableName'".spel)
.source(
"start",
"table",
"Table" -> s"'`default_catalog`.`default_database`.`$advancedPingPongInputTableName`'".spel
)
.buildVariable(
"example-transformations",
"out",
Expand All @@ -409,7 +430,7 @@ class TableFileSinkTest
.emptySink(
"end",
"table",
"Table" -> s"'$advancedPingPongOutputTableName'".spel,
"Table" -> s"'`default_catalog`.`default_database`.`$advancedPingPongOutputTableName`'".spel,
"Raw editor" -> "true".spel,
"Value" -> "#input".spel
)
Expand Down Expand Up @@ -487,7 +508,7 @@ class TableFileSinkTest
.emptySink(
"end",
"table",
"Table" -> s"'$advancedExpressionOutputTableName'".spel,
"Table" -> s"'`default_catalog`.`default_database`.`$advancedExpressionOutputTableName`'".spel,
"Raw editor" -> "true".spel,
"Value" -> valueExpression
)
Expand Down Expand Up @@ -526,7 +547,11 @@ class TableFileSinkTest
test("should do file-to-file ping-pong for datetime types") {
val scenario = ScenarioBuilder
.streaming("test")
.source("start", "table", "Table" -> s"'$datetimePingPongInputTableName'".spel)
.source(
"start",
"table",
"Table" -> s"'`default_catalog`.`default_database`.`$datetimePingPongInputTableName`'".spel
)
.buildVariable(
"example-transformations",
"out",
Expand All @@ -538,7 +563,7 @@ class TableFileSinkTest
.emptySink(
"end",
"table",
"Table" -> s"'$datetimePingPongOutputTableName'".spel,
"Table" -> s"'`default_catalog`.`default_database`.`$datetimePingPongOutputTableName`'".spel,
"Raw editor" -> "true".spel,
"Value" -> "#input".spel
)
Expand Down Expand Up @@ -581,7 +606,7 @@ class TableFileSinkTest
.emptySink(
"end",
"table",
"Table" -> s"'$datetimeExpressionOutputTableName'".spel,
"Table" -> s"'`default_catalog`.`default_database`.`$datetimeExpressionOutputTableName`'".spel,
"Raw editor" -> "false".spel,
"date" -> "T(java.time.LocalDate).parse('2024-01-01')".spel,
"time" -> "T(java.time.LocalTime).parse('12:01:02.000000003')".spel,
Expand Down Expand Up @@ -623,7 +648,7 @@ class TableFileSinkTest
.emptySink(
"end",
"table",
"Table" -> s"'$genericsOutputTableName'".spel,
"Table" -> s"'`default_catalog`.`default_database`.`$genericsOutputTableName`'".spel,
"Raw editor" -> "true".spel,
"Value" -> valueExpression
)
Expand Down Expand Up @@ -654,7 +679,7 @@ class TableFileSinkTest
.emptySink(
"end",
"table",
"Table" -> s"'$oneColumnOutputTableName'".spel,
"Table" -> s"'`default_catalog`.`default_database`.`$oneColumnOutputTableName`'".spel,
"Raw editor" -> "true".spel,
"Value" -> valueExpression
)
Expand Down
Loading

0 comments on commit a3694a1

Please sign in to comment.