diff --git a/src/main/scala/com/github/matsluni/akkahttpspi/AkkaHttpClient.scala b/src/main/scala/com/github/matsluni/akkahttpspi/AkkaHttpClient.scala index 8275892..f534440 100644 --- a/src/main/scala/com/github/matsluni/akkahttpspi/AkkaHttpClient.scala +++ b/src/main/scala/com/github/matsluni/akkahttpspi/AkkaHttpClient.scala @@ -28,11 +28,11 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.{`Content-Length`, `Content-Type`} import akka.http.scaladsl.settings.ConnectionPoolSettings import akka.stream.scaladsl.Source -import akka.stream.{ActorMaterializer, Materializer, SystemMaterializer} +import akka.stream.{Materializer, SystemMaterializer} import akka.util.ByteString import org.slf4j.LoggerFactory import software.amazon.awssdk.http.async._ -import software.amazon.awssdk.http.SdkHttpRequest +import software.amazon.awssdk.http.{SdkHttpConfigurationOption, SdkHttpRequest} import software.amazon.awssdk.utils.AttributeMap import scala.collection.immutable @@ -41,7 +41,7 @@ import scala.compat.java8.OptionConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext} -class AkkaHttpClient(shutdownHandle: () => Unit, connectionSettings: ConnectionPoolSettings)(implicit actorSystem: ActorSystem, ec: ExecutionContext, mat: Materializer) extends SdkAsyncHttpClient { +class AkkaHttpClient(shutdownHandle: () => Unit, private[akkahttpspi] val connectionSettings: ConnectionPoolSettings)(implicit actorSystem: ActorSystem, ec: ExecutionContext, mat: Materializer) extends SdkAsyncHttpClient { import AkkaHttpClient._ lazy val runner = new RequestRunner() @@ -135,17 +135,42 @@ object AkkaHttpClient { else throw new RuntimeException(s"Could not parse custom content type '$contentTypeStr'.") } + // based on NettyNioAsyncHttpClient and ApacheHttpClient + // https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fnetty-nio-client%5C%2Fsrc%5C%2Fmain%2F&type=code + // https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fapache-client%5C%2Fsrc%5C%2Fmain%2F&type=code + private[akkahttpspi] def buildConnectionPoolSettings(base: ConnectionPoolSettings,attributeMap: AttributeMap): ConnectionPoolSettings = { + def zeroToInfinite(duration: java.time.Duration): scala.concurrent.duration.Duration = + if (duration.isZero) scala.concurrent.duration.Duration.Inf + else toScala(duration) + + def toScala(duration: java.time.Duration): scala.concurrent.duration.FiniteDuration = + scala.concurrent.duration.Duration.fromNanos(duration.toNanos) + + base + .withUpdatedConnectionSettings(s => + s.withConnectingTimeout(toScala(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT))) + .withIdleTimeout(toScala(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT))) + ) + .withMaxConnections(attributeMap.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue()) + .withMaxConnectionLifetime(zeroToInfinite(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE))) + } + def builder() = AkkaHttpClientBuilder() case class AkkaHttpClientBuilder(private val actorSystem: Option[ActorSystem] = None, private val executionContext: Option[ExecutionContext] = None, - private val connectionPoolSettings: Option[ConnectionPoolSettings] = None) extends SdkAsyncHttpClient.Builder[AkkaHttpClientBuilder] { - def buildWithDefaults(attributeMap: AttributeMap): SdkAsyncHttpClient = { + private val connectionPoolSettings: Option[ConnectionPoolSettings] = None, + private val connectionPoolSettingsBuilder: (ConnectionPoolSettings, AttributeMap) => ConnectionPoolSettings = (c, _) => c + ) extends SdkAsyncHttpClient.Builder[AkkaHttpClientBuilder] { + + def buildWithDefaults(serviceDefaults: AttributeMap): SdkAsyncHttpClient = { implicit val as = actorSystem.getOrElse(ActorSystem("aws-akka-http")) implicit val ec = executionContext.getOrElse(as.dispatcher) val mat: Materializer = SystemMaterializer(as).materializer - val cps = connectionPoolSettings.getOrElse(ConnectionPoolSettings(as)) + val resolvedOptions = serviceDefaults.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS); + + val cps = connectionPoolSettingsBuilder(connectionPoolSettings.getOrElse(ConnectionPoolSettings(as)), resolvedOptions) val shutdownhandleF = () => { if (actorSystem.isEmpty) { Await.result(Http().shutdownAllConnectionPools().flatMap(_ => as.terminate()), Duration.apply(10, TimeUnit.SECONDS)) @@ -158,6 +183,9 @@ object AkkaHttpClient { def withActorSystem(actorSystem: ClassicActorSystemProvider): AkkaHttpClientBuilder = copy(actorSystem = Some(actorSystem.classicSystem)) def withExecutionContext(executionContext: ExecutionContext): AkkaHttpClientBuilder = copy(executionContext = Some(executionContext)) def withConnectionPoolSettings(connectionPoolSettings: ConnectionPoolSettings): AkkaHttpClientBuilder = copy(connectionPoolSettings = Some(connectionPoolSettings)) + def withConnectionPoolSettingsBuilder(connectionPoolSettingsBuilder: (ConnectionPoolSettings, AttributeMap) => ConnectionPoolSettings): AkkaHttpClientBuilder = + copy(connectionPoolSettingsBuilder = connectionPoolSettingsBuilder) + def withConnectionPoolSettingsBuilderFromAttributeMap(): AkkaHttpClientBuilder = copy(connectionPoolSettingsBuilder = buildConnectionPoolSettings) } lazy val xAmzJson = ContentType(MediaType.customBinary("application", "x-amz-json-1.0", Compressible)) diff --git a/src/test/scala/com/github/matsluni/akkahttpspi/AkkaHttpClientSpec.scala b/src/test/scala/com/github/matsluni/akkahttpspi/AkkaHttpClientSpec.scala index 15f6a2c..0b0a413 100644 --- a/src/test/scala/com/github/matsluni/akkahttpspi/AkkaHttpClientSpec.scala +++ b/src/test/scala/com/github/matsluni/akkahttpspi/AkkaHttpClientSpec.scala @@ -18,10 +18,15 @@ package com.github.matsluni.akkahttpspi import akka.http.scaladsl.model.headers.`Content-Type` import akka.http.scaladsl.model.MediaTypes +import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} +import com.typesafe.config.ConfigFactory import org.scalatest.OptionValues import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec +import software.amazon.awssdk.http.SdkHttpConfigurationOption +import software.amazon.awssdk.utils.AttributeMap +import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ class AkkaHttpClientSpec extends AnyWordSpec with Matchers with OptionValues { @@ -46,5 +51,95 @@ class AkkaHttpClientSpec extends AnyWordSpec with Matchers with OptionValues { contentTypeHeader.value.lowercaseName() shouldBe `Content-Type`.lowercaseName reqHeaders should have size 1 } + + "build() should use default ConnectionPoolSettings" in { + val akkaClient: AkkaHttpClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory() + .build() + .asInstanceOf[AkkaHttpClient] + + akkaClient.connectionSettings shouldBe ConnectionPoolSettings(ConfigFactory.load()) + } + + "withConnectionPoolSettingsBuilderFromAttributeMap().buildWithDefaults() should propagate configuration options" in { + val attributeMap = AttributeMap.builder() + .put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, toJava(1.second)) + .put(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT, toJava(2.second)) + .put(SdkHttpConfigurationOption.MAX_CONNECTIONS, Integer.valueOf(3)) + .put(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE, toJava(4.second)) + .build() + val akkaClient: AkkaHttpClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory() + .withConnectionPoolSettingsBuilderFromAttributeMap() + .buildWithDefaults(attributeMap) + .asInstanceOf[AkkaHttpClient] + + akkaClient.connectionSettings.connectionSettings.connectingTimeout shouldBe 1.second + akkaClient.connectionSettings.connectionSettings.idleTimeout shouldBe 2.seconds + akkaClient.connectionSettings.maxConnections shouldBe 3 + akkaClient.connectionSettings.maxConnectionLifetime shouldBe 4.seconds + } + + "withConnectionPoolSettingsBuilderFromAttributeMap().build() should fallback to GLOBAL_HTTP_DEFAULTS" in { + val akkaClient: AkkaHttpClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory() + .withConnectionPoolSettingsBuilderFromAttributeMap() + .build() + .asInstanceOf[AkkaHttpClient] + + akkaClient.connectionSettings.connectionSettings.connectingTimeout shouldBe + toScala(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT)) + akkaClient.connectionSettings.connectionSettings.idleTimeout shouldBe + toScala(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT)) + akkaClient.connectionSettings.maxConnections shouldBe + SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue() + infiniteToZero(akkaClient.connectionSettings.maxConnectionLifetime) shouldBe + SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE) + } + + "withConnectionPoolSettingsBuilder().build() should use passed connectionPoolSettings builder" in { + val connectionPoolSettings = ConnectionPoolSettings(ConfigFactory.load()) + .withConnectionSettings( + ClientConnectionSettings(ConfigFactory.load()) + .withConnectingTimeout(1.second) + .withIdleTimeout(2.seconds) + ) + .withMaxConnections(3) + .withMaxConnectionLifetime(4.seconds) + + val akkaClient: AkkaHttpClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory() + .withConnectionPoolSettingsBuilder((_, _) => connectionPoolSettings) + .build() + .asInstanceOf[AkkaHttpClient] + + akkaClient.connectionSettings shouldBe connectionPoolSettings + } + + "withConnectionPoolSettings().build() should use passed ConnectionPoolSettings" in { + val connectionPoolSettings = ConnectionPoolSettings(ConfigFactory.load()) + .withConnectionSettings( + ClientConnectionSettings(ConfigFactory.load()) + .withConnectingTimeout(1.second) + .withIdleTimeout(2.seconds) + ) + .withMaxConnections(3) + .withMaxConnectionLifetime(4.seconds) + val akkaClient: AkkaHttpClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory() + .withConnectionPoolSettings(connectionPoolSettings) + .build() + .asInstanceOf[AkkaHttpClient] + + akkaClient.connectionSettings shouldBe connectionPoolSettings + } + } + + private def infiniteToZero(duration: scala.concurrent.duration.Duration): java.time.Duration = duration match { + case _: scala.concurrent.duration.Duration.Infinite => java.time.Duration.ZERO + case duration: FiniteDuration => toJava(duration) + } + + private def toJava(duration: scala.concurrent.duration.FiniteDuration): java.time.Duration = { + java.time.Duration.ofNanos(duration.toNanos) + } + + private def toScala(duration: java.time.Duration): scala.concurrent.duration.FiniteDuration = { + scala.concurrent.duration.Duration.fromNanos(duration.toNanos) } } diff --git a/src/test/scala/com/github/matsluni/akkahttpspi/BaseAwsClientTest.scala b/src/test/scala/com/github/matsluni/akkahttpspi/BaseAwsClientTest.scala index a93ad50..94fbc03 100644 --- a/src/test/scala/com/github/matsluni/akkahttpspi/BaseAwsClientTest.scala +++ b/src/test/scala/com/github/matsluni/akkahttpspi/BaseAwsClientTest.scala @@ -53,7 +53,7 @@ trait LocalstackBaseAwsClientTest[C <: SdkClient] extends BaseAwsClientTest[C] { lazy val exposedServicePort: Int = 4566 private lazy val containerInstance = new GenericContainer( - dockerImage = "localstack/localstack", + dockerImage = "localstack/localstack:1.4.0", exposedPorts = Seq(exposedServicePort), env = Map("SERVICES" -> service), waitStrategy = Some(LocalStackReadyLogWaitStrategy)