Skip to content

Commit

Permalink
http enricher
Browse files Browse the repository at this point in the history
  • Loading branch information
mslabek committed Oct 24, 2024
1 parent 0d8c7da commit f02e9df
Show file tree
Hide file tree
Showing 19 changed files with 1,581 additions and 2 deletions.
30 changes: 30 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ lazy val distribution: Project = sbt
(liteKafkaComponents / assembly).value -> "components/lite/liteKafka.jar",
(liteRequestResponseComponents / assembly).value -> "components/lite/liteRequestResponse.jar",
(openapiComponents / assembly).value -> "components/common/openapi.jar",
(httpComponents / assembly).value -> "components/common/http.jar",
(sqlComponents / assembly).value -> "components/common/sql.jar",
)
},
Expand Down Expand Up @@ -1430,6 +1431,7 @@ lazy val liteEngineRuntimeApp: Project = (project in lite("runtime-app"))
(liteKafkaComponents / assembly).value -> "components/lite/liteKafka.jar",
(liteRequestResponseComponents / assembly).value -> "components/lite/liteRequestResponse.jar",
(openapiComponents / assembly).value -> "components/common/openapi.jar",
(httpComponents / assembly).value -> "components/common/http.jar",
(sqlComponents / assembly).value -> "components/common/sql.jar",
),
javaAgents += JavaAgent("io.prometheus.jmx" % "jmx_prometheus_javaagent" % jmxPrometheusJavaagentV % "dist"),
Expand Down Expand Up @@ -1720,6 +1722,33 @@ lazy val openapiComponents = (project in component("openapi"))
flinkComponentsTestkit % "it,test"
)

lazy val httpComponents = (project in component("http"))
.settings(commonSettings)
.settings(assemblyNoScala("http.jar"): _*)
.settings(publishAssemblySettings: _*)
.settings(
name := "nussknacker-http",
libraryDependencies ++= Seq(
"org.apache.flink" % "flink-streaming-java" % flinkV % Provided,
"com.beachape" %% "enumeratum" % enumeratumV % Provided,
"org.scalatest" %% "scalatest" % scalaTestV % Test,
"org.wiremock" % "wiremock" % wireMockV % Test,
"com.softwaremill.sttp.client" %% "circe" % "2.2.0",
"io.swagger.core.v3" % "swagger-integration" % swaggerIntegrationV excludeAll (
ExclusionRule(organization = "jakarta.activation"),
ExclusionRule(organization = "jakarta.validation")
),
),
)
.dependsOn(
httpUtils,
componentsUtils % Provided,
jsonUtils % Provided,
commonUtils % Provided,
requestResponseComponentsUtils % Test,
flinkComponentsTestkit % Test
)

lazy val sqlComponents = (project in component("sql"))
.settings(commonSettings)
.settings(assemblyNoScala("sql.jar"): _*)
Expand Down Expand Up @@ -2140,6 +2169,7 @@ lazy val modules = List[ProjectReference](
flinkTableApiComponents,
defaultModel,
openapiComponents,
httpComponents,
scenarioCompiler,
benchmarks,
kafkaUtils,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pl.touk.nussknacker.http.HttpEnricherComponentProvider
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package pl.touk.nussknacker.http

import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, ComponentProvider, NussknackerVersion}
import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
import pl.touk.nussknacker.http.enricher.HttpEnricherFactory

class HttpEnricherComponentProvider extends ComponentProvider with LazyLogging {
override def providerName: String = "http"

override def create(config: Config, dependencies: ProcessObjectDependencies): List[ComponentDefinition] = {
val conf = HttpEnricherConfig.parse(config)
ComponentDefinition("http", new HttpEnricherFactory(conf)) :: Nil
}

override def isCompatible(version: NussknackerVersion): Boolean = true

override def resolveConfigForExecution(config: Config): Config = config

override def isAutoLoaded: Boolean = true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package pl.touk.nussknacker.http

import com.typesafe.config.Config
import net.ceedubs.ficus.readers.ValueReader
import pl.touk.nussknacker.http.backend.{DefaultHttpClientConfig, HttpClientConfig}
import pl.touk.nussknacker.http.enricher.HttpEnricher.ApiKeyConfig.{ApiKeyInCookie, ApiKeyInHeader, ApiKeyInQuery}
import pl.touk.nussknacker.http.enricher.HttpEnricher.HttpMethod.{DELETE, GET, POST, PUT}
import pl.touk.nussknacker.http.enricher.HttpEnricher.{ApiKeyConfig, HttpMethod}

import java.net.URL

final case class HttpEnricherConfig(
rootUrl: Option[URL],
security: Option[List[ApiKeyConfig]],
httpClientConfig: HttpClientConfig,
allowedMethods: List[HttpMethod]
)

object HttpEnricherConfig {
import net.ceedubs.ficus.Ficus._
import pl.touk.nussknacker.engine.util.config.ConfigEnrichments.RichConfig

implicit val apiKeyReader: ValueReader[ApiKeyConfig] = ValueReader.relative(conf => {
val name = conf.as[String]("name")
val value = conf.as[String]("value")
conf.as[String]("in") match {
case "query" => ApiKeyInQuery(name, value)
case "header" => ApiKeyInHeader(name, value)
case "cookie" => ApiKeyInCookie(name, value)
}
})

implicit val httpMethodReader: ValueReader[HttpMethod] = ValueReader.relative(conf => {
import net.ceedubs.ficus.readers.ArbitraryTypeReader.arbitraryTypeValueReader
conf.rootAs[HttpMethod]
})

implicit val configValueReader: ValueReader[HttpEnricherConfig] = ValueReader.relative(conf => {
HttpEnricherConfig(
// TODO decision: add '/' in reader if not present? or during evaluation?
rootUrl = optionValueReader[URL].read(conf, "rootUrl").map { url =>
if (url.getQuery != null) {
throw new IllegalArgumentException("Root URL for HTTP enricher has to be without query parameters.")
} else {
url
}
},
security = optionValueReader[List[ApiKeyConfig]].read(conf, "security"),
httpClientConfig =
optionValueReader[HttpClientConfig].read(conf, "httpClientConfig").getOrElse(DefaultHttpClientConfig()),
allowedMethods = {
val methods = optionValueReader[List[HttpMethod]].read(conf, "allowedMethods").getOrElse(DefaultAllowedMethods)
if (methods.isEmpty) {
throw new IllegalArgumentException("Allowed methods cannot be empty.")
}
methods
}
)
})

private val DefaultAllowedMethods = List(GET, POST, PUT, DELETE)

private[http] def parse(config: Config) = config.rootAs[HttpEnricherConfig]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package pl.touk.nussknacker.http.client

import org.asynchttpclient.DefaultAsyncHttpClient
import pl.touk.nussknacker.http.backend.{FixedAsyncHttpClientBackendProvider, HttpBackendProvider, HttpClientConfig}

import scala.util.Try

// TODO decision: Copied from OpenAPI enricher - what to do about this?
object HttpClientProvider {

def getBackendProvider(httpClientConfig: HttpClientConfig): HttpBackendProvider = {
val isFlinkBased = Try(
getClass.getClassLoader
.loadClass("org.apache.flink.streaming.api.environment.StreamExecutionEnvironment")
).isSuccess
if (isFlinkBased) {
new SharedHttpClientBackendProvider(httpClientConfig)
} else {
// TODO: figure out how to create client only once and enable its closing. Also: do we want to pass processId here?
// Should client be one per engine deployment, or per scenario?
val httpClient = new DefaultAsyncHttpClient(httpClientConfig.toAsyncHttpClientConfig(None).build())
new FixedAsyncHttpClientBackendProvider(httpClient)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package pl.touk.nussknacker.http.client

import com.typesafe.scalalogging.LazyLogging
import org.asynchttpclient.{AsyncHttpClient, DefaultAsyncHttpClient}
import pl.touk.nussknacker.engine.api.MetaData
import pl.touk.nussknacker.engine.api.runtimecontext.EngineRuntimeContext
import pl.touk.nussknacker.engine.util.sharedservice.{SharedService, SharedServiceHolder}
import pl.touk.nussknacker.http.backend.{HttpBackendProvider, HttpClientConfig}
import sttp.client3.SttpBackend
import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend

import scala.concurrent.{ExecutionContext, Future}

// TODO decision: Copied from OpenAPI enricher - what to do about this?
private[client] class SharedHttpClientBackendProvider(httpClientConfig: HttpClientConfig)
extends HttpBackendProvider
with LazyLogging {

private var httpClient: SharedHttpClient = _

override def open(context: EngineRuntimeContext): Unit = {
httpClient = SharedHttpClientBackendProvider.retrieveService(httpClientConfig)(context.jobData.metaData)
}

override def httpBackendForEc(implicit ec: ExecutionContext): SttpBackend[Future, Any] =
AsyncHttpClientFutureBackend.usingClient(httpClient.httpClient)

override def close(): Unit = Option(httpClient).foreach(_.close())

}

private[client] object SharedHttpClientBackendProvider extends SharedServiceHolder[HttpClientConfig, SharedHttpClient] {

override protected def createService(config: HttpClientConfig, metaData: MetaData): SharedHttpClient = {
val httpClientConfig = config.toAsyncHttpClientConfig(Option(metaData.name))
new SharedHttpClient(new DefaultAsyncHttpClient(httpClientConfig.build()), config)
}

}

private[client] class SharedHttpClient(val httpClient: AsyncHttpClient, config: HttpClientConfig)
extends SharedService[HttpClientConfig] {

override def creationData: HttpClientConfig = config

override protected def sharedServiceHolder: SharedHttpClientBackendProvider.type = SharedHttpClientBackendProvider

override def internalClose(): Unit = httpClient.close()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package pl.touk.nussknacker.http.enricher

import enumeratum.{Enum, EnumEntry}
import pl.touk.nussknacker.engine.api.exception.NonTransientException
import pl.touk.nussknacker.engine.api.process.ComponentUseCase
import pl.touk.nussknacker.engine.api.test.InvocationCollectors
import pl.touk.nussknacker.engine.api.{Context, Params, ServiceInvoker}
import pl.touk.nussknacker.engine.util.json.ToJsonEncoder
import pl.touk.nussknacker.http.backend.HttpBackendProvider
import pl.touk.nussknacker.http.enricher.HttpEnricher.ApiKeyConfig._
import pl.touk.nussknacker.http.enricher.HttpEnricher.{ApiKeyConfig, Body, BodyType, HttpMethod, buildURL, jsonEncoder}
import pl.touk.nussknacker.http.enricher.HttpEnricherParameters.BodyParam
import sttp.client3.basicRequest
import sttp.client3.circe._
import sttp.model.{Header, Method, QueryParams, Uri}

import java.net.URL
import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._
import scala.util.Try

class HttpEnricher(
clientProvider: HttpBackendProvider,
params: Params,
method: HttpMethod,
bodyType: BodyType,
rootUrl: Option[URL],
securityConfig: List[ApiKeyConfig]
) extends ServiceInvoker {

override def invoke(context: Context)(
implicit ec: ExecutionContext,
collector: InvocationCollectors.ServiceInvocationCollector,
componentUseCase: ComponentUseCase
): Future[AnyRef] = {
val url = {
val urlParam = HttpEnricherParameters.UrlParam.extractor(context, params)
val queryParamsFromParam: QueryParams = HttpEnricherParameters.QueryParamsParam.extractor(context, params) match {
case null => QueryParams()
case jMap => QueryParams.fromMap(jMap.asScala.toMap)
}
val queryParamsApiKeys = securityConfig.collect { case q: ApiKeyInQuery => q.name -> q.value }.toMap
val allQueryParams = queryParamsFromParam.param(queryParamsApiKeys)
buildURL(rootUrl, urlParam, allQueryParams).fold(ex => throw ex, identity)
}

val headers: List[Header] = HttpEnricherParameters.HeadersParam.extractor(context, params) match {
case null => List.empty
case jMap =>
jMap.asScala.toMap.map { case (k, v) =>
Header(k, v)
}.toList
}

val body = BodyParam.extractor(context, params, bodyType)

val request = buildRequest(method.sttpMethod, url, body, headers, securityConfig)

val httpClient = clientProvider.httpBackendForEc
val response = httpClient.send(request)

response.map(res => HttpEnricherOutput.buildOutput(res, body))
}

private def buildRequest(
method: Method,
url: Uri,
body: Option[Body],
headers: List[Header],
securityConfig: List[ApiKeyConfig]
) = {
val baseRequest = basicRequest.method(method, url)
val requestWithAppliedBody = body match {
case Some(Body(body, BodyType.JSON)) =>
val json = jsonEncoder.encode(body)
baseRequest.body(json)
case Some(Body(body, BodyType.PlainText)) =>
body match {
case strBody: String => baseRequest.body(strBody)
case other =>
throw NonTransientException(
BodyType.PlainText.name,
s"Declared type of request body does not match its value. Expected String. Got: ${other.getClass}"
)
}
case _ => baseRequest
}
val requestWithSecurityApplied = securityConfig.foldLeft(requestWithAppliedBody) { (request, securityToApply) =>
securityToApply match {
case ApiKeyInHeader(name, value) => request.header(name, value)
case ApiKeyInCookie(name, value) => request.cookie(name, value)
case _ => request
}
}
requestWithSecurityApplied.headers(headers: _*)
}

}

object HttpEnricher {

private val jsonEncoder = ToJsonEncoder(failOnUnknown = false, getClass.getClassLoader)

// TODO http: do manual URL validation with reason messages - it can be annoying now - doesnt check things like protocol/host
def buildURL(
rootUrl: Option[URL],
urlFromParam: String,
queryParams: QueryParams
): Either[NonTransientException, Uri] = {
val url = rootUrl.map(r => s"${r.toString}$urlFromParam").getOrElse(urlFromParam)
(for {
url <- Try(new URL(url))
uri <- Try(url.toURI)
sttpUri <- Try(Uri(uri))
finalSttpUri = sttpUri.addParams(queryParams)
} yield finalSttpUri).toEither.left
.map(t => NonTransientException(url, "Invalid URL", cause = t))
}

sealed trait ApiKeyConfig {
val name: String
val value: String
}

object ApiKeyConfig {
final case class ApiKeyInQuery(name: String, value: String) extends ApiKeyConfig
final case class ApiKeyInHeader(name: String, value: String) extends ApiKeyConfig
final case class ApiKeyInCookie(name: String, value: String) extends ApiKeyConfig
}

final case class Body(value: AnyRef, bodyType: BodyType)
sealed abstract class BodyType(val name: String) extends EnumEntry

object BodyType extends Enum[BodyType] {
case object JSON extends BodyType("JSON")
case object PlainText extends BodyType("Plain Text")
case object None extends BodyType("None")
override val values: immutable.IndexedSeq[BodyType] = findValues
}

sealed abstract class HttpMethod(val name: String, val sttpMethod: Method) extends EnumEntry

object HttpMethod extends Enum[HttpMethod] {
case object GET extends HttpMethod("GET", Method.GET)
case object HEAD extends HttpMethod("HEAD", Method.HEAD)
case object POST extends HttpMethod("POST", Method.POST)
case object PUT extends HttpMethod("PUT", Method.PUT)
case object DELETE extends HttpMethod("DELETE", Method.DELETE)
case object CONNECT extends HttpMethod("CONNECT", Method.CONNECT)
case object OPTIONS extends HttpMethod("OPTIONS", Method.OPTIONS)
case object TRACE extends HttpMethod("TRACE", Method.TRACE)
case object PATCH extends HttpMethod("PATCH", Method.PATCH)
override val values: immutable.IndexedSeq[HttpMethod] = findValues
}

}
Loading

0 comments on commit f02e9df

Please sign in to comment.