Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update scalafmt-core to 3.0.8 #344

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = 3.0.5
version = 3.0.8
maxColumn = 100
align.openBracketDefnSite = true
align.arrowEnumeratorGenerator = true
Expand Down
6 changes: 4 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ lazy val catsMTLVersion = "0.7.1"
lazy val commonSettings = Seq(
scalacOptions += "-Xsource:2.13",
scalacOptions ~= { opts => opts.filterNot(Set("-Xlint:nullary-override")) },
libraryDependencies ++= Seq(
compilerPlugin("org.typelevel" % "kind-projector" % kindProjectorVersion cross CrossVersion.full)
libraryDependencies ++= Seq(
compilerPlugin(
"org.typelevel" % "kind-projector" % kindProjectorVersion cross CrossVersion.full
)
),
addCompilerPlugin("com.olegpy" %% "better-monadic-for" % betterMonadicForVersion),
parallelExecution in Test := false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ final class DistributedProcessing(settings: DistributedProcessingSettings) {
}
}

/**
* Starts `processes` distributed over internal Kafka topic consumer.
*
* @param name - used as groupId for underlying Kafka partition assignment machinery
* @param processes - list of processes to distribute
/** Starts `processes` distributed over internal Kafka topic consumer.
*
* @param name
* - used as groupId for underlying Kafka partition assignment machinery
* @param processes
* - list of processes to distribute
*/
def start[F[_]: Async: Temporal](name: String, processes: List[F[Unit]]): F[Unit] =
Kafka
Expand Down Expand Up @@ -73,7 +73,8 @@ final case class DistributedProcessingSettings(brokers: Set[String],
topicName: String,
pollingInterval: FiniteDuration = 500.millis,
pollTimeout: FiniteDuration = 50.millis,
consumerSettings: Map[String, String] = Map.empty) {
consumerSettings: Map[String, String] = Map.empty
) {
def withClientId(clientId: String): DistributedProcessingSettings =
withConsumerSetting(ConsumerConfig.CLIENT_ID_CONFIG, clientId)

Expand All @@ -84,8 +85,8 @@ final case class DistributedProcessingSettings(brokers: Set[String],

def asProperties(groupId: String): Properties = {
val properties = new Properties()
consumerSettings.foreach {
case (key, value) => properties.setProperty(key, value)
consumerSettings.foreach { case (key, value) =>
properties.setProperty(key, value)
}
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers.mkString(","))
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ import scala.concurrent.duration._
object Supervision {
type Supervision[F[_]] = F[Unit] => F[Unit]
def exponentialBackoff[F[_]: Temporal: ApplicativeError[*[_], Throwable]](
minBackoff: FiniteDuration = 2.seconds,
maxBackoff: FiniteDuration = 10.seconds,
randomFactor: Double = 0.2,
maxAttempts: Int = Int.MaxValue
minBackoff: FiniteDuration = 2.seconds,
maxBackoff: FiniteDuration = 10.seconds,
randomFactor: Double = 0.2,
maxAttempts: Int = Int.MaxValue
): Supervision[F] = {
def nextDelay(in: FiniteDuration): FiniteDuration =
FiniteDuration((in.toMillis * (1 + randomFactor)).toLong, MILLISECONDS).min(maxBackoff)
fa =>
retry(fa, minBackoff, nextDelay, maxAttempts, Function.const(true)).compile.drain
fa => retry(fa, minBackoff, nextDelay, maxAttempts, Function.const(true)).compile.drain
}
def noop[F[_]]: Supervision[F] = identity
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import cats.implicits._

private[kafkadistributedprocessing] final case class Channel[F[_]](watch: F[CompletionCallback[F]],
close: F[Unit],
call: F[Unit])
call: F[Unit]
)

private[kafkadistributedprocessing] object Channel {
type CompletionCallback[F[_]] = F[Unit]
Expand All @@ -20,12 +21,12 @@ private[kafkadistributedprocessing] object Channel {
close = closed.complete(()).void
watch = deferredCallback.get
call = Deferred[F, Unit]
.flatMap { deferredCompletion =>
deferredCallback
.complete(deferredCompletion.complete(()).attempt.void) >> deferredCompletion.get
}
.race(closed.get)
.void
.flatMap { deferredCompletion =>
deferredCallback
.complete(deferredCompletion.complete(()).attempt.void) >> deferredCompletion.get
}
.race(closed.get)
.void

} yield internal.Channel(watch, close, call)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ private[kafkadistributedprocessing] object Kafka {
final case class AssignedPartition[F[_]](partition: Partition,
partitionCount: Int,
watchRevocation: F[CompletionCallback[F]],
release: F[Unit])
release: F[Unit]
)

def watchRebalanceEvents[F[_]: Async](
consumer: KafkaConsumer[F, Unit, Unit],
topic: String,
pollingInterval: FiniteDuration,
pollTimeout: FiniteDuration
consumer: KafkaConsumer[F, Unit, Unit],
topic: String,
pollingInterval: FiniteDuration,
pollTimeout: FiniteDuration
)(implicit T: Temporal[F]): Stream[F, Committable[F, RebalanceEvent]] =
Stream
.force(
Expand All @@ -42,10 +43,10 @@ private[kafkadistributedprocessing] object Kafka {
)

def assignPartitions[F[_]: Async: Temporal](
config: Properties,
topic: String,
pollingInterval: FiniteDuration,
pollTimeout: FiniteDuration
config: Properties,
topic: String,
pollingInterval: FiniteDuration,
pollTimeout: FiniteDuration
): Stream[F, AssignedPartition[F]] =
Stream
.resource(KafkaConsumer.create[F](config, new UnitDeserializer, new UnitDeserializer))
Expand All @@ -60,9 +61,12 @@ private[kafkadistributedprocessing] object Kafka {
case RebalanceEvent.PartitionsAssigned(partitions) =>
partitions.toList
.traverse { partition =>
Channel.create[F].map {
case Channel(watch, close, call) =>
AssignedPartition(partition.partition(), partitionCount, watch, close) -> call
Channel.create[F].map { case Channel(watch, close, call) =>
AssignedPartition(partition.partition(),
partitionCount,
watch,
close
) -> call
}
}
.map { list =>
Expand All @@ -88,9 +92,8 @@ private[kafkadistributedprocessing] object Kafka {
handleEvent <* commit
}
}
.flatMap {
case (assignedPartitions, _) =>
Stream.emits(assignedPartitions)
.flatMap { case (assignedPartitions, _) =>
Stream.emits(assignedPartitions)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

private[kafkadistributedprocessing] final class KafkaConsumer[F[_], K, V](
withConsumer: (Consumer[K, V] => *) ~> F
withConsumer: (Consumer[K, V] => *) ~> F
) {

def subscribe(topics: Set[String], listener: ConsumerRebalanceListener): F[Unit] =
Expand All @@ -41,26 +41,24 @@ private[kafkadistributedprocessing] final class KafkaConsumer[F[_], K, V](
private[kafkadistributedprocessing] object KafkaConsumer {
final class Create[F[_]] {
def apply[K, V](
config: Properties,
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V]
config: Properties,
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V]
)(implicit F: Async[F]): Resource[F, KafkaConsumer[F, K, V]] = {
val create = F.defer {
val executor = Executors.newSingleThreadExecutor()

def eval[A](a: => A): F[A] =
F.async_[A] { cb =>
executor.execute(
() =>
cb {
try Right(a)
catch {
case e: Throwable => Left(e)
}
executor.execute(() =>
cb {
try Right(a)
catch {
case e: Throwable => Left(e)
}
)
}
.evalOn(ExecutionContext.fromExecutor(executor))
}
)
}.evalOn(ExecutionContext.fromExecutor(executor))

eval {
val original = Thread.currentThread.getContextClassLoader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,21 @@ import scala.collection.JavaConverters._
private[kafkadistributedprocessing] object RebalanceEvents {
final class UsePartiallyApplied[F[_]] {
def subscribe[A](
f: ConsumerRebalanceListener => F[Unit]
f: ConsumerRebalanceListener => F[Unit]
)(implicit F: Async[F]): F[Stream[F, Committable[F, RebalanceEvent]]] =
for {
queue <- Queue.unbounded[F, Option[Committable[F, RebalanceEvent]]]
dispatcher <- Dispatcher[F].allocated.map(_._1)
listener = new Listener[F](
event =>
Deferred[F, Unit]
.flatMap { completion =>
queue.offer(Committable(completion.complete(()).void, event).some) >> completion.get
},
dispatcher
)
event =>
Deferred[F, Unit]
.flatMap { completion =>
queue.offer(
Committable(completion.complete(()).void, event).some
) >> completion.get
},
dispatcher
)
_ <- f(listener)
} yield Stream.fromQueueNoneTerminated(queue)
}
Expand All @@ -42,8 +44,8 @@ private[kafkadistributedprocessing] object RebalanceEvents {
}

private final class Listener[F[_]: Async](processEvent: RebalanceEvent => F[Unit],
dispatcher: Dispatcher[F])
extends ConsumerRebalanceListener {
dispatcher: Dispatcher[F]
) extends ConsumerRebalanceListener {

override def onPartitionsRevoked(partitions: util.Collection[common.TopicPartition]): Unit =
dispatcher.unsafeRunSync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,17 @@ class ChannelTest extends AnyFunSuite with IOSupport {
test("Channel#call completes only after completion callback") {
val out = Channel
.create[IO]
.flatMap {
case Channel(watch, _, call) =>
Queue.unbounded[IO, String].flatMap { queue =>
for {
fiber <- watch.flatMap { callback =>
queue.enqueue1("before callback") >> callback
}.start
_ <- queue.enqueue1("before call") >> call >> queue.enqueue1("after call")
_ <- fiber.join
out <- queue.dequeue.take(3).compile.toList
} yield out
}
.flatMap { case Channel(watch, _, call) =>
Queue.unbounded[IO, String].flatMap { queue =>
for {
fiber <- watch.flatMap { callback =>
queue.enqueue1("before callback") >> callback
}.start
_ <- queue.enqueue1("before call") >> call >> queue.enqueue1("after call")
_ <- fiber.join
out <- queue.dequeue.take(3).compile.toList
} yield out
}
}
.unsafeRunTimed(1.seconds)
.get
Expand All @@ -34,15 +33,14 @@ class ChannelTest extends AnyFunSuite with IOSupport {

Channel
.create[IO]
.flatMap {
case Channel(watch, close, call) =>
for {
w <- watch.start
c <- call.start
_ <- close
_ <- c.join
_ <- w.cancel
} yield ()
.flatMap { case Channel(watch, close, call) =>
for {
w <- watch.start
c <- call.start
_ <- close
_ <- c.join
_ <- w.cancel
} yield ()
}
.replicateA(100)
.unsafeRunTimed(10.seconds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ class KafkaTest extends AnyFunSuite with IOSupport with KafkaSupport {
queue <- Queue.unbounded[IO, (Int, RebalanceEvent)]

run = (n: Int) =>
watchRebalanceEvents
.evalMap { x =>
val e = n -> x.value
queue.enqueue1(e) >> x.commit
}
.compile
.drain
.start
watchRebalanceEvents
.evalMap { x =>
val e = n -> x.value
queue.enqueue1(e) >> x.commit
}
.compile
.drain
.start

p1 <- run(1)

Expand All @@ -70,14 +70,13 @@ class KafkaTest extends AnyFunSuite with IOSupport with KafkaSupport {
val (l1, l2, l3) = program.unsafeRunTimed(40.seconds).get

def fold(list: List[(Int, RebalanceEvent)]): Map[Int, Set[Int]] =
list.foldLeft(Map.empty[Int, Set[Int]]) {
case (s, (c, e)) =>
e match {
case PartitionsRevoked(partitions) =>
s.updated(c, s.getOrElse(c, Set.empty[Int]) -- partitions.map(_.partition()))
case PartitionsAssigned(partitions) =>
s.updated(c, s.getOrElse(c, Set.empty[Int]) ++ partitions.map(_.partition()))
}
list.foldLeft(Map.empty[Int, Set[Int]]) { case (s, (c, e)) =>
e match {
case PartitionsRevoked(partitions) =>
s.updated(c, s.getOrElse(c, Set.empty[Int]) -- partitions.map(_.partition()))
case PartitionsAssigned(partitions) =>
s.updated(c, s.getOrElse(c, Set.empty[Int]) ++ partitions.map(_.partition()))
}
}

assert(fold(l1) == Map(1 -> Set(1, 0, 3, 2)))
Expand Down