Update scalafmt-core to 3.8.0 in series/3.x (#1301)
* Update scalafmt-core to 3.8.0 in series/3.x

* Reformat with scalafmt 3.8.0

Executed command: scalafmt --non-interactive

* Add 'Reformat with scalafmt 3.8.0' to .git-blame-ignore-revs
scala-steward authored Feb 23, 2024
1 parent 6973623 commit 6d34008
Showing 20 changed files with 82 additions and 20 deletions.
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
@@ -3,3 +3,6 @@ db0ea0ffd3d0956211a680941426f73ba7ec581b

# Scala Steward: Reformat with scalafmt 3.7.17
3dfecc9738dfc09b1bbd9a21b7f791f6ee7890b9

# Scala Steward: Reformat with scalafmt 3.8.0
5b4372fb8bddae0f2b4ce5e0f8b1b7e075899359
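
For context (not part of this diff): entries in `.git-blame-ignore-revs` only take effect once `git blame` is pointed at the file, for example with:

```
git config blame.ignoreRevsFile .git-blame-ignore-revs
```

With that setting, blame output skips the listed reformatting commits and attributes lines to the commits that last made a substantive change.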
2 changes: 1 addition & 1 deletion .scalafmt.conf
@@ -1,4 +1,4 @@
version = "3.7.17"
version = "3.8.0"

# Scala 2 with -Xsource:3 compiler option
runner.dialect = scala213source3
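
As a side note (an illustration, not part of this diff), the `scala213source3` dialect corresponds to Scala 2.13 code compiled with the `-Xsource:3` flag, which a build would typically enable in sbt along these lines:

```scala
// Illustrative sbt setting only; not taken from this repository's build definition.
scalacOptions += "-Xsource:3"
```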
2 changes: 2 additions & 0 deletions build.sbt
@@ -129,6 +129,7 @@ lazy val dependencySettings = Seq(
},
pomPostProcess := { (node: xml.Node) =>
new xml.transform.RuleTransformer(new xml.transform.RewriteRule {

def scopedDependency(e: xml.Elem): Boolean =
e.label == "dependency" && e.child.exists(_.label == "scope")

@@ -137,6 +138,7 @@
case e: xml.Elem if scopedDependency(e) => Nil
case _ => Seq(node)
}

}).transform(node).head
}
)
10 changes: 7 additions & 3 deletions docs/src/main/mdoc/consumers.md
@@ -80,8 +80,10 @@ If we have a Java Kafka deserializer, use `delegate` to create a `Deserializer`.
```scala mdoc:silent
Deserializer.delegate[IO, String] {
new KafkaDeserializer[String] {

def deserialize(topic: String, data: Array[Byte]): String =
new String(data)

}
}
```
@@ -92,10 +94,12 @@ If the deserializer performs _side effects_, follow with `suspend` to capture them
Deserializer
.delegate[IO, String] {
new KafkaDeserializer[String] {

def deserialize(topic: String, data: Array[Byte]): String = {
println(s"deserializing record on topic $topic")
new String(data)
}

}
}
.suspend
@@ -286,14 +290,14 @@ The recommended pattern for these use cases is by working on the `Chunk`s of records

```scala mdoc:silent
object ConsumerChunkExample extends IOApp.Simple {

val run: IO[Unit] = {
def processRecords(records: Chunk[ConsumerRecord[String, String]]): IO[CommitNow] =
records.traverse(record => IO.println(s"Processing record: $record")).as(CommitNow)

-KafkaConsumer.stream(consumerSettings)
-.subscribeTo("topic")
-.consumeChunk(processRecords)
+KafkaConsumer.stream(consumerSettings).subscribeTo("topic").consumeChunk(processRecords)
}

}
```

4 changes: 4 additions & 0 deletions docs/src/main/mdoc/producers.md
@@ -78,8 +78,10 @@ If we have a Java Kafka serializer, use `delegate` to create a `Serializer`.
```scala mdoc:silent
Serializer.delegate[IO, String] {
new KafkaSerializer[String] {

def serialize(topic: String, value: String): Array[Byte] =
value.getBytes("UTF-8")

}
}
```
@@ -90,10 +92,12 @@ If the serializer performs _side effects_, follow with `suspend` to capture them
Serializer
.delegate[IO, String] {
new KafkaSerializer[String] {

def serialize(topic: String, value: String): Array[Byte] = {
println(s"serializing record on topic $topic")
value.getBytes("UTF-8")
}

}
}
.suspend
26 changes: 16 additions & 10 deletions docs/src/main/mdoc/quick-example.md
@@ -16,6 +16,7 @@ import fs2.kafka._
import fs2.kafka.consumer.KafkaConsumeChunk.CommitNow

object Main extends IOApp.Simple {

val run: IO[Unit] = {
val consumerSettings =
ConsumerSettings[IO, String, String]
@@ -24,23 +25,28 @@ object Main extends IOApp.Simple {
.withGroupId("group")

val producerSettings =
-ProducerSettings[IO, String, String]
-.withBootstrapServers("localhost:9092")
+ProducerSettings[IO, String, String].withBootstrapServers("localhost:9092")

-def processRecords(producer: KafkaProducer[IO, String, String])(records: Chunk[ConsumerRecord[String, String]]): IO[CommitNow] = {
-val producerRecords = records.map(consumerRecord => ProducerRecord("topic", consumerRecord.key, consumerRecord.value))
+def processRecords(
+producer: KafkaProducer[IO, String, String]
+)(records: Chunk[ConsumerRecord[String, String]]): IO[CommitNow] = {
+val producerRecords = records
+.map(consumerRecord => ProducerRecord("topic", consumerRecord.key, consumerRecord.value))
producer.produce(producerRecords).flatten.as(CommitNow)
}

val stream =
-KafkaProducer.stream(producerSettings).evalMap { producer =>
-KafkaConsumer
-.stream(consumerSettings)
-.subscribeTo("topic")
-.consumeChunk(chunk => processRecords(producer)(chunk))
-}
+KafkaProducer
+.stream(producerSettings)
+.evalMap { producer =>
+KafkaConsumer
+.stream(consumerSettings)
+.subscribeTo("topic")
+.consumeChunk(chunk => processRecords(producer)(chunk))
+}

stream.compile.drain
}

}
```
2 changes: 2 additions & 0 deletions modules/core/src/main/scala/fs2/kafka/CommittableOffset.scala
@@ -91,6 +91,7 @@ object CommittableOffset {
val _commit = commit

new CommittableOffset[F] {

override val topicPartition: TopicPartition =
_topicPartition

@@ -119,6 +120,7 @@
case None =>
show"CommittableOffset($topicPartition -> $offsetAndMetadata)"
}

}
}

@@ -99,6 +99,7 @@ object CommittableOffsetBatch {
val _commit = commit

new CommittableOffsetBatch[F] {

override def updated(that: CommittableOffset[F]): CommittableOffsetBatch[F] =
CommittableOffsetBatch(
_offsets.updated(that.topicPartition, that.offsetAndMetadata),
@@ -131,6 +132,7 @@

override def toString: String =
Show[CommittableOffsetBatch[F]].show(this)

}
}

2 changes: 2 additions & 0 deletions modules/core/src/main/scala/fs2/kafka/Headers.scala
@@ -106,6 +106,7 @@ object Headers {
toChain.iterator.toArray

new KafkaHeaders {

override def add(header: KafkaHeader): KafkaHeaders =
throw new IllegalStateException("Headers#asJava is immutable")

@@ -128,6 +129,7 @@

override def iterator(): java.util.Iterator[KafkaHeader] =
array.iterator.asJava

}
}

2 changes: 2 additions & 0 deletions modules/core/src/main/scala/fs2/kafka/Jitter.scala
@@ -36,8 +36,10 @@ object Jitter {
F.delay(new java.util.Random())
.map { random =>
new Jitter[F] {

override def withJitter(n: Double): F[Double] =
F.delay(random.nextDouble()).map(_ * n)

}
}

@@ -120,6 +120,7 @@ object KafkaProducerConnection {
): Resource[F, KafkaProducerConnection[G]] =
WithProducer(mk, settings).map { withProducer =>
new KafkaProducerConnection[G] {

override def produce[K, V](
records: ProducerRecords[K, V]
)(implicit
@@ -135,6 +136,7 @@

override def metrics: G[Map[MetricName, Metric]] =
withProducer.blocking(_.metrics().asScala.toMap)

override def withSerializers[K, V](
keySerializer: KeySerializer[G, K],
valueSerializer: ValueSerializer[G, V]
@@ -148,6 +150,7 @@

override def partitionsFor(topic: String): G[List[PartitionInfo]] =
withProducer.blocking(_.partitionsFor(topic).asScala.toList)

}
}

@@ -98,6 +98,7 @@ object TransactionalKafkaProducer {
WithTransactionalProducer(mk, settings)
).mapN { (keySerializer, valueSerializer, withProducer) =>
new TransactionalKafkaProducer.WithoutOffsets[F, K, V] {

override def produce(
records: TransactionalProducerRecords[F, K, V]
): F[ProducerResult[K, V]] =
@@ -160,6 +161,7 @@

override def toString: String =
"TransactionalKafkaProducer$" + System.identityHashCode(this)

}
}

2 changes: 2 additions & 0 deletions modules/core/src/main/scala/fs2/kafka/internal/Logging.scala
@@ -24,6 +24,7 @@ private[kafka] object Logging {
F.delay(LoggerFactory.getLogger(name))
.map { logger =>
new Logging[F] {

override def log(entry: LogEntry): F[Unit] =
F.delay {
entry.level match {
@@ -41,6 +42,7 @@
logger.debug(entry.message)
}
}

}
}

@@ -29,8 +29,10 @@ private[kafka] object WithAdminClient {
mk(settings).map { adminClient =>
val withAdminClient =
new WithAdminClient[G] {

override def apply[A](f: AdminClient => KafkaFuture[A]): G[A] =
G.delay(f(adminClient)).cancelable_

}

val close =
@@ -31,8 +31,10 @@ private[kafka] object WithConsumer {
Resource.make {
mk(settings).map { consumer =>
new WithConsumer[F] {

override def blocking[A](f: KafkaByteConsumer => A): F[A] =
b(f(consumer))

}
}
}(_.blocking(_.close(settings.closeTimeout.toJava)))
13 changes: 7 additions & 6 deletions modules/core/src/test/scala/fs2/kafka/KafkaAdminClientSpec.scala
@@ -86,12 +86,13 @@ final class KafkaAdminClientSpec extends BaseKafkaSpec {
.map((groupId, _))
}
_ <- IO(assert(consumerGroupOffsetsPartitions.size == 1))
-_ <- IO {
-adminClient
-.listConsumerGroupOffsets(consumerGroupId)
-.forPartitions(List(new TopicPartition("topic", 0)))
-.toString shouldBe "ListConsumerGroupOffsetsForPartitions(groupId = test-group-id, partitions = List(topic-0))"
-}
+_ <-
+IO {
+adminClient
+.listConsumerGroupOffsets(consumerGroupId)
+.forPartitions(List(new TopicPartition("topic", 0)))
+.toString shouldBe "ListConsumerGroupOffsetsForPartitions(groupId = test-group-id, partitions = List(topic-0))"
+}
partition0 = new TopicPartition(topic, 0)
updatedOffset = new OffsetAndMetadata(0)
_ <- adminClient
6 changes: 6 additions & 0 deletions modules/core/src/test/scala/fs2/kafka/SerializerSpec.scala
@@ -72,10 +72,13 @@ final class SerializerSpec extends BaseCatsSpec {
Serializer
.delegate[IO, Int](
new KafkaSerializer[Int] {

override def close(): Unit = ()
override def configure(props: java.util.Map[String, ?], isKey: Boolean): Unit = ()

override def serialize(topic: String, int: Int): Array[Byte] =
throw new RuntimeException

}
)
.suspend
@@ -111,10 +114,13 @@
Serializer
.delegate[IO, Int](
new KafkaSerializer[Int] {

override def close(): Unit = ()
override def configure(props: java.util.Map[String, ?], isKey: Boolean): Unit = ()

override def serialize(topic: String, int: Int): Array[Byte] =
throw new RuntimeException

}
)
.suspend