From f02e9dfbad83a80e989db6727eb7580ef248597e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20S=C5=82abek?= Date: Wed, 2 Oct 2024 11:11:22 +0200 Subject: [PATCH] http enricher --- build.sbt | 30 +++ ...ker.engine.api.component.ComponentProvider | 1 + .../http/HttpEnricherComponentProvider.scala | 22 ++ .../nussknacker/http/HttpEnricherConfig.scala | 64 +++++ .../http/client/HttpClientProvider.scala | 26 ++ .../http/client/SharedHttpClient.scala | 49 ++++ .../http/enricher/HttpEnricher.scala | 157 +++++++++++ .../http/enricher/HttpEnricherFactory.scala | 182 +++++++++++++ .../http/enricher/HttpEnricherOutput.scala | 96 +++++++ .../enricher/HttpEnricherParameters.scala | 120 +++++++++ .../http/enricher/MapExtensions.scala | 16 ++ .../http/HttpEnricherBodyTest.scala | 248 ++++++++++++++++++ .../http/HttpEnricherConfigTest.scala | 46 ++++ .../http/HttpEnricherHeadersTest.scala | 223 ++++++++++++++++ .../http/HttpEnricherOutputTest.scala | 95 +++++++ .../http/HttpEnricherTestSuite.scala | 71 +++++ .../http/HttpEnricherURLTest.scala | 132 ++++++++++ .../src/universal/conf/dev-application.conf | 3 + .../DynamicNodeValidator.scala | 2 - 19 files changed, 1581 insertions(+), 2 deletions(-) create mode 100644 components/http/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.api.component.ComponentProvider create mode 100644 components/http/src/main/scala/pl/touk/nussknacker/http/HttpEnricherComponentProvider.scala create mode 100644 components/http/src/main/scala/pl/touk/nussknacker/http/HttpEnricherConfig.scala create mode 100644 components/http/src/main/scala/pl/touk/nussknacker/http/client/HttpClientProvider.scala create mode 100644 components/http/src/main/scala/pl/touk/nussknacker/http/client/SharedHttpClient.scala create mode 100644 components/http/src/main/scala/pl/touk/nussknacker/http/enricher/HttpEnricher.scala create mode 100644 components/http/src/main/scala/pl/touk/nussknacker/http/enricher/HttpEnricherFactory.scala create mode 100644 components/http/src/main/scala/pl/touk/nussknacker/http/enricher/HttpEnricherOutput.scala create mode 100644 components/http/src/main/scala/pl/touk/nussknacker/http/enricher/HttpEnricherParameters.scala create mode 100644 components/http/src/main/scala/pl/touk/nussknacker/http/enricher/MapExtensions.scala create mode 100644 components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherBodyTest.scala create mode 100644 components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherConfigTest.scala create mode 100644 components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherHeadersTest.scala create mode 100644 components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherOutputTest.scala create mode 100644 components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherTestSuite.scala create mode 100644 components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherURLTest.scala diff --git a/build.sbt b/build.sbt index d83e32a5ed6..2cd4ca7fd60 100644 --- a/build.sbt +++ b/build.sbt @@ -498,6 +498,7 @@ lazy val distribution: Project = sbt (liteKafkaComponents / assembly).value -> "components/lite/liteKafka.jar", (liteRequestResponseComponents / assembly).value -> "components/lite/liteRequestResponse.jar", (openapiComponents / assembly).value -> "components/common/openapi.jar", + (httpComponents / assembly).value -> "components/common/http.jar", (sqlComponents / assembly).value -> "components/common/sql.jar", ) }, @@ -1430,6 +1431,7 @@ lazy val liteEngineRuntimeApp: Project = (project in lite("runtime-app")) (liteKafkaComponents / assembly).value -> "components/lite/liteKafka.jar", (liteRequestResponseComponents / assembly).value -> "components/lite/liteRequestResponse.jar", (openapiComponents / assembly).value -> "components/common/openapi.jar", + (httpComponents / assembly).value -> "components/common/http.jar", (sqlComponents / assembly).value -> "components/common/sql.jar", ), javaAgents += JavaAgent("io.prometheus.jmx" % "jmx_prometheus_javaagent" % jmxPrometheusJavaagentV % "dist"), @@ -1720,6 +1722,33 @@ lazy val openapiComponents = (project in component("openapi")) flinkComponentsTestkit % "it,test" ) +lazy val httpComponents = (project in component("http")) + .settings(commonSettings) + .settings(assemblyNoScala("http.jar"): _*) + .settings(publishAssemblySettings: _*) + .settings( + name := "nussknacker-http", + libraryDependencies ++= Seq( + "org.apache.flink" % "flink-streaming-java" % flinkV % Provided, + "com.beachape" %% "enumeratum" % enumeratumV % Provided, + "org.scalatest" %% "scalatest" % scalaTestV % Test, + "org.wiremock" % "wiremock" % wireMockV % Test, + "com.softwaremill.sttp.client" %% "circe" % "2.2.0", + "io.swagger.core.v3" % "swagger-integration" % swaggerIntegrationV excludeAll ( + ExclusionRule(organization = "jakarta.activation"), + ExclusionRule(organization = "jakarta.validation") + ), + ), + ) + .dependsOn( + httpUtils, + componentsUtils % Provided, + jsonUtils % Provided, + commonUtils % Provided, + requestResponseComponentsUtils % Test, + flinkComponentsTestkit % Test + ) + lazy val sqlComponents = (project in component("sql")) .settings(commonSettings) .settings(assemblyNoScala("sql.jar"): _*) @@ -2140,6 +2169,7 @@ lazy val modules = List[ProjectReference]( flinkTableApiComponents, defaultModel, openapiComponents, + httpComponents, scenarioCompiler, benchmarks, kafkaUtils, diff --git a/components/http/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.api.component.ComponentProvider b/components/http/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.api.component.ComponentProvider new file mode 100644 index 00000000000..466812fc7a2 --- /dev/null +++ b/components/http/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.api.component.ComponentProvider @@ -0,0 +1 @@ +pl.touk.nussknacker.http.HttpEnricherComponentProvider diff --git a/components/http/src/main/scala/pl/touk/nussknacker/http/HttpEnricherComponentProvider.scala b/components/http/src/main/scala/pl/touk/nussknacker/http/HttpEnricherComponentProvider.scala new file mode 100644 index 00000000000..5c9b2ba8ea9 --- /dev/null +++ b/components/http/src/main/scala/pl/touk/nussknacker/http/HttpEnricherComponentProvider.scala @@ -0,0 +1,22 @@ +package pl.touk.nussknacker.http + +import com.typesafe.config.Config +import com.typesafe.scalalogging.LazyLogging +import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, ComponentProvider, NussknackerVersion} +import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies +import pl.touk.nussknacker.http.enricher.HttpEnricherFactory + +class HttpEnricherComponentProvider extends ComponentProvider with LazyLogging { + override def providerName: String = "http" + + override def create(config: Config, dependencies: ProcessObjectDependencies): List[ComponentDefinition] = { + val conf = HttpEnricherConfig.parse(config) + ComponentDefinition("http", new HttpEnricherFactory(conf)) :: Nil + } + + override def isCompatible(version: NussknackerVersion): Boolean = true + + override def resolveConfigForExecution(config: Config): Config = config + + override def isAutoLoaded: Boolean = true +} diff --git a/components/http/src/main/scala/pl/touk/nussknacker/http/HttpEnricherConfig.scala b/components/http/src/main/scala/pl/touk/nussknacker/http/HttpEnricherConfig.scala new file mode 100644 index 00000000000..86def1c3e98 --- /dev/null +++ b/components/http/src/main/scala/pl/touk/nussknacker/http/HttpEnricherConfig.scala @@ -0,0 +1,64 @@ +package pl.touk.nussknacker.http + +import com.typesafe.config.Config +import net.ceedubs.ficus.readers.ValueReader +import pl.touk.nussknacker.http.backend.{DefaultHttpClientConfig, HttpClientConfig} +import pl.touk.nussknacker.http.enricher.HttpEnricher.ApiKeyConfig.{ApiKeyInCookie, ApiKeyInHeader, ApiKeyInQuery} +import pl.touk.nussknacker.http.enricher.HttpEnricher.HttpMethod.{DELETE, GET, POST, PUT} +import pl.touk.nussknacker.http.enricher.HttpEnricher.{ApiKeyConfig, HttpMethod} + +import java.net.URL + +final case class HttpEnricherConfig( + rootUrl: Option[URL], + security: Option[List[ApiKeyConfig]], + httpClientConfig: HttpClientConfig, + allowedMethods: List[HttpMethod] +) + +object HttpEnricherConfig { + import net.ceedubs.ficus.Ficus._ + import pl.touk.nussknacker.engine.util.config.ConfigEnrichments.RichConfig + + implicit val apiKeyReader: ValueReader[ApiKeyConfig] = ValueReader.relative(conf => { + val name = conf.as[String]("name") + val value = conf.as[String]("value") + conf.as[String]("in") match { + case "query" => ApiKeyInQuery(name, value) + case "header" => ApiKeyInHeader(name, value) + case "cookie" => ApiKeyInCookie(name, value) + } + }) + + implicit val httpMethodReader: ValueReader[HttpMethod] = ValueReader.relative(conf => { + import net.ceedubs.ficus.readers.ArbitraryTypeReader.arbitraryTypeValueReader + conf.rootAs[HttpMethod] + }) + + implicit val configValueReader: ValueReader[HttpEnricherConfig] = ValueReader.relative(conf => { + HttpEnricherConfig( + // TODO decision: add '/' in reader if not present? or during evaluation? + rootUrl = optionValueReader[URL].read(conf, "rootUrl").map { url => + if (url.getQuery != null) { + throw new IllegalArgumentException("Root URL for HTTP enricher has to be without query parameters.") + } else { + url + } + }, + security = optionValueReader[List[ApiKeyConfig]].read(conf, "security"), + httpClientConfig = + optionValueReader[HttpClientConfig].read(conf, "httpClientConfig").getOrElse(DefaultHttpClientConfig()), + allowedMethods = { + val methods = optionValueReader[List[HttpMethod]].read(conf, "allowedMethods").getOrElse(DefaultAllowedMethods) + if (methods.isEmpty) { + throw new IllegalArgumentException("Allowed methods cannot be empty.") + } + methods + } + ) + }) + + private val DefaultAllowedMethods = List(GET, POST, PUT, DELETE) + + private[http] def parse(config: Config) = config.rootAs[HttpEnricherConfig] +} diff --git a/components/http/src/main/scala/pl/touk/nussknacker/http/client/HttpClientProvider.scala b/components/http/src/main/scala/pl/touk/nussknacker/http/client/HttpClientProvider.scala new file mode 100644 index 00000000000..fa838a341a7 --- /dev/null +++ b/components/http/src/main/scala/pl/touk/nussknacker/http/client/HttpClientProvider.scala @@ -0,0 +1,26 @@ +package pl.touk.nussknacker.http.client + +import org.asynchttpclient.DefaultAsyncHttpClient +import pl.touk.nussknacker.http.backend.{FixedAsyncHttpClientBackendProvider, HttpBackendProvider, HttpClientConfig} + +import scala.util.Try + +// TODO decision: Copied from OpenAPI enricher - what to do about this? +object HttpClientProvider { + + def getBackendProvider(httpClientConfig: HttpClientConfig): HttpBackendProvider = { + val isFlinkBased = Try( + getClass.getClassLoader + .loadClass("org.apache.flink.streaming.api.environment.StreamExecutionEnvironment") + ).isSuccess + if (isFlinkBased) { + new SharedHttpClientBackendProvider(httpClientConfig) + } else { + // TODO: figure out how to create client only once and enable its closing. Also: do we want to pass processId here? + // Should client be one per engine deployment, or per scenario? + val httpClient = new DefaultAsyncHttpClient(httpClientConfig.toAsyncHttpClientConfig(None).build()) + new FixedAsyncHttpClientBackendProvider(httpClient) + } + } + +} diff --git a/components/http/src/main/scala/pl/touk/nussknacker/http/client/SharedHttpClient.scala b/components/http/src/main/scala/pl/touk/nussknacker/http/client/SharedHttpClient.scala new file mode 100644 index 00000000000..12ad9100640 --- /dev/null +++ b/components/http/src/main/scala/pl/touk/nussknacker/http/client/SharedHttpClient.scala @@ -0,0 +1,49 @@ +package pl.touk.nussknacker.http.client + +import com.typesafe.scalalogging.LazyLogging +import org.asynchttpclient.{AsyncHttpClient, DefaultAsyncHttpClient} +import pl.touk.nussknacker.engine.api.MetaData +import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext +import pl.touk.nussknacker.engine.util.sharedservice.{SharedService, SharedServiceHolder} +import pl.touk.nussknacker.http.backend.{HttpBackendProvider, HttpClientConfig} +import sttp.client3.SttpBackend +import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend + +import scala.concurrent.{ExecutionContext, Future} + +// TODO decision: Copied from OpenAPI enricher - what to do about this? +private[client] class SharedHttpClientBackendProvider(httpClientConfig: HttpClientConfig) + extends HttpBackendProvider + with LazyLogging { + + private var httpClient: SharedHttpClient = _ + + override def open(context: EngineRuntimeContext): Unit = { + httpClient = SharedHttpClientBackendProvider.retrieveService(httpClientConfig)(context.jobData.metaData) + } + + override def httpBackendForEc(implicit ec: ExecutionContext): SttpBackend[Future, Any] = + AsyncHttpClientFutureBackend.usingClient(httpClient.httpClient) + + override def close(): Unit = Option(httpClient).foreach(_.close()) + +} + +private[client] object SharedHttpClientBackendProvider extends SharedServiceHolder[HttpClientConfig, SharedHttpClient] { + + override protected def createService(config: HttpClientConfig, metaData: MetaData): SharedHttpClient = { + val httpClientConfig = config.toAsyncHttpClientConfig(Option(metaData.name)) + new SharedHttpClient(new DefaultAsyncHttpClient(httpClientConfig.build()), config) + } + +} + +private[client] class SharedHttpClient(val httpClient: AsyncHttpClient, config: HttpClientConfig) + extends SharedService[HttpClientConfig] { + + override def creationData: HttpClientConfig = config + + override protected def sharedServiceHolder: SharedHttpClientBackendProvider.type = SharedHttpClientBackendProvider + + override def internalClose(): Unit = httpClient.close() +} diff --git a/components/http/src/main/scala/pl/touk/nussknacker/http/enricher/HttpEnricher.scala b/components/http/src/main/scala/pl/touk/nussknacker/http/enricher/HttpEnricher.scala new file mode 100644 index 00000000000..c7467617a9a --- /dev/null +++ b/components/http/src/main/scala/pl/touk/nussknacker/http/enricher/HttpEnricher.scala @@ -0,0 +1,157 @@ +package pl.touk.nussknacker.http.enricher + +import enumeratum.{Enum, EnumEntry} +import pl.touk.nussknacker.engine.api.exception.NonTransientException +import pl.touk.nussknacker.engine.api.process.ComponentUseCase +import pl.touk.nussknacker.engine.api.test.InvocationCollectors +import pl.touk.nussknacker.engine.api.{Context, Params, ServiceInvoker} +import pl.touk.nussknacker.engine.util.json.ToJsonEncoder +import pl.touk.nussknacker.http.backend.HttpBackendProvider +import pl.touk.nussknacker.http.enricher.HttpEnricher.ApiKeyConfig._ +import pl.touk.nussknacker.http.enricher.HttpEnricher.{ApiKeyConfig, Body, BodyType, HttpMethod, buildURL, jsonEncoder} +import pl.touk.nussknacker.http.enricher.HttpEnricherParameters.BodyParam +import sttp.client3.basicRequest +import sttp.client3.circe._ +import sttp.model.{Header, Method, QueryParams, Uri} + +import java.net.URL +import scala.collection.immutable +import scala.concurrent.{ExecutionContext, Future} +import scala.jdk.CollectionConverters._ +import scala.util.Try + +class HttpEnricher( + clientProvider: HttpBackendProvider, + params: Params, + method: HttpMethod, + bodyType: BodyType, + rootUrl: Option[URL], + securityConfig: List[ApiKeyConfig] +) extends ServiceInvoker { + + override def invoke(context: Context)( + implicit ec: ExecutionContext, + collector: InvocationCollectors.ServiceInvocationCollector, + componentUseCase: ComponentUseCase + ): Future[AnyRef] = { + val url = { + val urlParam = HttpEnricherParameters.UrlParam.extractor(context, params) + val queryParamsFromParam: QueryParams = HttpEnricherParameters.QueryParamsParam.extractor(context, params) match { + case null => QueryParams() + case jMap => QueryParams.fromMap(jMap.asScala.toMap) + } + val queryParamsApiKeys = securityConfig.collect { case q: ApiKeyInQuery => q.name -> q.value }.toMap + val allQueryParams = queryParamsFromParam.param(queryParamsApiKeys) + buildURL(rootUrl, urlParam, allQueryParams).fold(ex => throw ex, identity) + } + + val headers: List[Header] = HttpEnricherParameters.HeadersParam.extractor(context, params) match { + case null => List.empty + case jMap => + jMap.asScala.toMap.map { case (k, v) => + Header(k, v) + }.toList + } + + val body = BodyParam.extractor(context, params, bodyType) + + val request = buildRequest(method.sttpMethod, url, body, headers, securityConfig) + + val httpClient = clientProvider.httpBackendForEc + val response = httpClient.send(request) + + response.map(res => HttpEnricherOutput.buildOutput(res, body)) + } + + private def buildRequest( + method: Method, + url: Uri, + body: Option[Body], + headers: List[Header], + securityConfig: List[ApiKeyConfig] + ) = { + val baseRequest = basicRequest.method(method, url) + val requestWithAppliedBody = body match { + case Some(Body(body, BodyType.JSON)) => + val json = jsonEncoder.encode(body) + baseRequest.body(json) + case Some(Body(body, BodyType.PlainText)) => + body match { + case strBody: String => baseRequest.body(strBody) + case other => + throw NonTransientException( + BodyType.PlainText.name, + s"Declared type of request body does not match its value. Expected String. Got: ${other.getClass}" + ) + } + case _ => baseRequest + } + val requestWithSecurityApplied = securityConfig.foldLeft(requestWithAppliedBody) { (request, securityToApply) => + securityToApply match { + case ApiKeyInHeader(name, value) => request.header(name, value) + case ApiKeyInCookie(name, value) => request.cookie(name, value) + case _ => request + } + } + requestWithSecurityApplied.headers(headers: _*) + } + +} + +object HttpEnricher { + + private val jsonEncoder = ToJsonEncoder(failOnUnknown = false, getClass.getClassLoader) + + // TODO http: do manual URL validation with reason messages - it can be annoying now - doesnt check things like protocol/host + def buildURL( + rootUrl: Option[URL], + urlFromParam: String, + queryParams: QueryParams + ): Either[NonTransientException, Uri] = { + val url = rootUrl.map(r => s"${r.toString}$urlFromParam").getOrElse(urlFromParam) + (for { + url <- Try(new URL(url)) + uri <- Try(url.toURI) + sttpUri <- Try(Uri(uri)) + finalSttpUri = sttpUri.addParams(queryParams) + } yield finalSttpUri).toEither.left + .map(t => NonTransientException(url, "Invalid URL", cause = t)) + } + + sealed trait ApiKeyConfig { + val name: String + val value: String + } + + object ApiKeyConfig { + final case class ApiKeyInQuery(name: String, value: String) extends ApiKeyConfig + final case class ApiKeyInHeader(name: String, value: String) extends ApiKeyConfig + final case class ApiKeyInCookie(name: String, value: String) extends ApiKeyConfig + } + + final case class Body(value: AnyRef, bodyType: BodyType) + sealed abstract class BodyType(val name: String) extends EnumEntry + + object BodyType extends Enum[BodyType] { + case object JSON extends BodyType("JSON") + case object PlainText extends BodyType("Plain Text") + case object None extends BodyType("None") + override val values: immutable.IndexedSeq[BodyType] = findValues + } + + sealed abstract class HttpMethod(val name: String, val sttpMethod: Method) extends EnumEntry + + object HttpMethod extends Enum[HttpMethod] { + case object GET extends HttpMethod("GET", Method.GET) + case object HEAD extends HttpMethod("HEAD", Method.HEAD) + case object POST extends HttpMethod("POST", Method.POST) + case object PUT extends HttpMethod("PUT", Method.PUT) + case object DELETE extends HttpMethod("DELETE", Method.DELETE) + case object CONNECT extends HttpMethod("CONNECT", Method.CONNECT) + case object OPTIONS extends HttpMethod("OPTIONS", Method.OPTIONS) + case object TRACE extends HttpMethod("TRACE", Method.TRACE) + case object PATCH extends HttpMethod("PATCH", Method.PATCH) + override val values: immutable.IndexedSeq[HttpMethod] = findValues + } + +} diff --git a/components/http/src/main/scala/pl/touk/nussknacker/http/enricher/HttpEnricherFactory.scala b/components/http/src/main/scala/pl/touk/nussknacker/http/enricher/HttpEnricherFactory.scala new file mode 100644 index 00000000000..e7c9bee6295 --- /dev/null +++ b/components/http/src/main/scala/pl/touk/nussknacker/http/enricher/HttpEnricherFactory.scala @@ -0,0 +1,182 @@ +package pl.touk.nussknacker.http.enricher + +import pl.touk.nussknacker.engine.api._ +import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError +import pl.touk.nussknacker.engine.api.context.transformation.{ + DefinedEagerParameter, + DefinedLazyParameter, + NodeDependencyValue, + SingleInputDynamicComponent +} +import pl.touk.nussknacker.engine.api.context.{OutputVar, ValidationContext} +import pl.touk.nussknacker.engine.api.definition._ +import pl.touk.nussknacker.engine.api.parameter.ParameterName +import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext +import pl.touk.nussknacker.engine.api.typed.typing +import pl.touk.nussknacker.engine.api.typed.typing.TypingResult +import pl.touk.nussknacker.http.HttpEnricherConfig +import pl.touk.nussknacker.http.client.HttpClientProvider +import pl.touk.nussknacker.http.enricher.HttpEnricher.{BodyType, HttpMethod, buildURL} +import pl.touk.nussknacker.http.enricher.HttpEnricherFactory.{ + BodyParamExtractor, + BodyTypeParamExtractor, + TransformationState +} +import pl.touk.nussknacker.http.enricher.HttpEnricherParameters._ +import sttp.model.QueryParams + +class HttpEnricherFactory(val config: HttpEnricherConfig) + extends EagerService + with SingleInputDynamicComponent[ServiceInvoker] { + + override type State = TransformationState + + override def contextTransformation(context: ValidationContext, dependencies: List[NodeDependencyValue])( + implicit nodeId: NodeId + ): ContextTransformationDefinition = { + initializeParams orElse addBodyParam orElse finalStep(context, dependencies) + } + + private val initializeParams: ContextTransformationDefinition = { case TransformationStep(Nil, _) => + NextParameters( + parameters = UrlParam.declaration(config.rootUrl).createParameter() :: + QueryParamsParam.declaration.createParameter() :: + MethodParam.declaration(config.allowedMethods).createParameter() :: + HeadersParam.declaration.createParameter() :: + BodyTypeParam.declaration.createParameter() :: Nil, + state = Some(TransformationState.InitialState) + ) + } + + private val addBodyParam: ContextTransformationDefinition = { + case TransformationStep( + BodyTypeParamExtractor(bodyTypeParamValue), + Some(TransformationState.InitialState) + ) => { + val bodyType = BodyType.values + .find(_.name == bodyTypeParamValue) + .getOrElse(throw new IllegalStateException("Invalid body type parameter value.")) + NextParameters( + parameters = BodyParam.declaration(bodyType).map(_.createParameter()).toList, + state = Some(TransformationState.BodyTypeDeclared(bodyType)) + ) + } + } + + private def finalStep(context: ValidationContext, dependencies: List[NodeDependencyValue])( + implicit nodeId: NodeId + ): ContextTransformationDefinition = { + case TransformationStep( + (UrlParam.name, DefinedLazyParameter(lazyUrlParam)) :: + (QueryParamsParam.name, _) :: + (MethodParam.name, DefinedEagerParameter(httpMethod: String, _)) :: + (HeadersParam.name, _) :: + (BodyTypeParam.name, _) :: + parametersTail, + Some(TransformationState.BodyTypeDeclared(bodyType)) + ) => + val outName = OutputVariableNameDependency.extract(dependencies) + + val method = HttpMethod.values + .find(_.name == httpMethod) + .getOrElse(throw new IllegalStateException("Invalid body type parameter value.")) + + val compileTimeUrlValidationErrorOpt = lazyUrlParam.valueOpt.flatMap { + case url: String => + buildURL(config.rootUrl, url, QueryParams()).swap.toOption.map(ex => + CustomNodeError(s"Invalid URL: ${ex.cause.getMessage}", Some(UrlParam.name)) + ) + case _ => None + } + + val requestBodyTypingResult = bodyType match { + case BodyType.None => typing.TypedNull + case nonEmptyBodyType => + parametersTail match { + case BodyParamExtractor(typ) => typ + case _ => + throw new IllegalStateException( + s"Expected body param based on body type parameter ${nonEmptyBodyType.name}. Got none." + ) + } + } + + FinalResults.forValidation( + context, + compileTimeUrlValidationErrorOpt.toList, + Some(TransformationState.FinalState(bodyType, method)) + )(ctx => + ctx.withVariable( + outName, + HttpEnricherOutput.typingResult(requestBodyTypingResult), + Some(ParameterName(OutputVar.CustomNodeFieldName)) + ) + ) + } + + private val httpBeProvider = HttpClientProvider.getBackendProvider(config.httpClientConfig) + + override def implementation( + params: Params, + dependencies: List[NodeDependencyValue], + finalState: Option[TransformationState] + ): ServiceInvoker = { + finalState match { + case Some(TransformationState.FinalState(bodyType, method)) => + new HttpEnricher( + httpBeProvider, + params, + method, + bodyType, + config.rootUrl, + config.security.getOrElse(List.empty) + ) + case other => throw new IllegalStateException(s"Expected final state at end of transformation. Got $other") + } + } + + override def open(runtimeContext: EngineRuntimeContext): Unit = { + super.open(runtimeContext) + httpBeProvider.open(runtimeContext) + } + + override def close(): Unit = { + super.close() + httpBeProvider.close() + } + + override def nodeDependencies: List[NodeDependency] = List(OutputVariableNameDependency) +} + +object HttpEnricherFactory { + sealed trait TransformationState + + private[http] object TransformationState { + case object InitialState extends TransformationState + case class BodyTypeDeclared(bodyType: BodyType) extends TransformationState + case class FinalState(bodyType: BodyType, method: HttpMethod) extends TransformationState + } + + private object BodyTypeParamExtractor { + + def unapply(params: List[(_, _)]): Option[String] = { + params.collectFirst { + case (HttpEnricherParameters.BodyTypeParam.name, DefinedEagerParameter(bodyTypeParamValue: String, _)) => + bodyTypeParamValue + } + } + + } + + private object BodyParamExtractor { + + def unapply(params: List[(_, _)]): Option[TypingResult] = { + params.collectFirst { + case (HttpEnricherParameters.BodyParam.name, DefinedLazyParameter(bodyTypingResult: TypingResult)) => + bodyTypingResult + } + } + + } + +} diff --git a/components/http/src/main/scala/pl/touk/nussknacker/http/enricher/HttpEnricherOutput.scala b/components/http/src/main/scala/pl/touk/nussknacker/http/enricher/HttpEnricherOutput.scala new file mode 100644 index 00000000000..52e0452e6bb --- /dev/null +++ b/components/http/src/main/scala/pl/touk/nussknacker/http/enricher/HttpEnricherOutput.scala @@ -0,0 +1,96 @@ +package pl.touk.nussknacker.http.enricher + +import pl.touk.nussknacker.engine.api.exception.NonTransientException +import pl.touk.nussknacker.engine.api.typed.typing._ +import pl.touk.nussknacker.engine.util.json.JsonUtils +import pl.touk.nussknacker.http.enricher.HttpEnricher.Body +import pl.touk.nussknacker.http.enricher.MapExtensions.MapToHashMapExtension +import sttp.client3.Response +import sttp.model.MediaType + +// TODO decision: can we leak headers / url / body in scenario? would it be enough to filter out configured securities? +private[enricher] object HttpEnricherOutput { + + // TODO: fill out request typing result with values determined at validation + def typingResult(requestBodyTypingResult: TypingResult): TypedObjectTypingResult = Typed.record( + List( + "request" -> Typed.record( + List( + "url" -> Typed[String], + "method" -> Typed[String], + "headers" -> Typed.typedClass[java.util.Map[String, String]], + "body" -> requestBodyTypingResult + ) + ), + "response" -> Typed.record( + List( + "statusCode" -> Typed[Int], + "statusText" -> Typed[String], + "headers" -> Typed.typedClass[java.util.Map[String, String]], + "body" -> Unknown + ) + ), + ) + ) + + def buildOutput(response: Response[Either[String, String]], requestBody: Option[Body]): java.util.Map[String, _] = + Map( + "request" -> Map( + "url" -> response.request.uri.toString(), + "method" -> response.request.method.method, + "headers" -> response.request.headers + .map { h => + h.name -> h.value + } + .toMap + .toHashMap, + "body" -> requestBody.map(_.value).orNull + ).toHashMap, + "response" -> Map( + "statusCode" -> response.code.code, + "statusText" -> response.statusText, + "headers" -> + response.headers + .map { h => + h.name -> h.value + } + .toMap + .toHashMap, + "body" -> parseBody(response) + ).toHashMap + ).toHashMap + + private def parseBody(response: Response[Either[String, String]]) = { + response.contentType match { + case Some(contentType) => { + val body = response.body match { + // TODO decision: error handling strategy - what to do if returned not 2xx? + case Left(value) => value + case Right(value) => value + } + contentType match { + case s if s == MediaType.ApplicationJson.toString() => + io.circe.parser.parse(body) match { + case Right(json) => JsonUtils.jsonToAny(json) + case Left(err) => + throw NonTransientException( + input = body, + message = s"Could not parse json: ${err.message}", + cause = err.underlying + ) // TODO decision: if we cant parse - throw exception or return null? + } + case s if s == MediaType.TextPlain.toString() => body + /* + TODO decision: if we cant parse body, do we: + 1. treat it as text/plain - pass it as string without parsing + 2. throw exception + 3. return null + */ + case _ => body + } + } + case None => null + } + } + +} diff --git a/components/http/src/main/scala/pl/touk/nussknacker/http/enricher/HttpEnricherParameters.scala b/components/http/src/main/scala/pl/touk/nussknacker/http/enricher/HttpEnricherParameters.scala new file mode 100644 index 00000000000..2e6a4826716 --- /dev/null +++ b/components/http/src/main/scala/pl/touk/nussknacker/http/enricher/HttpEnricherParameters.scala @@ -0,0 +1,120 @@ +package pl.touk.nussknacker.http.enricher + +import pl.touk.nussknacker.engine.api.{Context, Params} +import pl.touk.nussknacker.engine.api.definition.{ + FixedExpressionValue, + FixedValuesParameterEditor, + ParameterDeclaration +} +import pl.touk.nussknacker.engine.api.parameter.ParameterName +import pl.touk.nussknacker.http.enricher.HttpEnricher.{Body, BodyType, HttpMethod} + +import java.net.URL + +/* + TODO decision: add advanced parameters: + - cache - for GET, HEAD, POST, PUT + - retry strategy - retry count + retry interval + - error handling strategy - or do it through configuration + */ +private[enricher] object HttpEnricherParameters { + + object UrlParam { + val name: ParameterName = ParameterName("URL") + + def declaration(configuredRootUrl: Option[URL]) = { + val rootUrlHint = + configuredRootUrl.map(r => s"""Root URL is configured. For current environment its value is: "${r.toString}"""") + ParameterDeclaration + .lazyMandatory[String](name) + .withCreator(modify = _.copy(hintText = rootUrlHint)) + } + + val extractor: (Context, Params) => String = (context: Context, params: Params) => + declaration(None).extractValueUnsafe(params).evaluate(context) + } + + object MethodParam { + val name: ParameterName = ParameterName("HTTP Method") + + def declaration(allowedMethods: List[HttpMethod]) = ParameterDeclaration + .mandatory[String](name) + .withCreator(modify = + _.copy(editor = + Some( + FixedValuesParameterEditor( + allowedMethods.map(m => FixedExpressionValue(s"'${m.name}'", m.name)) + ) + ) + ) + ) + + } + + // TODO decision: change the type to Map[String,AnyRef] to enable multiple values in header + object HeadersParam { + val name: ParameterName = ParameterName("Headers") + val declaration = + ParameterDeclaration.lazyOptional[java.util.Map[String, String]](name).withCreator() + val extractor: (Context, Params) => java.util.Map[String, String] = (context: Context, params: Params) => + declaration.extractValueUnsafe(params).evaluate(context) + } + + // TODO decision: change the type to Map[String,AnyRef] to enable multiple values in query + object QueryParamsParam { + val name: ParameterName = ParameterName("Query Parameters") + val declaration = + ParameterDeclaration.lazyOptional[java.util.Map[String, String]](name).withCreator() + val extractor: (Context, Params) => java.util.Map[String, String] = (context: Context, params: Params) => + declaration.extractValueUnsafe(params).evaluate(context) + } + + object BodyTypeParam { + val name: ParameterName = ParameterName("Body Type") + + val declaration = ParameterDeclaration + .mandatory[String](name) + .withCreator(modify = + _.copy(editor = + Some( + FixedValuesParameterEditor( + HttpEnricher.BodyType.values + .map(v => { + FixedExpressionValue(s"'${v.name}'", v.name) + }) + .toList + ) + ) + ) + ) + + } + + object BodyParam { + val name: ParameterName = ParameterName("Body") + + def declaration(bodyType: BodyType) = + bodyType match { + case BodyType.JSON => + Some( + ParameterDeclaration + .lazyOptional[AnyRef](name) + .withCreator() + ) + case BodyType.PlainText => + Some( + ParameterDeclaration + .lazyOptional[String](name) + .withCreator() + ) + case BodyType.None => + None + } + + def extractor: (Context, Params, BodyType) => Option[Body] = + (context: Context, params: Params, bodyType: BodyType) => + declaration(bodyType).map(extractor => Body(extractor.extractValueUnsafe(params).evaluate(context), bodyType)) + + } + +} diff --git a/components/http/src/main/scala/pl/touk/nussknacker/http/enricher/MapExtensions.scala b/components/http/src/main/scala/pl/touk/nussknacker/http/enricher/MapExtensions.scala new file mode 100644 index 00000000000..8cfb59409ea --- /dev/null +++ b/components/http/src/main/scala/pl/touk/nussknacker/http/enricher/MapExtensions.scala @@ -0,0 +1,16 @@ +package pl.touk.nussknacker.http.enricher + +import scala.jdk.CollectionConverters._ + +// TODO http: extract to utils outside of this subproject? +object MapExtensions { + + implicit class MapToHashMapExtension[K, V](map: Map[K, V]) { + + def toHashMap: java.util.HashMap[K, V] = { + new java.util.HashMap(map.asJava) + } + + } + +} diff --git a/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherBodyTest.scala b/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherBodyTest.scala new file mode 100644 index 00000000000..9fb7544e926 --- /dev/null +++ b/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherBodyTest.scala @@ -0,0 +1,248 @@ +package pl.touk.nussknacker.http + +import com.github.tomakehurst.wiremock.client.WireMock._ +import pl.touk.nussknacker.engine.build.ScenarioBuilder +import pl.touk.nussknacker.engine.graph.expression.Expression +import pl.touk.nussknacker.engine.spel.SpelExtension.SpelExpresion +import pl.touk.nussknacker.engine.util.test.TestScenarioRunner +import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage.convertValidatedToValuable + +class HttpEnricherBodyTest extends HttpEnricherTestSuite { + + import org.scalatest.prop.TableDrivenPropertyChecks._ + + test("returns response body as value based on content type header with fallback to passing it a string") { + forAll( + Table( + ("contentType", "body", "expectedBodyRuntimeValue"), + ("application/json", """ "string" """, "string"), + ("application/json", "true", true), + ( + "application/json", + TestData.recordWithAllTypesNestedAsJson, + TestData.recordWithAllTypesNestedAsComparableAsNuRuntimeValue + ), + ("text/plain", "string", "string"), + ("text/plain", "string", "string"), + ("text/html", "value", "value"), + ) + ) { case (contentType, body, expected) => + wireMock.stubFor( + get(urlEqualTo("/body-test")).willReturn( + aResponse() + .withHeader("Content-Type", contentType) + .withBody(body) + ) + ) + val scenario = ScenarioBuilder + .streaming("id") + .source("start", TestScenarioRunner.testDataSource) + .enricher( + "http-node-id", + "httpOutput", + noConfigHttpEnricherName, + "URL" -> s"'${wireMock.baseUrl()}/body-test'".spel, + "HTTP Method" -> "'GET'".spel, + "Body Type" -> "'None'".spel, + ) + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#httpOutput.response.body".spel) + + val result = runner + .runWithData[String, AnyRef](scenario, List("irrelevantInput")) + .validValue + .successes + .head + deepToScala(result) shouldBe expected + } + } + + test("makes request without body") { + wireMock.stubFor( + post(urlEqualTo("/body-test")) + .withRequestBody(absent()) + .willReturn(aResponse()) + ) + val scenario = ScenarioBuilder + .streaming("id") + .source("start", TestScenarioRunner.testDataSource) + .enricher( + "http", + "httpOutput", + noConfigHttpEnricherName, + "URL" -> s"'${wireMock.baseUrl()}/body-test'".spel, + "HTTP Method" -> "'POST'".spel, + "Body Type" -> "'None'".spel, + ) + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#httpOutput.response.statusCode".spel) + + val result = runner + .runWithData[String, Integer](scenario, List("irrelevantInput")) + .validValue + .successes + .head + + result shouldBe 200 + } + + test("makes request with plain text body") { + wireMock.stubFor( + post(urlEqualTo("/body-test")) + .withRequestBody(equalTo("Plain text body")) + .withHeader("Content-Type", equalTo("text/plain; charset=UTF-8")) + .willReturn(aResponse()) + ) + val scenario = ScenarioBuilder + .streaming("id") + .source("start", TestScenarioRunner.testDataSource) + .enricher( + "http", + "httpOutput", + noConfigHttpEnricherName, + "URL" -> s"'${wireMock.baseUrl()}/body-test'".spel, + "HTTP Method" -> "'POST'".spel, + "Body Type" -> "'Plain Text'".spel, + "Body" -> "'Plain text body'".spel + ) + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#httpOutput.response.statusCode".spel) + + val result = runner + .runWithData[String, Integer](scenario, List("irrelevantInput")) + .validValue + .successes + .head + + result shouldBe 200 + } + + test("makes request with json body parsed from spel") { + forAll( + Table( + ("spelJsonBody", "stringJsonBody"), + ("null".spel, "null"), + ("{}".spel, "[]"), + ("{1,2,3}".spel, "[1,2,3]"), + (TestData.recordWithAllTypesNestedAsSpelString, TestData.recordWithAllTypesNestedAsJson) + ) + ) { + case (spelJsonBody, requestJsonBodyString) => { + wireMock.stubFor( + post(urlEqualTo("/body-test")) + .withRequestBody(equalToJson(requestJsonBodyString)) + .withHeader("Content-Type", equalTo("application/json; charset=UTF-8")) + .willReturn( + aResponse().withStatus(200) + ) + ) + val scenario = ScenarioBuilder + .streaming("id") + .source("start", TestScenarioRunner.testDataSource) + .enricher( + "http", + "httpOutput", + noConfigHttpEnricherName, + "URL" -> s"'${wireMock.baseUrl()}/body-test'".spel, + "HTTP Method" -> "'POST'".spel, + "Body Type" -> "'JSON'".spel, + "Body" -> spelJsonBody + ) + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#httpOutput.response.statusCode".spel) + + val result = runner + .runWithData[String, Integer](scenario, List("irrelevantInput")) + .validValue + .successes + .head + + result shouldBe 200 + } + } + } + + object TestData { + + val recordWithAllTypesNestedAsJson: String = + """ + |{ + | "string": "this is a string", + | "number": 123, + | "decimal": 123.456, + | "booleanTrue": true, + | "nullValue": null, + | "object": { + | "nestedString": "nested value", + | "nestedArray": [1, 2, 3], + | "nestedObject": { + | "innerKey": "innerValue" + | } + | }, + | "arrayOfObjects": [ + | {"key1": "value1"}, + | {"key2": "value2"} + | ], + | "array": [1, "string", true, null, {"innerArrayObject": [1, 2, 3]}] + |} + |""".stripMargin + + val recordWithAllTypesNestedAsSpelString: Expression = + """ + |{ + | "string": "this is a string", + | "number": 123, + | "decimal": 123.456, + | "booleanTrue": true, + | "nullValue": null, + | "object": { + | "nestedString": "nested value", + | "nestedArray": {1, 2, 3}, + | "nestedObject": { + | "innerKey": "innerValue" + | } + | }, + | "arrayOfObjects": { + | {"key1": "value1"}, + | {"key2": "value2"} + | }, + | "array": {1, "string", true, null, {"innerArrayObject": {1, 2, 3}}} + |} + |""".stripMargin.spel + + + val recordWithAllTypesNestedAsComparableAsNuRuntimeValue: Map[String, Any] = Map( + "string" -> "this is a string", + "number" -> java.math.BigDecimal.valueOf(123), + "decimal" -> java.math.BigDecimal.valueOf(123.456), + "booleanTrue" -> true, + "nullValue" -> null, + "object" -> Map( + "nestedString" -> "nested value", + "nestedArray" -> List( + java.math.BigDecimal.valueOf(1), + java.math.BigDecimal.valueOf(2), + java.math.BigDecimal.valueOf(3) + ), + "nestedObject" -> Map( + "innerKey" -> "innerValue" + ) + ), + "arrayOfObjects" -> List( + Map("key1" -> "value1"), + Map("key2" -> "value2") + ), + "array" -> List( + java.math.BigDecimal.valueOf(1), + "string", + true, + null, + Map( + "innerArrayObject" -> List( + java.math.BigDecimal.valueOf(1), + java.math.BigDecimal.valueOf(2), + java.math.BigDecimal.valueOf(3) + ) + ) + ) + ) + + } + +} diff --git a/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherConfigTest.scala b/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherConfigTest.scala new file mode 100644 index 00000000000..ad9d03ab02d --- /dev/null +++ b/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherConfigTest.scala @@ -0,0 +1,46 @@ +package pl.touk.nussknacker.http + +import com.typesafe.config.ConfigFactory +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies +import pl.touk.nussknacker.http.enricher.HttpEnricher.HttpMethod.{DELETE, GET, POST, PUT} +import pl.touk.nussknacker.http.enricher.HttpEnricherFactory + +class HttpEnricherConfigTest extends AnyFunSuite with Matchers { + + test("should create enricher with defaults for empty config") { + new HttpEnricherComponentProvider() + .create(ConfigFactory.empty(), ProcessObjectDependencies.withConfig(ConfigFactory.empty())) + .head + .component + .asInstanceOf[HttpEnricherFactory] + .config should matchPattern { case HttpEnricherConfig(None, None, _, List(GET, POST, PUT, DELETE)) => + } + } + + test("should throw exception when creating enricher with root url with query parameters") { + val config = ConfigFactory.parseString(s""" + |{ + | rootUrl: "http://example.io?someIntParam=123" + |} + |""".stripMargin) + intercept[IllegalArgumentException] { + new HttpEnricherComponentProvider() + .create(config, ProcessObjectDependencies.withConfig(config)) + }.getMessage shouldBe "Root URL for HTTP enricher has to be without query parameters." + } + + test("should throw exception when creating enricher with empty allowed methods") { + val config = ConfigFactory.parseString(s""" + |{ + | allowedMethods: [] + |} + |""".stripMargin) + intercept[IllegalArgumentException] { + new HttpEnricherComponentProvider() + .create(config, ProcessObjectDependencies.withConfig(config)) + }.getMessage shouldBe "Allowed methods cannot be empty." + } + +} diff --git a/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherHeadersTest.scala b/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherHeadersTest.scala new file mode 100644 index 00000000000..e178fb72697 --- /dev/null +++ b/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherHeadersTest.scala @@ -0,0 +1,223 @@ +package pl.touk.nussknacker.http + +import com.github.tomakehurst.wiremock.client.WireMock._ +import com.github.tomakehurst.wiremock.matching.EqualToPattern +import com.typesafe.config.ConfigFactory +import pl.touk.nussknacker.engine.api.component.ComponentDefinition +import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.ExpressionParserCompilationError +import pl.touk.nussknacker.engine.api.parameter.ParameterName +import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies +import pl.touk.nussknacker.engine.build.ScenarioBuilder +import pl.touk.nussknacker.engine.spel.SpelExtension.SpelExpresion +import pl.touk.nussknacker.engine.util.test.TestScenarioRunner +import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage.convertValidatedToValuable + +import scala.jdk.CollectionConverters._ + +class HttpEnricherHeadersTest extends HttpEnricherTestSuite { + + val configuredHeadersEnricher: ComponentDefinition = { + val configuredAdditionalHeadersHttpConfig = ConfigFactory.parseString( + s""" + |{ + | security: [ + | { + | in: "header" + | name: "configured_header_1_key" + | value: "configured_header_1_value" + | }, + | { + | in: "header" + | name: "configured_header_2_key" + | value: "configured_header_2_value" + | }, + | { + | in: "cookie" + | name: "configured_cookie_key" + | value: "configured_cookie_value" + | } + | ] + |} + |""".stripMargin + ) + new HttpEnricherComponentProvider() + .create(configuredAdditionalHeadersHttpConfig, ProcessObjectDependencies.withConfig(ConfigFactory.empty())) + .head + .copy(name = "configuredSecurityInHeadersHttp") + } + + override protected lazy val additionalComponents: List[ComponentDefinition] = List(configuredHeadersEnricher) + + test("makes request with lazily evaluated header") { + wireMock.stubFor( + get(urlEqualTo("/header-test")) + .withHeader("spel_header_1_key", new EqualToPattern("spel_header_1_value")) + .withHeader("spel_header_2_key", new EqualToPattern("input_header_2_key")) + .willReturn(aResponse().withStatus(200)) + ) + val scenario = ScenarioBuilder + .streaming("id") + .source("start", TestScenarioRunner.testDataSource) + .enricher( + "http-node-id", + "httpOutput", + configuredHeadersEnricher.name, + "URL" -> s"'${wireMock.baseUrl()}/header-test'".spel, + "HTTP Method" -> "'GET'".spel, + "Headers" -> "{ spel_header_1_key : 'spel_header_1_value', spel_header_2_key : #input }".spel, + ) + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#httpOutput.response.statusCode".spel) + + val result = runner + .runWithData[String, Integer](scenario, List("input_header_2_key")) + .validValue + .successes + .head + + result shouldBe 200 + } + + test("makes request with header from config") { + wireMock.stubFor( + get(urlEqualTo("/header-test")) + .withHeader("configured_header_1_key", new EqualToPattern("configured_header_1_value")) + .withHeader("configured_header_2_key", new EqualToPattern("configured_header_2_value")) + .willReturn(aResponse().withStatus(200)) + ) + val scenario = ScenarioBuilder + .streaming("id") + .source("start", TestScenarioRunner.testDataSource) + .enricher( + "http", + "httpOutput", + configuredHeadersEnricher.name, + "URL" -> s"'${wireMock.baseUrl()}/header-test'".spel, + "HTTP Method" -> "'GET'".spel, + "Headers" -> "".spel, + ) + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#httpOutput.response.statusCode".spel) + + val result = runner + .runWithData[String, Integer](scenario, List("irrelevant value")) + .validValue + .successes + .head + + result shouldBe 200 + } + + // TODO http: is this ok? even if we validate not overriding we cant ensure that in runtime + test("makes request with header from parameter that overwrites configured header") { + wireMock.stubFor( + get(urlEqualTo("/header-test")) + .withHeader("configured_header_1_key", new EqualToPattern("overwriten_spel_header_1_value")) + .willReturn(aResponse().withStatus(200)) + ) + val scenario = ScenarioBuilder + .streaming("id") + .source("start", TestScenarioRunner.testDataSource) + .enricher( + "http", + "httpOutput", + configuredHeadersEnricher.name, + "URL" -> s"'${wireMock.baseUrl()}/header-test'".spel, + "HTTP Method" -> "'GET'".spel, + "Headers" -> "{ configured_header_1_key : 'overwriten_spel_header_1_value' }".spel, + ) + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#httpOutput.response.statusCode".spel) + + val result = runner + .runWithData[String, Integer](scenario, List("irrelevant value")) + .validValue + .successes + .head + + result shouldBe 200 + } + + test("makes request with cookie from config") { + wireMock.stubFor( + get(urlEqualTo("/header-test")) + .withCookie("configured_cookie_key", new EqualToPattern("configured_cookie_value")) + .willReturn(aResponse().withStatus(200)) + ) + val scenario = ScenarioBuilder + .streaming("id") + .source("start", TestScenarioRunner.testDataSource) + .enricher( + "http", + "httpOutput", + configuredHeadersEnricher.name, + "URL" -> s"'${wireMock.baseUrl()}/header-test'".spel, + "HTTP Method" -> "'GET'".spel + ) + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#httpOutput.response.statusCode".spel) + + val result = runner + .runWithData[String, Integer](scenario, List("irrelevant value")) + .validValue + .successes + .head + + result shouldBe 200 + } + + test("returns received headers from response") { + wireMock.stubFor( + get(urlEqualTo("/header-test")) + .willReturn(aResponse().withStatus(200).withHeader("response_header_key", "response_header_value")) + ) + val scenario = ScenarioBuilder + .streaming("id") + .source("start", TestScenarioRunner.testDataSource) + .enricher( + "http", + "httpOutput", + configuredHeadersEnricher.name, + "URL" -> s"'${wireMock.baseUrl()}/header-test'".spel, + "HTTP Method" -> "'GET'".spel, + "Headers" -> "".spel, + ) + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#httpOutput.response.headers".spel) + + val result = runner + .runWithData[String, java.util.Map[String, String]](scenario, List("irrelevant value")) + .validValue + .successes + .head + + result.asScala should contain("response_header_key" -> "response_header_value") + } + + // TODO http: is this behaviour ok? or should we treat {} as empty map to not confuse users? + test("returns error when using list in headers parameter") { + val scenario = ScenarioBuilder + .streaming("id") + .source("start", TestScenarioRunner.testDataSource) + .enricher( + "http", + "httpOutput", + configuredHeadersEnricher.name, + "URL" -> s"'${wireMock.baseUrl()}/header-test'".spel, + "HTTP Method" -> "'GET'".spel, + "Headers" -> "{}".spel, + ) + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#httpOutput.response.headers".spel) + + val result = runner + .runWithData[String, java.util.Map[String, String]](scenario, List("irrelevant value")) + .invalidValue + .head + + result should matchPattern { + case ExpressionParserCompilationError( + "Bad expression type, expected: Map[String,String], found: List[Unknown]({})", + _, + Some(ParameterName("Headers")), + _, + _ + ) => + } + } + +} diff --git a/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherOutputTest.scala b/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherOutputTest.scala new file mode 100644 index 00000000000..4a20c8448cb --- /dev/null +++ b/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherOutputTest.scala @@ -0,0 +1,95 @@ +package pl.touk.nussknacker.http + +import com.github.tomakehurst.wiremock.client.WireMock._ +import pl.touk.nussknacker.engine.api.NodeId +import pl.touk.nussknacker.engine.api.context.ValidationContext +import pl.touk.nussknacker.engine.api.context.transformation.{ + DefinedEagerParameter, + DefinedLazyParameter, + DynamicComponent, + OutputVariableNameValue +} +import pl.touk.nussknacker.engine.api.parameter.ParameterName +import pl.touk.nussknacker.engine.api.typed.typing.Typed +import pl.touk.nussknacker.engine.build.ScenarioBuilder +import pl.touk.nussknacker.engine.spel.SpelExtension.SpelExpresion +import pl.touk.nussknacker.engine.util.test.TestScenarioRunner +import pl.touk.nussknacker.http.backend.DefaultHttpClientConfig +import pl.touk.nussknacker.http.enricher.HttpEnricher.{BodyType, HttpMethod} +import pl.touk.nussknacker.http.enricher.HttpEnricherFactory +import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage.convertValidatedToValuable + +// TODO: add test for request data +// TODO: add test for typing +class HttpEnricherOutputTest extends HttpEnricherTestSuite { + + test("returns request and response data as seperate fields") { + wireMock.stubFor( + get(urlEqualTo("/code-test")).willReturn( + aResponse().withStatus(200).withHeader("response_header_key_1", "response_header_value_1") + ) + ) + val scenario = ScenarioBuilder + .streaming("id") + .source("start", TestScenarioRunner.testDataSource) + .enricher( + "http-node-id", + "httpOutput", + noConfigHttpEnricherName, + "URL" -> s"'${wireMock.baseUrl()}/code-test'".spel, + "HTTP Method" -> "'GET'".spel, + "Headers" -> "{ spel_header_1_key : 'spel_header_1_value' }".spel, + "Body Type" -> "'None'".spel, + ) + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#httpOutput".spel) + + val result = runner + .runWithData[String, java.util.Map[String, AnyRef]](scenario, List("irrelevantInput")) + .validValue + .successes + .head + + val resultMap = deepToScala(result).asInstanceOf[Map[String, AnyRef]] + + val request = resultMap("request").asInstanceOf[Map[String, AnyRef]] + request("url") shouldBe s"${wireMock.baseUrl()}/code-test" + request("method") shouldBe "GET" + request("body") shouldBe null + + val requestHeaders = request("headers").asInstanceOf[Map[String, String]] + requestHeaders("spel_header_1_key") shouldBe "spel_header_1_value" + + val response = resultMap("response").asInstanceOf[Map[String, AnyRef]] + response("statusCode") shouldBe 200 + response("statusText") shouldBe "OK" + response("body") shouldBe null + + val responseHeaders = response("headers").asInstanceOf[Map[String, String]] + responseHeaders("response_header_key_1") shouldBe "response_header_value_1" + } + + // TODO: how to test typing result from context transformation + ignore("return typing result of request body if can be determined") { + val enricherService = + new HttpEnricherFactory(HttpEnricherConfig.apply(None, None, DefaultHttpClientConfig(), List(HttpMethod.GET))) + val transformation = { + enricherService.contextTransformation(ValidationContext.empty, List(OutputVariableNameValue("outputVar")))( + NodeId("test") + )( + enricherService.TransformationStep( + List( + (ParameterName("URL"), DefinedLazyParameter(Typed.fromInstance("http://somehost.com"))), + (ParameterName("Query Parameters"), DefinedLazyParameter(Typed.fromInstance(null))), + (ParameterName("HTTP Method"), DefinedEagerParameter("GET", Typed.fromInstance("GET"))), + (ParameterName("Headers"), DefinedLazyParameter(Typed.fromInstance(null))), + (ParameterName("Body Type"), DefinedEagerParameter("None", Typed.fromInstance("None"))), + (ParameterName("Body"), DefinedLazyParameter(Typed.fromInstance(null))), + ), + Some(HttpEnricherFactory.TransformationState.BodyTypeDeclared(BodyType.PlainText)) + ) + ) + } + // cant match on DynamicComponent.FinalResults - no way to do it from here? + } + +} diff --git a/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherTestSuite.scala b/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherTestSuite.scala new file mode 100644 index 00000000000..35d23a73368 --- /dev/null +++ b/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherTestSuite.scala @@ -0,0 +1,71 @@ +package pl.touk.nussknacker.http + +import com.github.tomakehurst.wiremock.WireMockServer +import com.github.tomakehurst.wiremock.core.WireMockConfiguration +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import pl.touk.nussknacker.engine.api.component.ComponentDefinition +import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies +import pl.touk.nussknacker.engine.flink.test.FlinkSpec +import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner +import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner.FlinkTestScenarioRunnerExt +import pl.touk.nussknacker.engine.util.test.TestScenarioRunner +import pl.touk.nussknacker.test.AvailablePortFinder + +import scala.jdk.CollectionConverters._ + +class HttpEnricherTestSuite extends AnyFunSuite with BeforeAndAfterAll with FlinkSpec with Matchers { + + protected val wireMock: WireMockServer = { + val server = AvailablePortFinder.withAvailablePortsBlocked(1)(l => { + new WireMockServer( + WireMockConfiguration + .wireMockConfig() + .port(l.head) + ) + }) + server.start() + server + } + + override protected def afterAll(): Unit = { + try { + wireMock.stop() + } finally { + super.afterAll() + } + } + + protected val noConfigHttpEnricherName = "no-config-http" + + protected lazy val additionalComponents: List[ComponentDefinition] = List.empty + + protected lazy val runner: FlinkTestScenarioRunner = TestScenarioRunner + .flinkBased(ConfigFactory.empty(), flinkMiniCluster) + .withExtraComponents( + new HttpEnricherComponentProvider() + .create( + ConfigFactory.empty(), + ProcessObjectDependencies.withConfig(ConfigFactory.empty()) + ) + .head + .copy(name = noConfigHttpEnricherName) +: additionalComponents + ) + .build() + + protected def deepToScala(obj: Any): Any = obj match { + case map: java.util.Map[_, _] => + map.asScala.map { case (key, value) => (deepToScala(key), deepToScala(value)) }.toMap + case list: java.util.List[_] => + list.asScala.map(deepToScala).toList + case set: java.util.Set[_] => + set.asScala.map(deepToScala).toSet + case array: Array[_] => + array.map(deepToScala) + case other => + other + } + +} diff --git a/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherURLTest.scala b/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherURLTest.scala new file mode 100644 index 00000000000..c495db015a1 --- /dev/null +++ b/components/http/src/test/scala/pl/touk/nussknacker/http/HttpEnricherURLTest.scala @@ -0,0 +1,132 @@ +package pl.touk.nussknacker.http + +import com.github.tomakehurst.wiremock.client.WireMock._ +import com.typesafe.config.ConfigFactory +import pl.touk.nussknacker.engine.api.component.ComponentDefinition +import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError +import pl.touk.nussknacker.engine.api.parameter.ParameterName +import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies +import pl.touk.nussknacker.engine.build.ScenarioBuilder +import pl.touk.nussknacker.engine.spel.SpelExtension.SpelExpresion +import pl.touk.nussknacker.engine.util.test.TestScenarioRunner +import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage.convertValidatedToValuable + +class HttpEnricherURLTest extends HttpEnricherTestSuite { + + import org.scalatest.prop.TableDrivenPropertyChecks._ + + lazy val configuredRootURLEnricher: ComponentDefinition = { + val configuredAdditionalHeadersHttpConfig = ConfigFactory.parseString( + s""" + |{ + | rootUrl: "${wireMock.baseUrl()}", + | security: [ + | { + | in: "query" + | name: "configured_query_1_key" + | value: "configured_query_1_value" + | } + | ] + |} + |""".stripMargin + ) + new HttpEnricherComponentProvider() + .create(configuredAdditionalHeadersHttpConfig, ProcessObjectDependencies.withConfig(ConfigFactory.empty())) + .head + .copy(name = "configured-root-url-and-query-security-key-http") + } + + override protected lazy val additionalComponents: List[ComponentDefinition] = List(configuredRootURLEnricher) + + test("makes request under specified url with configured root url and query params") { + wireMock.stubFor( + get(urlEqualTo("/url-test?spel_query_key_1=input_query_value_1&configured_query_1_key=configured_query_1_value")) + .willReturn( + aResponse().withStatus(200) + ) + ) + val scenario = ScenarioBuilder + .streaming("id") + .source("start", TestScenarioRunner.testDataSource) + .enricher( + "http", + "httpOutput", + configuredRootURLEnricher.name, + "URL" -> s"'/url-test'".spel, + "Query Parameters" -> "{ spel_query_key_1 : #input }".spel, + "HTTP Method" -> "'GET'".spel, + ) + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#httpOutput.response.statusCode".spel) + + val result = runner + .runWithData[String, Integer](scenario, List("input_query_value_1")) + .validValue + .successes + .head + + result shouldBe 200 + } + + test("makes request under specified url with encoded query params") { + wireMock.stubFor( + get(urlEqualTo("/url-test?spel_query_unencoded_header_+!%23=spel_query_unencoded_header_+!%23")) + .willReturn( + aResponse().withStatus(200) + ) + ) + val scenario = ScenarioBuilder + .streaming("id") + .source("start", TestScenarioRunner.testDataSource) + .enricher( + "http", + "httpOutput", + noConfigHttpEnricherName, + "URL" -> s"'${wireMock.baseUrl}/url-test'".spel, + "Query Parameters" -> "{ 'spel_query_unencoded_header_ !#' : 'spel_query_unencoded_header_ !#' }".spel, + "HTTP Method" -> "'GET'".spel, + ) + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#httpOutput.response.statusCode".spel) + + val result = runner + .runWithData[String, Integer](scenario, List("input_query_value_1")) + .validValue + .successes + .head + + result shouldBe 200 + } + + test("validates url during parameters validation") { + forAll( + Table( + ("invalidUrlExpr", "expectedErrorMessage"), + ("'noturl'".spel, "Invalid URL: no protocol: noturl"), + ("'badprotocol://'".spel, "Invalid URL: unknown protocol: badprotocol"), + ("'http://'".spel, "Invalid URL: Expected scheme-specific part at index 5: http:"), + ( + "'http://host_with_space in_authority'".spel, + "Invalid URL: Illegal character in authority at index 7: http://host_with_space in_authority" + ) + ) + ) { (invalidUrlExpr, errorMessage) => + val scenario = ScenarioBuilder + .streaming("id") + .source("start", TestScenarioRunner.testDataSource) + .enricher( + "http", + "httpOutput", + noConfigHttpEnricherName, + "URL" -> invalidUrlExpr, + "HTTP Method" -> "'GET'".spel + ) + .emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#httpOutput".spel) + + val errors = runner + .runWithData[String, Integer](scenario, List("irrelevant")) + .invalidValue + + errors.head shouldBe CustomNodeError("http", errorMessage, Some(ParameterName("URL"))) + } + } + +} diff --git a/nussknacker-dist/src/universal/conf/dev-application.conf b/nussknacker-dist/src/universal/conf/dev-application.conf index ddf3c4c27aa..3fbadbc34dd 100644 --- a/nussknacker-dist/src/universal/conf/dev-application.conf +++ b/nussknacker-dist/src/universal/conf/dev-application.conf @@ -237,6 +237,9 @@ scenarioTypes { url: ${OPENAPI_SERVICE_URL}"/swagger" rootUrl: ${OPENAPI_SERVICE_URL} } + http { + // http enricher configuration here + } } } category: "RequestResponse" diff --git a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/DynamicNodeValidator.scala b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/DynamicNodeValidator.scala index 10f1e641bc3..e6eb398d1c1 100644 --- a/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/DynamicNodeValidator.scala +++ b/scenario-compiler/src/main/scala/pl/touk/nussknacker/engine/compile/nodecompilation/DynamicNodeValidator.scala @@ -108,8 +108,6 @@ class DynamicNodeValidator( // we add distinct here, as multi-step, partial validation of parameters can cause duplicate errors if implementation is not v. careful val allErrors = (errorsCombined ++ errors).distinct Valid(TransformationResult(allErrors, evaluatedSoFar.map(_._1), finalContext, state, nodeParameters)) - case component.NextParameters(Nil, _, _) => - returnUnmatchedFallback case component.NextParameters(newParameters, newParameterErrors, state) => val enrichedParameters = StandardParameterEnrichment.enrichParameterDefinitions(newParameters, parametersConfig)