From 72808f27e0f8f470df5c9e462fd82f9544fded8d Mon Sep 17 00:00:00 2001 From: gskrobisz Date: Fri, 13 Sep 2024 11:47:13 +0200 Subject: [PATCH] [NU-1806] Test with BoundedSourceWithOffset --- .../api/component/NodesDeploymentData.scala | 37 ++-- .../description/DeploymentApiEndpoints.scala | 2 +- .../ui/api/ActivityInfoResourcesSpec.scala | 127 +++++++----- .../ComponentApiHttpServiceBusinessSpec.scala | 1 + .../ComponentApiHttpServiceSecuritySpec.scala | 4 + ...DeploymentApiHttpServiceBusinessSpec.scala | 2 +- ...tApiHttpServiceDeploymentCommentSpec.scala | 6 +- docs-internal/api/nu-designer-openapi.yaml | 189 ++++++++++-------- .../flink/util/source/CollectionSource.scala | 10 +- .../kafka/source/flink/FlinkKafkaSource.scala | 6 +- .../sample/DevProcessConfigCreator.scala | 13 +- .../sample/source/BoundedSource.scala | 41 +++- .../ModelDataActivityInfoProvider.scala | 44 ++-- 13 files changed, 308 insertions(+), 174 deletions(-) diff --git a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/NodesDeploymentData.scala b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/NodesDeploymentData.scala index 8893f78eaa6..4f1e0196ec8 100644 --- a/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/NodesDeploymentData.scala +++ b/components-api/src/main/scala/pl/touk/nussknacker/engine/api/component/NodesDeploymentData.scala @@ -1,8 +1,9 @@ package pl.touk.nussknacker.engine.api.component -import io.circe.generic.JsonCodec -import io.circe.generic.extras.semiauto.{deriveUnwrappedDecoder, deriveUnwrappedEncoder} +import cats.syntax.functor._ import io.circe.{Decoder, Encoder} +import io.circe.generic.auto._ +import io.circe.syntax._ import pl.touk.nussknacker.engine.api.NodeId final case class NodesDeploymentData(dataByNodeId: Map[NodeId, NodeDeploymentData]) @@ -20,20 +21,24 @@ object NodesDeploymentData { } -@JsonCodec sealed trait NodeDeploymentData +sealed trait NodeDeploymentData final case class SqlFilteringExpression(sqlExpression: String) extends NodeDeploymentData -final case class KafkaSourceDeploymentData(offset: String) extends NodeDeploymentData - -//object NodeDeploymentData { -// -// implicit val nodeDeploymentDataEncoder: Encoder[NodeDeploymentData] = -// deriveUnwrappedEncoder[SqlFilteringExpression].contramap { case sqlExpression: SqlFilteringExpression => -// sqlExpression -// } -// -// implicit val nodeDeploymentDataDecoder: Decoder[NodeDeploymentData] = -// deriveUnwrappedDecoder[SqlFilteringExpression].map(identity) -// -//} +final case class KafkaSourceOffset(offset: Long) extends NodeDeploymentData + +object NodeDeploymentData { + + implicit val nodeDeploymentDataEncoder: Encoder[NodeDeploymentData] = + Encoder.instance { + case s: SqlFilteringExpression => s.asJson + case o: KafkaSourceOffset => o.asJson + } + + implicit val nodeDeploymentDataDecoder: Decoder[NodeDeploymentData] = + List[Decoder[NodeDeploymentData]]( + Decoder[SqlFilteringExpression].widen, + Decoder[KafkaSourceOffset].widen + ).reduceLeft(_ or _) + +} diff --git a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/DeploymentApiEndpoints.scala b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/DeploymentApiEndpoints.scala index 971887ae8d2..8e14c688134 100644 --- a/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/DeploymentApiEndpoints.scala +++ b/designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/DeploymentApiEndpoints.scala @@ -210,7 +210,7 @@ object 
DeploymentApiEndpoints { modifiedAt: Instant ) - implicit val nodeDeploymentDataCodec: Schema[NodeDeploymentData] = Schema.string[SqlFilteringExpression].as + implicit val nodeDeploymentDataCodec: Schema[NodeDeploymentData] = Schema.derived implicit val nodesDeploymentDataCodec: Schema[NodesDeploymentData] = Schema .schemaForMap[NodeId, NodeDeploymentData](_.id) diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ActivityInfoResourcesSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ActivityInfoResourcesSpec.scala index 76694f952be..6fa9d2e521f 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ActivityInfoResourcesSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ActivityInfoResourcesSpec.scala @@ -1,52 +1,89 @@ package pl.touk.nussknacker.ui.api -import akka.http.scaladsl.model.StatusCodes -import akka.http.scaladsl.testkit.ScalatestRouteTest -import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport -import io.circe.Json -import org.scalatest.funsuite.AnyFunSuite -import org.scalatest.matchers.should.Matchers -import pl.touk.nussknacker.engine.api.graph.ScenarioGraph -import pl.touk.nussknacker.security.Permission -import pl.touk.nussknacker.test.base.it.NuResourcesTest -import pl.touk.nussknacker.test.config.WithSimplifiedDesignerConfig.TestProcessingType.Streaming -import pl.touk.nussknacker.test.utils.domain.ProcessTestData -import pl.touk.nussknacker.test.utils.domain.TestFactory.{mapProcessingTypeDataProvider, withPermissions} -import pl.touk.nussknacker.test.{EitherValuesDetailedMessage, PatientScalaFutures} -import pl.touk.nussknacker.test.utils.scalas.AkkaHttpExtensions.toRequestEntity +import io.restassured.RestAssured.`given` +import io.restassured.module.scala.RestAssuredSupport.AddThenToResponse +import org.hamcrest.Matchers.equalTo +import org.scalatest.freespec.AnyFreeSpecLike +import pl.touk.nussknacker.engine.build.ScenarioBuilder +import pl.touk.nussknacker.test.base.it.{NuItTest, WithSimplifiedConfigScenarioHelper} +import pl.touk.nussknacker.test.config.{WithBusinessCaseRestAssuredUsersExtensions, WithSimplifiedDesignerConfig} +import pl.touk.nussknacker.test.{NuRestAssureMatchers, RestAssuredVerboseLoggingIfValidationFails} +import pl.touk.nussknacker.engine.spel.SpelExtension._ +import pl.touk.nussknacker.test.utils.domain.TestProcessUtil class ActivityInfoResourcesSpec - extends AnyFunSuite - with ScalatestRouteTest - with Matchers - with FailFastCirceSupport - with NuResourcesTest - with PatientScalaFutures - with EitherValuesDetailedMessage { - - private val scenarioGraph: ScenarioGraph = ProcessTestData.sampleScenarioGraph - private val testPermissionAll = List(Permission.Deploy, Permission.Read, Permission.Write) - - private def route() = new ActivityInfoResources( - processService, - mapProcessingTypeDataProvider( - Streaming.stringify -> createScenarioActivityService - ) - ) - - test("get activity parameters") { - saveProcess(scenarioGraph) { - Post( - s"/activityInfo/${ProcessTestData.sampleProcessName}/activityParameters", - scenarioGraph.toJsonRequestEntity() - ) ~> withPermissions( - route(), - testPermissionAll: _* - ) ~> check { - status shouldEqual StatusCodes.OK - val content = entityAs[Json].noSpaces - content shouldBe """{}""" - } + extends AnyFreeSpecLike + with NuItTest + with WithSimplifiedDesignerConfig + with WithSimplifiedConfigScenarioHelper + with WithBusinessCaseRestAssuredUsersExtensions + with NuRestAssureMatchers + with 
RestAssuredVerboseLoggingIfValidationFails { + + "The scenario activity info endpoint when" - { + "return activity parameters when defined" in { + val scenario = ScenarioBuilder + .streaming("scenarioWithSourceWithDeployParameters") + .source("sourceWithParametersId", "boundedSourceWithOffset", "elements" -> "{'one', 'two', 'three'}".spel) + .emptySink("exampleSinkId", "emptySink") + + given() + .applicationState { + createSavedScenario(scenario) + } + .when() + .basicAuthAllPermUser() + .jsonBody(TestProcessUtil.toJson(scenario).noSpaces) + .post(s"$nuDesignerHttpAddress/api/activityInfo/${scenario.name.value}/activityParameters") + .Then() + .statusCode(200) + .body( + "DEPLOY[0].sourceId", + equalTo("sourceWithParametersId"), + "DEPLOY[0].parameters[0].name", + equalTo("offset"), + "DEPLOY[0].parameters[0].typ.display", + equalTo("Long") + ) + } + + "return empty map when no activity parameters" in { + val scenario = ScenarioBuilder + .streaming("scenarioWithoutParameters") + .source("sourceNoParamsId", "boundedSource", "elements" -> "{'one', 'two', 'three'}".spel) + .emptySink("exampleSinkId", "emptySink") + + given() + .applicationState { + createSavedScenario(scenario) + } + .when() + .basicAuthAllPermUser() + .jsonBody(TestProcessUtil.toJson(scenario).noSpaces) + .post(s"$nuDesignerHttpAddress/api/activityInfo/${scenario.name.value}/activityParameters") + .Then() + .statusCode(200) + .equalsJsonBody( + "{}" + ) + } + + "return no data found when there is no scenario" in { + val scenario = ScenarioBuilder + .streaming("invalidScenario") + .source("exampleSource", "boundedSource", "elements" -> "{'one', 'two', 'three'}".spel) + .emptySink("exampleSinkId", "emptySink") + + given() + .when() + .basicAuthAllPermUser() + .jsonBody(TestProcessUtil.toJson(scenario).noSpaces) + .post(s"$nuDesignerHttpAddress/api/activityInfo/${scenario.name.value}/activityParameters") + .Then() + .statusCode(404) + .equalsPlainBody( + s"No scenario ${scenario.name.value} found" + ) } } diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ComponentApiHttpServiceBusinessSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ComponentApiHttpServiceBusinessSpec.scala index 449d2f851a2..549c7ca717f 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ComponentApiHttpServiceBusinessSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ComponentApiHttpServiceBusinessSpec.scala @@ -178,6 +178,7 @@ class ComponentApiHttpServiceBusinessSpec "streaming-sink-monitor", "streaming-sink-sendsms", "streaming-source-boundedsource", + "streaming-source-boundedsourcewithoffset", "streaming-source-classinstancesource", "streaming-source-communicationsource", "streaming-source-csv-source", diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ComponentApiHttpServiceSecuritySpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ComponentApiHttpServiceSecuritySpec.scala index 218897b3200..efbe512da75 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ComponentApiHttpServiceSecuritySpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/ComponentApiHttpServiceSecuritySpec.scala @@ -262,6 +262,7 @@ class ComponentApiHttpServiceSecuritySpec "streaming1-sink-monitor", "streaming1-sink-sendsms", "streaming1-source-boundedsource", + "streaming1-source-boundedsourcewithoffset", "streaming1-source-classinstancesource", "streaming1-source-communicationsource", "streaming1-source-csv-source", @@ 
-327,6 +328,7 @@ class ComponentApiHttpServiceSecuritySpec "streaming2-sink-monitor", "streaming2-sink-sendsms", "streaming2-source-boundedsource", + "streaming2-source-boundedsourcewithoffset", "streaming2-source-classinstancesource", "streaming2-source-communicationsource", "streaming2-source-csv-source", @@ -391,6 +393,7 @@ class ComponentApiHttpServiceSecuritySpec "streaming1-sink-monitor", "streaming1-sink-sendsms", "streaming1-source-boundedsource", + "streaming1-source-boundedsourcewithoffset", "streaming1-source-classinstancesource", "streaming1-source-communicationsource", "streaming1-source-csv-source", @@ -448,6 +451,7 @@ class ComponentApiHttpServiceSecuritySpec "streaming2-sink-monitor", "streaming2-sink-sendsms", "streaming2-source-boundedsource", + "streaming2-source-boundedsourcewithoffset", "streaming2-source-classinstancesource", "streaming2-source-communicationsource", "streaming2-source-csv-source", diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpServiceBusinessSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpServiceBusinessSpec.scala index 5f12530554a..c4620212221 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpServiceBusinessSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpServiceBusinessSpec.scala @@ -65,7 +65,7 @@ class DeploymentApiHttpServiceBusinessSpec private val correctDeploymentRequest = s"""{ | "scenarioName": "$scenarioName", | "nodesDeploymentData": { - | "$sourceNodeId": "`date` = '2024-01-01'" + | "$sourceNodeId": {"sqlExpression":"`date` = '2024-01-01'"} | } |}""".stripMargin diff --git a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpServiceDeploymentCommentSpec.scala b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpServiceDeploymentCommentSpec.scala index aedab7354bb..e6042ead2a1 100644 --- a/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpServiceDeploymentCommentSpec.scala +++ b/designer/server/src/test/scala/pl/touk/nussknacker/ui/api/DeploymentApiHttpServiceDeploymentCommentSpec.scala @@ -79,7 +79,7 @@ class DeploymentApiHttpServiceDeploymentCommentSpec .jsonBody(s"""{ | "scenarioName": "$scenarioName", | "nodesDeploymentData": { - | "$sourceNodeId": "`date` = '2024-01-01'" + | "$sourceNodeId": {"sqlExpression":"`date` = '2024-01-01'"} | } |}""".stripMargin) .put(s"$nuDesignerHttpAddress/api/deployments/${DeploymentId.generate}") @@ -99,7 +99,7 @@ class DeploymentApiHttpServiceDeploymentCommentSpec .jsonBody(s"""{ | "scenarioName": "$scenarioName", | "nodesDeploymentData": { - | "$sourceNodeId": "`date` = '2024-01-01'" + | "$sourceNodeId": {"sqlExpression":"`date` = '2024-01-01'"} | }, | "comment": "deployment comment not matching configured pattern" |}""".stripMargin) @@ -121,7 +121,7 @@ class DeploymentApiHttpServiceDeploymentCommentSpec .jsonBody(s"""{ | "scenarioName": "$scenarioName", | "nodesDeploymentData": { - | "$sourceNodeId": "`date` = '2024-01-01'" + | "$sourceNodeId": {"sqlExpression":"`date` = '2024-01-01'"} | }, | "comment": "comment with $configuredPhrase" |}""".stripMargin) diff --git a/docs-internal/api/nu-designer-openapi.yaml b/docs-internal/api/nu-designer-openapi.yaml index 5e78df23cba..980f48a66f8 100644 --- a/docs-internal/api/nu-designer-openapi.yaml +++ b/docs-internal/api/nu-designer-openapi.yaml @@ -229,87 +229,6 @@ paths: security: - {} - httpAuth: [] - 
/api/processManagement/customAction/{scenarioName}/validation: - post: - tags: - - CustomAction - summary: Endpoint to validate input in custom action fields - operationId: postApiProcessmanagementCustomactionScenarionameValidation - parameters: - - name: Nu-Impersonate-User-Identity - in: header - required: false - schema: - type: - - string - - 'null' - - name: scenarioName - in: path - required: true - schema: - type: string - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/CustomActionRequest' - required: true - responses: - '200': - description: '' - content: - application/json: - schema: - $ref: '#/components/schemas/CustomActionValidationDto' - '400': - description: 'Invalid value for: header Nu-Impersonate-User-Identity, Invalid - value for: body' - content: - text/plain: - schema: - type: string - '401': - description: '' - content: - text/plain: - schema: - type: string - examples: - Example: - summary: Authentication failed - value: The supplied authentication is invalid - '403': - description: '' - content: - text/plain: - schema: - type: string - examples: - Example: - summary: Authorization failed - value: The supplied authentication is not authorized to access this - resource - '404': - description: Identity provided in the Nu-Impersonate-User-Identity header - did not match any user - content: - text/plain: - schema: - type: string - '501': - description: Impersonation is not supported for defined authentication mechanism - content: - text/plain: - schema: - type: string - examples: - Example: - summary: Cannot authenticate impersonated user as impersonation - is not supported by the authentication mechanism - value: Provided authentication method does not support impersonation - security: - - {} - - httpAuth: [] /api/app/healthCheck: get: tags: @@ -898,7 +817,8 @@ paths: example: scenarioName: scenario1 nodesDeploymentData: - sourceNodeId1: field1 = 'value' + sourceNodeId1: + sqlExpression: field1 = 'value' required: true responses: '202': @@ -1154,6 +1074,87 @@ paths: security: - {} - httpAuth: [] + /api/processManagement/customAction/{scenarioName}/validation: + post: + tags: + - CustomAction + summary: Endpoint to validate input in custom action fields + operationId: postApiProcessmanagementCustomactionScenarionameValidation + parameters: + - name: Nu-Impersonate-User-Identity + in: header + required: false + schema: + type: + - string + - 'null' + - name: scenarioName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/CustomActionRequest' + required: true + responses: + '200': + description: '' + content: + application/json: + schema: + $ref: '#/components/schemas/CustomActionValidationDto' + '400': + description: 'Invalid value for: header Nu-Impersonate-User-Identity, Invalid + value for: body' + content: + text/plain: + schema: + type: string + '401': + description: '' + content: + text/plain: + schema: + type: string + examples: + Example: + summary: Authentication failed + value: The supplied authentication is invalid + '403': + description: '' + content: + text/plain: + schema: + type: string + examples: + Example: + summary: Authorization failed + value: The supplied authentication is not authorized to access this + resource + '404': + description: Identity provided in the Nu-Impersonate-User-Identity header + did not match any user + content: + text/plain: + schema: + type: string + '501': + description: Impersonation is not supported for 
defined authentication mechanism + content: + text/plain: + schema: + type: string + examples: + Example: + summary: Cannot authenticate impersonated user as impersonation + is not supported by the authentication mechanism + value: Provided authentication method does not support impersonation + security: + - {} + - httpAuth: [] /api/migrate: post: tags: @@ -3548,6 +3549,15 @@ components: JsonParameterEditor: title: JsonParameterEditor type: object + KafkaSourceOffset: + title: KafkaSourceOffset + type: object + required: + - offset + properties: + offset: + type: integer + format: int64 LayoutData: title: LayoutData type: object @@ -3603,7 +3613,7 @@ components: title: Map_NodeId_NodeDeploymentData type: object additionalProperties: - type: string + $ref: '#/components/schemas/NodeDeploymentData' Map_String: title: Map_String type: object @@ -3992,6 +4002,11 @@ components: type: string type: $ref: '#/components/schemas/NodeTypes12' + NodeDeploymentData: + title: NodeDeploymentData + oneOf: + - $ref: '#/components/schemas/KafkaSourceOffset' + - $ref: '#/components/schemas/SqlFilteringExpression' NodeTypes: title: NodeTypes type: string @@ -4527,6 +4542,14 @@ components: SpelTemplateParameterEditor: title: SpelTemplateParameterEditor type: object + SqlFilteringExpression: + title: SqlFilteringExpression + type: object + required: + - sqlExpression + properties: + sqlExpression: + type: string SqlParameterEditor: title: SqlParameterEditor type: object diff --git a/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/source/CollectionSource.scala b/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/source/CollectionSource.scala index 1482658f7bb..5e996846743 100644 --- a/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/source/CollectionSource.scala +++ b/engine/flink/components-utils/src/main/scala/pl/touk/nussknacker/engine/flink/util/source/CollectionSource.scala @@ -26,10 +26,18 @@ case class CollectionSource[T]( ) extends StandardFlinkSource[T] with ReturningType { - @silent("deprecated") override def sourceStream( env: StreamExecutionEnvironment, flinkNodeContext: FlinkCustomNodeContext + ): DataStreamSource[T] = { + createSourceStream(list, env, flinkNodeContext) + } + + @silent("deprecated") + protected def createSourceStream[T]( + list: List[T], + env: StreamExecutionEnvironment, + flinkNodeContext: FlinkCustomNodeContext ): DataStreamSource[T] = { val typeInformation = TypeInformationDetection.instance.forType[T](returnType) boundedness match { diff --git a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala index c95f4188017..8d2f6265f5f 100644 --- a/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala +++ b/engine/flink/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/source/flink/FlinkKafkaSource.scala @@ -11,7 +11,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase} import org.apache.kafka.clients.consumer.ConsumerRecord import pl.touk.nussknacker.engine.api.NodeId -import pl.touk.nussknacker.engine.api.component.KafkaSourceDeploymentData +import 
pl.touk.nussknacker.engine.api.component.KafkaSourceOffset import pl.touk.nussknacker.engine.api.definition.Parameter import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy @@ -84,7 +84,7 @@ class FlinkKafkaSource[T]( override def activityParametersDefinition: Map[String, List[Parameter]] = Map( ScenarioActionName.Deploy.value -> List( - Parameter(ParameterName("offset"), Typed.apply[String]) + Parameter(ParameterName("offset"), Typed.apply[Long]) ) ) @@ -94,7 +94,7 @@ class FlinkKafkaSource[T]( flinkNodeContext: FlinkCustomNodeContext ): SourceFunction[T] = { // TODO: handle deployment parameters -> offset - val deploymentDataOpt = flinkNodeContext.nodeDeploymentData.collect { case d: KafkaSourceDeploymentData => d } + val deploymentDataOpt = flinkNodeContext.nodeDeploymentData.collect { case d: KafkaSourceOffset => d } topics.toList.foreach(KafkaUtils.setToLatestOffsetIfNeeded(kafkaConfig, _, consumerGroupId)) createFlinkSource(consumerGroupId, flinkNodeContext) } diff --git a/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/DevProcessConfigCreator.scala b/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/DevProcessConfigCreator.scala index 0ce3696e419..21a1772d617 100644 --- a/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/DevProcessConfigCreator.scala +++ b/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/DevProcessConfigCreator.scala @@ -91,12 +91,13 @@ class DevProcessConfigCreator extends ProcessConfigCreator { )(TypeInformation.of(classOf[SampleProduct])) ) ), - "kafka-transaction" -> all(SourceFactory.noParamUnboundedStreamFactory[String](new NoEndingSource)), - "boundedSource" -> all(BoundedSource), - "oneSource" -> categories(SourceFactory.noParamUnboundedStreamFactory[String](new OneSource)), - "communicationSource" -> categories(DynamicParametersSource), - "csv-source" -> categories(SourceFactory.noParamUnboundedStreamFactory[CsvRecord](new CsvSource)), - "csv-source-lite" -> categories(SourceFactory.noParamUnboundedStreamFactory[CsvRecord](new LiteCsvSource(_))), + "kafka-transaction" -> all(SourceFactory.noParamUnboundedStreamFactory[String](new NoEndingSource)), + "boundedSource" -> all(BoundedSource), + "boundedSourceWithOffset" -> all(BoundedSourceWithOffset), + "oneSource" -> categories(SourceFactory.noParamUnboundedStreamFactory[String](new OneSource)), + "communicationSource" -> categories(DynamicParametersSource), + "csv-source" -> categories(SourceFactory.noParamUnboundedStreamFactory[CsvRecord](new CsvSource)), + "csv-source-lite" -> categories(SourceFactory.noParamUnboundedStreamFactory[CsvRecord](new LiteCsvSource(_))), "genericSourceWithCustomVariables" -> categories(GenericSourceWithCustomVariablesSample), "sql-source" -> categories(SqlSource), "classInstanceSource" -> all(new ReturningClassInstanceSource) diff --git a/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/source/BoundedSource.scala b/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/source/BoundedSource.scala index 05a090f6347..62d680bdbe4 100644 --- a/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/source/BoundedSource.scala +++ 
b/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/source/BoundedSource.scala @@ -1,9 +1,15 @@ package pl.touk.nussknacker.engine.management.sample.source -import pl.touk.nussknacker.engine.api.component.UnboundedStreamComponent -import pl.touk.nussknacker.engine.api.process.SourceFactory -import pl.touk.nussknacker.engine.api.typed.typing.Unknown +import org.apache.flink.streaming.api.datastream.DataStreamSource +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import pl.touk.nussknacker.engine.api.component.{KafkaSourceOffset, UnboundedStreamComponent} +import pl.touk.nussknacker.engine.api.definition.Parameter +import pl.touk.nussknacker.engine.api.deployment.ScenarioActionName +import pl.touk.nussknacker.engine.api.parameter.ParameterName +import pl.touk.nussknacker.engine.api.process.{SourceFactory, WithActivityParameters} +import pl.touk.nussknacker.engine.api.typed.typing.{Typed, Unknown} import pl.touk.nussknacker.engine.api.{MethodToInvoke, ParamName} +import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext import pl.touk.nussknacker.engine.flink.util.source.CollectionSource import scala.jdk.CollectionConverters._ @@ -15,3 +21,32 @@ object BoundedSource extends SourceFactory with UnboundedStreamComponent { new CollectionSource[Any](elements.asScala.toList, None, Unknown) } + +object BoundedSourceWithOffset extends SourceFactory with UnboundedStreamComponent { + + @MethodToInvoke + def source(@ParamName("elements") elements: java.util.List[Any]) = + new CollectionSource[Any](elements.asScala.toList, None, Unknown) with WithActivityParameters { + + override def activityParametersDefinition: Map[String, List[Parameter]] = Map( + ScenarioActionName.Deploy.value -> List( + Parameter(ParameterName("offset"), Typed.apply[Long]) + ) + ) + + override protected def createSourceStream[T]( + list: List[T], + env: StreamExecutionEnvironment, + flinkNodeContext: FlinkCustomNodeContext + ): DataStreamSource[T] = { + val deploymentDataOpt = flinkNodeContext.nodeDeploymentData.collect { case d: KafkaSourceOffset => d } + val elementsWithOffset = deploymentDataOpt match { + case Some(data) => list.drop(data.offset.toInt) + case _ => list + } + super.createSourceStream(elementsWithOffset, env, flinkNodeContext) + } + + } + +} diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/activity/ModelDataActivityInfoProvider.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/activity/ModelDataActivityInfoProvider.scala index f74d02717d3..bfe87bb6773 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/activity/ModelDataActivityInfoProvider.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/definition/activity/ModelDataActivityInfoProvider.scala @@ -8,9 +8,9 @@ import pl.touk.nussknacker.engine.canonicalgraph.CanonicalProcess import pl.touk.nussknacker.engine.compile.ExpressionCompiler import pl.touk.nussknacker.engine.compile.nodecompilation.{LazyParameterCreationStrategy, NodeCompiler} import pl.touk.nussknacker.engine.definition.fragment.FragmentParametersDefinitionExtractor -import pl.touk.nussknacker.engine.graph.node.{SourceNodeData, asSource} +import pl.touk.nussknacker.engine.graph.node.{SourceNodeData, asFragmentInputDefinition, asSource} import pl.touk.nussknacker.engine.resultcollector.ProductionServiceInvocationCollector -import shapeless.syntax.typeable._ +import 
pl.touk.nussknacker.engine.util.Implicits.RichScalaMap class ModelDataActivityInfoProvider(modelData: ModelData) extends ActivityInfoProvider { @@ -35,17 +35,37 @@ class ModelDataActivityInfoProvider(modelData: ModelData) extends ActivityInfoPr override def getActivityParameters(scenario: CanonicalProcess): Map[String, Map[String, List[Parameter]]] = { modelData.withThisAsContextClassLoader { - val asdf = scenario.collectAllNodes.flatMap(asSource) - val compiledSources = for { - source <- scenario.collectAllNodes.flatMap(asSource) - sourceObj <- prepareSourceObj(source)(scenario.metaData, NodeId(source.id)) - } yield sourceObj - val stefan = compiledSources - .flatMap(_.cast[WithActivityParameters]) - .map(_.activityParametersDefinition) - - Map.empty + val nodeToActivityToParameters = collectAllSources(scenario) + .map(source => source.id -> getActivityParameters(source, scenario.metaData)) + .toMap + groupByActivity(nodeToActivityToParameters) } } + private def groupByActivity( + nodeToActivityToParameters: Map[String, Map[String, List[Parameter]]] + ): Map[String, Map[String, List[Parameter]]] = { + val activityToNodeToParameters = for { + (node, activityToParams) <- nodeToActivityToParameters.toList + (activity, params) <- activityToParams.toList + } yield (activity, node -> params) + activityToNodeToParameters + .groupBy(_._1) + .mapValuesNow(_.map(_._2).toMap) + } + + private def getActivityParameters(source: SourceNodeData, metaData: MetaData): Map[String, List[Parameter]] = { + modelData.withThisAsContextClassLoader { + val compiledSource = prepareSourceObj(source)(metaData, NodeId(source.id)) + compiledSource match { + case Some(s: WithActivityParameters) => s.activityParametersDefinition + case _ => Map.empty + } + } + } + + private def collectAllSources(scenario: CanonicalProcess): List[SourceNodeData] = { + scenario.collectAllNodes.flatMap(asSource) ++ scenario.collectAllNodes.flatMap(asFragmentInputDefinition) + } + }
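
---

Reviewer notes (illustrative sketches, not part of the patch to apply):

The untagged circe codec added in NodesDeploymentData.scala can be exercised in isolation. The sketch below is a minimal, self-contained approximation (simplified stand-in types; Scala 2.13 with circe-core, circe-generic and circe-parser assumed), not the module's actual wiring. The variants carry no type discriminator in JSON, so they are distinguished purely by field names and each alternative decoder must fail cleanly on the other's payload.

import cats.syntax.functor._
import io.circe.{Decoder, Encoder}
import io.circe.generic.auto._
import io.circe.parser.decode
import io.circe.syntax._

sealed trait NodeDeploymentData
final case class SqlFilteringExpression(sqlExpression: String) extends NodeDeploymentData
final case class KafkaSourceOffset(offset: Long) extends NodeDeploymentData

object NodeDeploymentData {
  // Untagged encoding: each subtype is serialized as a bare object, e.g. {"offset":100}.
  implicit val encoder: Encoder[NodeDeploymentData] = Encoder.instance {
    case s: SqlFilteringExpression => s.asJson
    case o: KafkaSourceOffset      => o.asJson
  }

  // Decoding tries each subtype in turn; the distinct field names pick the right variant.
  implicit val decoder: Decoder[NodeDeploymentData] =
    List[Decoder[NodeDeploymentData]](
      Decoder[SqlFilteringExpression].widen,
      Decoder[KafkaSourceOffset].widen
    ).reduceLeft(_ or _)
}

object CodecRoundTrip extends App {
  println((KafkaSourceOffset(100L): NodeDeploymentData).asJson.noSpaces)               // {"offset":100}
  println(decode[NodeDeploymentData]("""{"offset":100}"""))                            // Right(KafkaSourceOffset(100))
  println(decode[NodeDeploymentData]("""{"sqlExpression":"`date` = '2024-01-01'"}""")) // Right(SqlFilteringExpression(...))
}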
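
The behavioural core of BoundedSourceWithOffset is that, when the node's deployment data carries a KafkaSourceOffset, the first `offset` elements of the bounded collection are skipped before emission. The sketch below shows that idea with plain types and no Flink plumbing; applyOffset is a hypothetical helper for illustration, not a method of the source factory.

final case class KafkaSourceOffset(offset: Long)

object OffsetSketch extends App {

  // Mirrors the createSourceStream override: drop the first `offset` elements
  // when deployment data with an offset was supplied, otherwise emit everything.
  def applyOffset[T](elements: List[T], nodeDeploymentData: Option[Any]): List[T] =
    nodeDeploymentData match {
      case Some(KafkaSourceOffset(offset)) => elements.drop(offset.toInt)
      case _                               => elements
    }

  val elements = List("one", "two", "three")
  println(applyOffset(elements, Some(KafkaSourceOffset(1L)))) // List(two, three)
  println(applyOffset(elements, None))                        // List(one, two, three)
}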
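
ModelDataActivityInfoProvider.groupByActivity pivots a per-node map of activity parameters into a per-activity map keyed by node id. The standalone sketch below reproduces that pivot with String/List stand-ins for Parameter and plain collections instead of the project's RichScalaMap helper (mapValuesNow); it is illustrative only.

object GroupByActivitySketch extends App {

  // node id -> (activity name -> parameter names)
  val nodeToActivityToParameters: Map[String, Map[String, List[String]]] = Map(
    "sourceA" -> Map("DEPLOY" -> List("offset")),
    "sourceB" -> Map("DEPLOY" -> List("offset"), "PAUSE" -> List("reason"))
  )

  // Pivot to: activity name -> (node id -> parameter names)
  val activityToNodeToParameters: Map[String, Map[String, List[String]]] = {
    val flattened = for {
      (node, activityToParams) <- nodeToActivityToParameters.toList
      (activity, params)       <- activityToParams.toList
    } yield (activity, node -> params)
    flattened.groupBy { case (activity, _) => activity }.map { case (activity, entries) =>
      activity -> entries.map { case (_, nodeAndParams) => nodeAndParams }.toMap
    }
  }

  println(activityToNodeToParameters)
  // Map(DEPLOY -> Map(sourceA -> List(offset), sourceB -> List(offset)), PAUSE -> Map(sourceB -> List(reason)))
}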