Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow using SdkHttpConfigurationOption over default akka-http connection settings #255

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{`Content-Length`, `Content-Type`}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, Materializer, SystemMaterializer}
import akka.stream.{Materializer, SystemMaterializer}
import akka.util.ByteString
import org.slf4j.LoggerFactory
import software.amazon.awssdk.http.async._
import software.amazon.awssdk.http.SdkHttpRequest
import software.amazon.awssdk.http.{SdkHttpConfigurationOption, SdkHttpRequest}
import software.amazon.awssdk.utils.AttributeMap

import scala.collection.immutable
Expand All @@ -41,7 +41,7 @@ import scala.compat.java8.OptionConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext}

class AkkaHttpClient(shutdownHandle: () => Unit, connectionSettings: ConnectionPoolSettings)(implicit actorSystem: ActorSystem, ec: ExecutionContext, mat: Materializer) extends SdkAsyncHttpClient {
class AkkaHttpClient(shutdownHandle: () => Unit, private[akkahttpspi] val connectionSettings: ConnectionPoolSettings)(implicit actorSystem: ActorSystem, ec: ExecutionContext, mat: Materializer) extends SdkAsyncHttpClient {
import AkkaHttpClient._

lazy val runner = new RequestRunner()
Expand Down Expand Up @@ -135,17 +135,42 @@ object AkkaHttpClient {
else throw new RuntimeException(s"Could not parse custom content type '$contentTypeStr'.")
}

// based on NettyNioAsyncHttpClient and ApacheHttpClient
// https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fnetty-nio-client%5C%2Fsrc%5C%2Fmain%2F&type=code
// https://github.com/search?q=repo%3Aaws%2Faws-sdk-java-v2+SdkHttpConfigurationOption+path%3A%2F%5Ehttp-clients%5C%2Fapache-client%5C%2Fsrc%5C%2Fmain%2F&type=code
private[akkahttpspi] def buildConnectionPoolSettings(base: ConnectionPoolSettings,attributeMap: AttributeMap): ConnectionPoolSettings = {
def zeroToInfinite(duration: java.time.Duration): scala.concurrent.duration.Duration =
if (duration.isZero) scala.concurrent.duration.Duration.Inf
else toScala(duration)

def toScala(duration: java.time.Duration): scala.concurrent.duration.FiniteDuration =
scala.concurrent.duration.Duration.fromNanos(duration.toNanos)

base
.withUpdatedConnectionSettings(s =>
s.withConnectingTimeout(toScala(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT)))
.withIdleTimeout(toScala(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT)))
)
.withMaxConnections(attributeMap.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue())
.withMaxConnectionLifetime(zeroToInfinite(attributeMap.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE)))
}

def builder() = AkkaHttpClientBuilder()

case class AkkaHttpClientBuilder(private val actorSystem: Option[ActorSystem] = None,
private val executionContext: Option[ExecutionContext] = None,
private val connectionPoolSettings: Option[ConnectionPoolSettings] = None) extends SdkAsyncHttpClient.Builder[AkkaHttpClientBuilder] {
def buildWithDefaults(attributeMap: AttributeMap): SdkAsyncHttpClient = {
private val connectionPoolSettings: Option[ConnectionPoolSettings] = None,
private val connectionPoolSettingsBuilder: (ConnectionPoolSettings, AttributeMap) => ConnectionPoolSettings = (c, _) => c
) extends SdkAsyncHttpClient.Builder[AkkaHttpClientBuilder] {

def buildWithDefaults(serviceDefaults: AttributeMap): SdkAsyncHttpClient = {
implicit val as = actorSystem.getOrElse(ActorSystem("aws-akka-http"))
implicit val ec = executionContext.getOrElse(as.dispatcher)
val mat: Materializer = SystemMaterializer(as).materializer

val cps = connectionPoolSettings.getOrElse(ConnectionPoolSettings(as))
val resolvedOptions = serviceDefaults.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS);

val cps = connectionPoolSettingsBuilder(connectionPoolSettings.getOrElse(ConnectionPoolSettings(as)), resolvedOptions)
val shutdownhandleF = () => {
if (actorSystem.isEmpty) {
Await.result(Http().shutdownAllConnectionPools().flatMap(_ => as.terminate()), Duration.apply(10, TimeUnit.SECONDS))
Expand All @@ -158,6 +183,9 @@ object AkkaHttpClient {
def withActorSystem(actorSystem: ClassicActorSystemProvider): AkkaHttpClientBuilder = copy(actorSystem = Some(actorSystem.classicSystem))
def withExecutionContext(executionContext: ExecutionContext): AkkaHttpClientBuilder = copy(executionContext = Some(executionContext))
def withConnectionPoolSettings(connectionPoolSettings: ConnectionPoolSettings): AkkaHttpClientBuilder = copy(connectionPoolSettings = Some(connectionPoolSettings))
def withConnectionPoolSettingsBuilder(connectionPoolSettingsBuilder: (ConnectionPoolSettings, AttributeMap) => ConnectionPoolSettings): AkkaHttpClientBuilder =
copy(connectionPoolSettingsBuilder = connectionPoolSettingsBuilder)
def withConnectionPoolSettingsBuilderFromAttributeMap(): AkkaHttpClientBuilder = copy(connectionPoolSettingsBuilder = buildConnectionPoolSettings)
}

lazy val xAmzJson = ContentType(MediaType.customBinary("application", "x-amz-json-1.0", Compressible))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ package com.github.matsluni.akkahttpspi

import akka.http.scaladsl.model.headers.`Content-Type`
import akka.http.scaladsl.model.MediaTypes
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}
import com.typesafe.config.ConfigFactory
import org.scalatest.OptionValues
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import software.amazon.awssdk.http.SdkHttpConfigurationOption
import software.amazon.awssdk.utils.AttributeMap

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

class AkkaHttpClientSpec extends AnyWordSpec with Matchers with OptionValues {
Expand All @@ -46,5 +51,95 @@ class AkkaHttpClientSpec extends AnyWordSpec with Matchers with OptionValues {
contentTypeHeader.value.lowercaseName() shouldBe `Content-Type`.lowercaseName
reqHeaders should have size 1
}

"build() should use default ConnectionPoolSettings" in {
val akkaClient: AkkaHttpClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory()
.build()
.asInstanceOf[AkkaHttpClient]

akkaClient.connectionSettings shouldBe ConnectionPoolSettings(ConfigFactory.load())
}

"withConnectionPoolSettingsBuilderFromAttributeMap().buildWithDefaults() should propagate configuration options" in {
val attributeMap = AttributeMap.builder()
.put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, toJava(1.second))
.put(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT, toJava(2.second))
.put(SdkHttpConfigurationOption.MAX_CONNECTIONS, Integer.valueOf(3))
.put(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE, toJava(4.second))
.build()
val akkaClient: AkkaHttpClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory()
.withConnectionPoolSettingsBuilderFromAttributeMap()
.buildWithDefaults(attributeMap)
.asInstanceOf[AkkaHttpClient]

akkaClient.connectionSettings.connectionSettings.connectingTimeout shouldBe 1.second
akkaClient.connectionSettings.connectionSettings.idleTimeout shouldBe 2.seconds
akkaClient.connectionSettings.maxConnections shouldBe 3
akkaClient.connectionSettings.maxConnectionLifetime shouldBe 4.seconds
}

"withConnectionPoolSettingsBuilderFromAttributeMap().build() should fallback to GLOBAL_HTTP_DEFAULTS" in {
val akkaClient: AkkaHttpClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory()
.withConnectionPoolSettingsBuilderFromAttributeMap()
.build()
.asInstanceOf[AkkaHttpClient]

akkaClient.connectionSettings.connectionSettings.connectingTimeout shouldBe
toScala(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT))
akkaClient.connectionSettings.connectionSettings.idleTimeout shouldBe
toScala(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT))
akkaClient.connectionSettings.maxConnections shouldBe
SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.MAX_CONNECTIONS).intValue()
infiniteToZero(akkaClient.connectionSettings.maxConnectionLifetime) shouldBe
SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS.get(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE)
}

"withConnectionPoolSettingsBuilder().build() should use passed connectionPoolSettings builder" in {
val connectionPoolSettings = ConnectionPoolSettings(ConfigFactory.load())
.withConnectionSettings(
ClientConnectionSettings(ConfigFactory.load())
.withConnectingTimeout(1.second)
.withIdleTimeout(2.seconds)
)
.withMaxConnections(3)
.withMaxConnectionLifetime(4.seconds)

val akkaClient: AkkaHttpClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory()
.withConnectionPoolSettingsBuilder((_, _) => connectionPoolSettings)
.build()
.asInstanceOf[AkkaHttpClient]

akkaClient.connectionSettings shouldBe connectionPoolSettings
}

"withConnectionPoolSettings().build() should use passed ConnectionPoolSettings" in {
val connectionPoolSettings = ConnectionPoolSettings(ConfigFactory.load())
.withConnectionSettings(
ClientConnectionSettings(ConfigFactory.load())
.withConnectingTimeout(1.second)
.withIdleTimeout(2.seconds)
)
.withMaxConnections(3)
.withMaxConnectionLifetime(4.seconds)
val akkaClient: AkkaHttpClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory()
.withConnectionPoolSettings(connectionPoolSettings)
.build()
.asInstanceOf[AkkaHttpClient]

akkaClient.connectionSettings shouldBe connectionPoolSettings
}
}

private def infiniteToZero(duration: scala.concurrent.duration.Duration): java.time.Duration = duration match {
case _: scala.concurrent.duration.Duration.Infinite => java.time.Duration.ZERO
case duration: FiniteDuration => toJava(duration)
}

private def toJava(duration: scala.concurrent.duration.FiniteDuration): java.time.Duration = {
java.time.Duration.ofNanos(duration.toNanos)
}

private def toScala(duration: java.time.Duration): scala.concurrent.duration.FiniteDuration = {
scala.concurrent.duration.Duration.fromNanos(duration.toNanos)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ trait LocalstackBaseAwsClientTest[C <: SdkClient] extends BaseAwsClientTest[C] {
lazy val exposedServicePort: Int = 4566

private lazy val containerInstance = new GenericContainer(
dockerImage = "localstack/localstack",
dockerImage = "localstack/localstack:1.4.0",
exposedPorts = Seq(exposedServicePort),
env = Map("SERVICES" -> service),
waitStrategy = Some(LocalStackReadyLogWaitStrategy)
Expand Down
Loading