chore: akka to 2.10.0-M1, align with changes from upstream
sebastian-alfers committed Sep 25, 2024
1 parent 5ee0a0a commit 2397b24
Showing 34 changed files with 191 additions and 197 deletions.
1 change: 0 additions & 1 deletion .scala-steward.conf
@@ -10,7 +10,6 @@ updates.pin = [
{ groupId = "org.scalatest", artifactId = "scalatest", version = "3.1." }
{ groupId = "org.slf4j", artifactId = "log4j-over-slf4j", version = "1." }
{ groupId = "org.slf4j", artifactId = "jul-to-slf4j", version = "1." }
{ groupId = "ch.qos.logback", artifactId = "logback-classic", version = "1.2." }
]

commits.message = "bump: ${artifactName} ${nextVersion} (was ${currentVersion})"
@@ -8,7 +8,6 @@ package akka.kafka.benchmarks
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
- import akka.dispatch.ExecutionContexts
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.benchmarks.InflightMetrics.{BrokerMetricRequest, ConsumerMetricRequest}
import akka.kafka.scaladsl.Committer
@@ -163,7 +162,7 @@ object ReactiveKafkaConsumerBenchmarks extends LazyLogging with InflightMetrics
val control = fixture.source
.mapAsync(1) { m =>
meter.mark()
- m.committableOffset.commitInternal().map(_ => m)(ExecutionContexts.parasitic)
+ m.committableOffset.commitInternal().map(_ => m)(ExecutionContext.parasitic)
}
.toMat(Sink.foreach { msg =>
if (msg.committableOffset.partitionOffset.offset >= fixture.msgCount - 1)
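This change swaps Akka's `ExecutionContexts.parasitic` alias for the standard library's `scala.concurrent.ExecutionContext.parasitic` (Scala 2.13+), presumably because Akka 2.10 drops the deprecated helper. A minimal sketch of the pattern, with a hypothetical `commit` function standing in for the benchmark's `commitInternal`:

```scala
import scala.concurrent.{ExecutionContext, Future}

// Sketch only: `commit` stands in for the benchmark's commitInternal call.
// ExecutionContext.parasitic runs the callback on the thread that completes
// the Future, avoiding a dispatcher round-trip for this trivial map step.
def commitAndKeep[M](commit: () => Future[Unit], msg: M): Future[M] =
  commit().map(_ => msg)(ExecutionContext.parasitic)
```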
8 changes: 4 additions & 4 deletions build.sbt
@@ -16,8 +16,8 @@ val ScalaVersions = Seq(Scala213, Scala3)

val Scala3Settings = Seq(crossScalaVersions := ScalaVersions)

val AkkaBinaryVersionForDocs = "2.9"
val akkaVersion = "2.9.3"
val akkaVersion = "2.10.0-M1"
val AkkaBinaryVersionForDocs = VersionNumber(akkaVersion).numbers match { case Seq(major, minor, _*) => s"$major.$minor" }

// Keep .scala-steward.conf pin in sync
val kafkaVersion = "3.7.1"
@@ -26,7 +26,7 @@ val KafkaVersionForDocs = "37"
// https://github.com/akka/akka/blob/main/project/Dependencies.scala#L44
val scalatestVersion = "3.2.16"
val testcontainersVersion = "1.20.1"
val slf4jVersion = "1.7.36"
val slf4jVersion = "2.0.16"
// this depends on Kafka, and should be upgraded to such latest version
// that depends on the same Kafka version, as is defined above
// See https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer?repo=confluent-packages
@@ -304,7 +304,7 @@ lazy val tests = project
"org.hamcrest" % "hamcrest" % "3.0" % Test,
"net.aichler" % "jupiter-interface" % JupiterKeys.jupiterVersion.value % Test,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion % Test,
"ch.qos.logback" % "logback-classic" % "1.2.13" % Test,
"ch.qos.logback" % "logback-classic" % "1.5.7" % Test,
"org.slf4j" % "log4j-over-slf4j" % slf4jVersion % Test,
// Schema registry uses Glassfish which uses java.util.logging
"org.slf4j" % "jul-to-slf4j" % slf4jVersion % Test,
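Two things travel together in this build change. First, `AkkaBinaryVersionForDocs` is now derived from `akkaVersion` instead of being maintained by hand, so a milestone bump like 2.10.0-M1 cannot leave the docs version stale. Second, the SLF4J 1.7 to 2.0 bump pairs with the Logback 1.2 to 1.5 bump, since Logback 1.4+ implements the SLF4J 2.x provider API. A sketch of the version derivation, assuming it runs inside an sbt build where `VersionNumber` is in scope:

```scala
// Sketch of the new derivation; values taken from the diff above.
val akkaVersion = "2.10.0-M1"

// sbt's VersionNumber parses "2.10.0-M1" into numbers Seq(2, 10, 0)
// (the "M1" tag is kept separately), so the match keeps major.minor.
val AkkaBinaryVersionForDocs = VersionNumber(akkaVersion).numbers match {
  case Seq(major, minor, _*) => s"$major.$minor" // "2.10"
}
```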
@@ -24,11 +24,11 @@ import org.apache.kafka.common.utils.Utils

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContextExecutor, Future}
+ import scala.jdk.DurationConverters._
+ import scala.jdk.FutureConverters._
import scala.util.{Failure, Success}
- import akka.util.JavaDurationConverters._
import org.slf4j.LoggerFactory

- import scala.compat.java8.FutureConverters._

/**
* API MAY CHANGE
@@ -81,9 +81,9 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
def messageExtractor[M](topic: String,
timeout: java.time.Duration,
settings: ConsumerSettings[_, _]): CompletionStage[KafkaShardingMessageExtractor[M]] =
- getPartitionCount(topic, timeout.asScala, settings)
+ getPartitionCount(topic, timeout.toScala, settings)
.map(new KafkaShardingMessageExtractor[M](_))(system.dispatcher)
- .toJava
+ .asJava

/**
* API MAY CHANGE
@@ -147,11 +147,11 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
entityIdExtractor: java.util.function.Function[M, String],
settings: ConsumerSettings[_, _]
): CompletionStage[KafkaShardingNoEnvelopeExtractor[M]] =
- getPartitionCount(topic, timeout.asScala, settings)
+ getPartitionCount(topic, timeout.toScala, settings)
.map(partitions => new KafkaShardingNoEnvelopeExtractor[M](partitions, e => entityIdExtractor.apply(e)))(
system.dispatcher
)
- .toJava
+ .asJava

/**
* API MAY CHANGE
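This file, and the settings classes below, follow one mechanical migration: `akka.util.JavaDurationConverters` (`.asScala`/`.asJava`) is replaced by the standard library's `scala.jdk.DurationConverters` (`.toScala`/`.toJava`), and `scala.compat.java8.FutureConverters` (`.toScala`/`.toJava`) by `scala.jdk.FutureConverters` (`.asScala`/`.asJava`); note that the two libraries use opposite method-name conventions. A self-contained sketch of both conversions, with a placeholder body standing in for the real partition lookup:

```scala
import java.time.{Duration => JDuration}
import java.util.concurrent.CompletionStage
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration
import scala.jdk.DurationConverters._ // JDuration#toScala, FiniteDuration#toJava
import scala.jdk.FutureConverters._   // Future#asJava, CompletionStage#asScala

// Placeholder for a Java-facing API in the style of messageExtractor:
// convert the Java duration on the way in, the Scala Future on the way out.
def partitionCount(timeout: JDuration)(implicit ec: ExecutionContext): CompletionStage[Int] = {
  val scalaTimeout: FiniteDuration = timeout.toScala // was timeout.asScala
  Future(scalaTimeout.toSeconds.toInt).asJava        // was .toJava
}
```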
5 changes: 3 additions & 2 deletions core/src/main/scala/akka/kafka/CommitterSettings.scala
@@ -7,10 +7,11 @@ package akka.kafka
import java.util.concurrent.TimeUnit

import akka.annotation.ApiMayChange
- import akka.util.JavaDurationConverters._

import com.typesafe.config.Config

import scala.concurrent.duration._
+ import scala.jdk.DurationConverters._

@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/882")
sealed trait CommitDelivery
@@ -176,7 +177,7 @@ class CommitterSettings private (
copy(maxInterval = maxInterval)

def withMaxInterval(maxInterval: java.time.Duration): CommitterSettings =
- copy(maxInterval = maxInterval.asScala)
+ copy(maxInterval = maxInterval.toScala)

def withParallelism(parallelism: Int): CommitterSettings =
copy(parallelism = parallelism)
@@ -5,10 +5,10 @@

package akka.kafka

- import akka.util.JavaDurationConverters._
import com.typesafe.config.Config

import scala.concurrent.duration._
+ import scala.jdk.DurationConverters._

import java.time.{Duration => JDuration}

@@ -40,7 +40,7 @@ class ConnectionCheckerSettings private[kafka] (val enable: Boolean,

/** Java API */
def withCheckInterval(checkInterval: JDuration): ConnectionCheckerSettings =
- copy(checkInterval = checkInterval.asScala)
+ copy(checkInterval = checkInterval.toScala)

override def toString: String =
s"akka.kafka.ConnectionCheckerSettings(" +
@@ -70,7 +70,7 @@ object ConnectionCheckerSettings {
if (enable) {
val retries = config.getInt("max-retries")
val factor = config.getDouble("backoff-factor")
val checkInterval = config.getDuration("check-interval").asScala
val checkInterval = config.getDuration("check-interval").toScala
apply(retries, checkInterval, factor)
} else Disabled
}
80 changes: 40 additions & 40 deletions core/src/main/scala/akka/kafka/ConsumerSettings.scala
@@ -10,16 +10,16 @@ import java.util.concurrent.{CompletionStage, Executor}

import akka.annotation.InternalApi
import akka.kafka.internal._
- import akka.util.JavaDurationConverters._
import com.typesafe.config.Config
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.Deserializer

- import scala.jdk.CollectionConverters._
- import scala.compat.java8.OptionConverters._
- import scala.compat.java8.FutureConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
+ import scala.jdk.CollectionConverters._
+ import scala.jdk.DurationConverters._
+ import scala.jdk.FutureConverters._
+ import scala.jdk.OptionConverters._

object ConsumerSettings {

@@ -74,25 +74,25 @@ object ConsumerSettings {
(valueDeserializer.isDefined || properties.contains(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)),
"Value deserializer should be defined or declared in configuration"
)
val pollInterval = config.getDuration("poll-interval").asScala
val pollTimeout = config.getDuration("poll-timeout").asScala
val stopTimeout = config.getDuration("stop-timeout").asScala
val closeTimeout = config.getDuration("close-timeout").asScala
val commitTimeout = config.getDuration("commit-timeout").asScala
val commitTimeWarning = config.getDuration("commit-time-warning").asScala
val pollInterval = config.getDuration("poll-interval").toScala
val pollTimeout = config.getDuration("poll-timeout").toScala
val stopTimeout = config.getDuration("stop-timeout").toScala
val closeTimeout = config.getDuration("close-timeout").toScala
val commitTimeout = config.getDuration("commit-timeout").toScala
val commitTimeWarning = config.getDuration("commit-time-warning").toScala
val commitRefreshInterval = ConfigSettings.getPotentiallyInfiniteDuration(config, "commit-refresh-interval")
val dispatcher = config.getString("use-dispatcher")
val waitClosePartition = config.getDuration("wait-close-partition").asScala
val positionTimeout = config.getDuration("position-timeout").asScala
val offsetForTimesTimeout = config.getDuration("offset-for-times-timeout").asScala
val metadataRequestTimeout = config.getDuration("metadata-request-timeout").asScala
val drainingCheckInterval = config.getDuration("eos-draining-check-interval").asScala
val waitClosePartition = config.getDuration("wait-close-partition").toScala
val positionTimeout = config.getDuration("position-timeout").toScala
val offsetForTimesTimeout = config.getDuration("offset-for-times-timeout").toScala
val metadataRequestTimeout = config.getDuration("metadata-request-timeout").toScala
val drainingCheckInterval = config.getDuration("eos-draining-check-interval").toScala
val connectionCheckerSettings = ConnectionCheckerSettings(config.getConfig(ConnectionCheckerSettings.configPath))
val partitionHandlerWarning = config.getDuration("partition-handler-warning").asScala
val partitionHandlerWarning = config.getDuration("partition-handler-warning").toScala
val resetProtectionThreshold = OffsetResetProtectionSettings(
config.getConfig(OffsetResetProtectionSettings.configPath)
)
val consumerGroupUpdateInterval = config.getDuration("consumer-group-update-interval").asScala
val consumerGroupUpdateInterval = config.getDuration("consumer-group-update-interval").toScala

new ConsumerSettings[K, V](
properties,
@@ -168,7 +168,7 @@ object ConsumerSettings {
keyDeserializer: Optional[Deserializer[K]],
valueDeserializer: Optional[Deserializer[V]]
): ConsumerSettings[K, V] =
- apply(system, keyDeserializer.asScala, valueDeserializer.asScala)
+ apply(system, keyDeserializer.toScala, valueDeserializer.toScala)

/**
* Java API: Create settings from the default configuration
@@ -182,7 +182,7 @@ object ConsumerSettings {
keyDeserializer: Optional[Deserializer[K]],
valueDeserializer: Optional[Deserializer[V]]
): ConsumerSettings[K, V] =
- apply(system, keyDeserializer.asScala, valueDeserializer.asScala)
+ apply(system, keyDeserializer.toScala, valueDeserializer.toScala)

/**
* Java API: Create settings from a configuration with the same layout as
Expand All @@ -194,7 +194,7 @@ object ConsumerSettings {
keyDeserializer: Optional[Deserializer[K]],
valueDeserializer: Optional[Deserializer[V]]
): ConsumerSettings[K, V] =
- apply(config, keyDeserializer.asScala, valueDeserializer.asScala)
+ apply(config, keyDeserializer.toScala, valueDeserializer.toScala)

/**
* Java API: Create settings from the default configuration
@@ -369,7 +369,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
* Set the maximum duration a poll to the Kafka broker is allowed to take.
*/
def withPollTimeout(pollTimeout: java.time.Duration): ConsumerSettings[K, V] =
- copy(pollTimeout = pollTimeout.asScala)
+ copy(pollTimeout = pollTimeout.toScala)

/**
* Set the interval from one scheduled poll to the next.
@@ -382,7 +382,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
* Set the interval from one scheduled poll to the next.
*/
def withPollInterval(pollInterval: java.time.Duration): ConsumerSettings[K, V] =
- copy(pollInterval = pollInterval.asScala)
+ copy(pollInterval = pollInterval.toScala)

/**
* The stage will await outstanding offset commit requests before
@@ -399,7 +399,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
* stop forcefully.
*/
def withStopTimeout(stopTimeout: java.time.Duration): ConsumerSettings[K, V] =
- copy(stopTimeout = stopTimeout.asScala)
+ copy(stopTimeout = stopTimeout.toScala)

/**
* Set duration to wait for `KafkaConsumer.close` to finish.
@@ -412,7 +412,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
* Set duration to wait for `KafkaConsumer.close` to finish.
*/
def withCloseTimeout(closeTimeout: java.time.Duration): ConsumerSettings[K, V] =
- copy(closeTimeout = closeTimeout.asScala)
+ copy(closeTimeout = closeTimeout.toScala)

/**
* If offset commit requests are not completed within this timeout
@@ -427,7 +427,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
* the returned Future is completed with [[akka.kafka.CommitTimeoutException]].
*/
def withCommitTimeout(commitTimeout: java.time.Duration): ConsumerSettings[K, V] =
- copy(commitTimeout = commitTimeout.asScala)
+ copy(commitTimeout = commitTimeout.toScala)

/**
* If commits take longer than this time a warning is logged
@@ -440,7 +440,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
* If commits take longer than this time a warning is logged
*/
def withCommitWarning(commitTimeWarning: java.time.Duration): ConsumerSettings[K, V] =
- copy(commitTimeWarning = commitTimeWarning.asScala)
+ copy(commitTimeWarning = commitTimeWarning.toScala)

/**
* Fully qualified config path which holds the dispatcher configuration
@@ -468,7 +468,7 @@
*/
def withCommitRefreshInterval(commitRefreshInterval: java.time.Duration): ConsumerSettings[K, V] =
if (commitRefreshInterval.isZero) copy(commitRefreshInterval = Duration.Inf)
- else copy(commitRefreshInterval = commitRefreshInterval.asScala)
+ else copy(commitRefreshInterval = commitRefreshInterval.toScala)

/**
* Time to wait for pending requests when a partition is closed.
@@ -489,47 +489,47 @@
* Time to wait for pending requests when a partition is closed.
*/
def withWaitClosePartition(waitClosePartition: java.time.Duration): ConsumerSettings[K, V] =
- copy(waitClosePartition = waitClosePartition.asScala)
+ copy(waitClosePartition = waitClosePartition.toScala)

/** Scala API: Limits the blocking on Kafka consumer position calls. */
def withPositionTimeout(positionTimeout: FiniteDuration): ConsumerSettings[K, V] =
copy(positionTimeout = positionTimeout)

/** Java API: Limits the blocking on Kafka consumer position calls. */
def withPositionTimeout(positionTimeout: java.time.Duration): ConsumerSettings[K, V] =
- copy(positionTimeout = positionTimeout.asScala)
+ copy(positionTimeout = positionTimeout.toScala)

/** Scala API: Limits the blocking on Kafka consumer offsetForTimes calls. */
def withOffsetForTimesTimeout(offsetForTimesTimeout: FiniteDuration): ConsumerSettings[K, V] =
copy(offsetForTimesTimeout = offsetForTimesTimeout)

/** Java API: Limits the blocking on Kafka consumer offsetForTimes calls. */
def withOffsetForTimesTimeout(offsetForTimesTimeout: java.time.Duration): ConsumerSettings[K, V] =
- copy(offsetForTimesTimeout = offsetForTimesTimeout.asScala)
+ copy(offsetForTimesTimeout = offsetForTimesTimeout.toScala)

/** Scala API */
def withMetadataRequestTimeout(metadataRequestTimeout: FiniteDuration): ConsumerSettings[K, V] =
copy(metadataRequestTimeout = metadataRequestTimeout)

/** Java API */
def withMetadataRequestTimeout(metadataRequestTimeout: java.time.Duration): ConsumerSettings[K, V] =
- copy(metadataRequestTimeout = metadataRequestTimeout.asScala)
+ copy(metadataRequestTimeout = metadataRequestTimeout.toScala)

/** Scala API: Check interval for TransactionalProducer when finishing transaction before shutting down consumer */
def withDrainingCheckInterval(drainingCheckInterval: FiniteDuration): ConsumerSettings[K, V] =
copy(drainingCheckInterval = drainingCheckInterval)

/** Java API: Check interval for TransactionalProducer when finishing transaction before shutting down consumer */
def withDrainingCheckInterval(drainingCheckInterval: java.time.Duration): ConsumerSettings[K, V] =
- copy(drainingCheckInterval = drainingCheckInterval.asScala)
+ copy(drainingCheckInterval = drainingCheckInterval.toScala)

/** Scala API */
def withPartitionHandlerWarning(partitionHandlerWarning: FiniteDuration): ConsumerSettings[K, V] =
copy(partitionHandlerWarning = partitionHandlerWarning)

/** Java API */
def withPartitionHandlerWarning(partitionHandlerWarning: java.time.Duration): ConsumerSettings[K, V] =
- copy(partitionHandlerWarning = partitionHandlerWarning.asScala)
+ copy(partitionHandlerWarning = partitionHandlerWarning.toScala)

/**
* Scala API.
@@ -547,7 +547,7 @@
def withEnrichCompletionStage(
value: java.util.function.Function[ConsumerSettings[K, V], CompletionStage[ConsumerSettings[K, V]]]
): ConsumerSettings[K, V] =
- copy(enrichAsync = Some((s: ConsumerSettings[K, V]) => value.apply(s).toScala))
+ copy(enrichAsync = Some((s: ConsumerSettings[K, V]) => value.apply(s).asScala))

/**
* Replaces the default Kafka consumer creation logic.
@@ -576,17 +576,17 @@
* of more work sending those updates.
*/
def withConsumerGroupUpdateInterval(interval: java.time.Duration): ConsumerSettings[K, V] =
- copy(consumerGroupUpdateInterval = interval.asScala)
+ copy(consumerGroupUpdateInterval = interval.toScala)

/**
* Get the Kafka consumer settings as map.
*/
def getProperties: java.util.Map[String, AnyRef] = properties.asInstanceOf[Map[String, AnyRef]].asJava

- def getCloseTimeout: java.time.Duration = closeTimeout.asJava
- def getPositionTimeout: java.time.Duration = positionTimeout.asJava
- def getOffsetForTimesTimeout: java.time.Duration = offsetForTimesTimeout.asJava
- def getMetadataRequestTimeout: java.time.Duration = metadataRequestTimeout.asJava
+ def getCloseTimeout: java.time.Duration = closeTimeout.toJava
+ def getPositionTimeout: java.time.Duration = positionTimeout.toJava
+ def getOffsetForTimesTimeout: java.time.Duration = offsetForTimesTimeout.toJava
+ def getMetadataRequestTimeout: java.time.Duration = metadataRequestTimeout.toJava

private def copy(
properties: Map[String, String] = properties,
@@ -675,7 +675,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
* (without blocking for `enriched`).
*/
def createKafkaConsumerCompletionStage(executor: Executor): CompletionStage[Consumer[K, V]] =
- enriched.map(consumerFactory)(ExecutionContext.fromExecutor(executor)).toJava
+ enriched.map(consumerFactory)(ExecutionContext.fromExecutor(executor)).asJava

private final val propertiesAllowList = Set(
"auto.offset.reset",
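The same sweep replaces `scala.compat.java8.OptionConverters` with `scala.jdk.OptionConverters` for the optional deserializer parameters: `java.util.Optional#toScala` takes over from `.asScala`, and `Option#toJava` from `.asJava`. A minimal sketch of that round trip:

```scala
import java.util.Optional
import scala.jdk.OptionConverters._ // Optional#toScala, Option#toJava

// Round-trip between the Java and Scala optional types.
val fromJava: Option[String] = Optional.of("deserializer").toScala // was .asScala
val backToJava: Optional[String] = fromJava.toJava                 // was .asJava
```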
