Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - serde improvements for 3.0 #601

Draft
wants to merge 20 commits into
base: series/2.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 21 additions & 24 deletions docs/src/main/mdoc/modules.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,25 @@ val avroSettings =
}
```

We can then create a `Serializer` and `Deserializer` instance for `Person`.
We use these `AvroSettings` to create a `SchemaRegistryClient`. Here we use `unsafeRunSync`, but this should never be done in production code. Instead the effectful allocation should be composed monadically, as shown below.

```scala mdoc:silent
import fs2.kafka.{RecordDeserializer, RecordSerializer}
import fs2.kafka.vulcan.{avroDeserializer, avroSerializer}
import fs2.kafka.vulcan.SchemaRegistryClient
import cats.effect.unsafe.implicits.global

implicit val personSerializer: RecordSerializer[IO, Person] =
avroSerializer[Person].using(avroSettings)
val schemaRegistryClient = SchemaRegistryClient(avroSettings).unsafeRunSync()
```

We can then create a `ValueSerializer` and `ValueDeserializer` instance for `Person`.

```scala mdoc:silent
import fs2.kafka.{ValueDeserializer, ValueSerializer}

implicit val personSerializer: ValueSerializer[IO, Person] =
schemaRegistryClient.valueSerializer[Person]

implicit val personDeserializer: RecordDeserializer[IO, Person] =
avroDeserializer[Person].using(avroSettings)
implicit val personDeserializer: ValueDeserializer[IO, Person] =
schemaRegistryClient.valueDeserializer[Person]
```

Finally, we can create settings, passing the `Serializer`s and `Deserializer`s implicitly.
Expand Down Expand Up @@ -89,29 +97,18 @@ ProducerSettings(
).withBootstrapServers("localhost:9092")
```

### Sharing Client
### Composing

When creating `AvroSettings` with `SchemaRegistryClientSettings`, one schema registry client will be created per `Serializer` or `Deserializer`. For many cases, this is completely fine, but it's possible to reuse a single client for multiple `Serializer`s and `Deserializer`s.

To share a `SchemaRegistryClient`, we first create it and then pass it to `AvroSettings`.

```scala mdoc:silent
val avroSettingsSharedClient: IO[AvroSettings[IO]] =
SchemaRegistryClientSettings[IO]("http://localhost:8081")
.withAuth(Auth.Basic("username", "password"))
.createSchemaRegistryClient
.map(AvroSettings(_))
```

We can then create multiple `Serializer`s and `Deserializer`s using the `AvroSettings`.

```scala mdoc:silent
avroSettingsSharedClient.map { avroSettings =>
val personSerializer: RecordSerializer[IO, Person] =
avroSerializer[Person].using(avroSettings)
AvroSchemaRegistryClient(avroSettings).map { schemaRegistryClient =>
val personSerializer: ValueSerializer[IO, Person] =
schemaRegistryClient.valueSerializer[Person]

val personDeserializer: RecordDeserializer[IO, Person] =
avroDeserializer[Person].using(avroSettings)
val personDeserializer: ValueDeserializer[IO, Person] =
schemaRegistryClient.valueDeserializer[Person]

val consumerSettings =
ConsumerSettings(
Expand Down
8 changes: 4 additions & 4 deletions modules/core/src/main/scala/fs2/kafka/ConsumerRecord.scala
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ object ConsumerRecord {
private[this] def deserializeFromBytes[F[_], K, V](
record: KafkaByteConsumerRecord,
headers: Headers,
keyDeserializer: Deserializer[F, K],
valueDeserializer: Deserializer[F, V]
keyDeserializer: KeyDeserializer[F, K],
valueDeserializer: ValueDeserializer[F, V]
)(implicit F: Apply[F]): F[(K, V)] = {
val key = keyDeserializer.deserialize(record.topic, headers, record.key)
val value = valueDeserializer.deserialize(record.topic, headers, record.value)
Expand All @@ -177,8 +177,8 @@ object ConsumerRecord {

private[kafka] def fromJava[F[_], K, V](
record: KafkaByteConsumerRecord,
keyDeserializer: Deserializer[F, K],
valueDeserializer: Deserializer[F, V]
keyDeserializer: KeyDeserializer[F, K],
valueDeserializer: ValueDeserializer[F, V]
)(implicit F: Apply[F]): F[ConsumerRecord[K, V]] = {
val headers = record.headers.asScala
deserializeFromBytes(record, headers, keyDeserializer, valueDeserializer).map {
Expand Down
53 changes: 8 additions & 45 deletions modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

package fs2.kafka

import cats.{Applicative, Show}
import cats.Show
import fs2.kafka.security.KafkaCredentialStore
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.requests.OffsetFetchResponse
Expand Down Expand Up @@ -40,12 +40,12 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
/**
* The `Deserializer` to use for deserializing record keys.
*/
def keyDeserializer: F[Deserializer[F, K]]
def keyDeserializer: KeyDeserializer[F, K]

/**
* The `Deserializer` to use for deserializing record values.
*/
def valueDeserializer: F[Deserializer[F, V]]
def valueDeserializer: ValueDeserializer[F, V]

/**
* A custom `ExecutionContext` to use for blocking Kafka operations. If not
Expand Down Expand Up @@ -395,8 +395,8 @@ sealed abstract class ConsumerSettings[F[_], K, V] {

object ConsumerSettings {
private[this] final case class ConsumerSettingsImpl[F[_], K, V](
override val keyDeserializer: F[Deserializer[F, K]],
override val valueDeserializer: F[Deserializer[F, V]],
override val keyDeserializer: KeyDeserializer[F, K],
override val valueDeserializer: ValueDeserializer[F, V],
override val customBlockingContext: Option[ExecutionContext],
override val properties: Map[String, String],
override val closeTimeout: FiniteDuration,
Expand Down Expand Up @@ -529,9 +529,9 @@ object ConsumerSettings {
s"ConsumerSettings(closeTimeout = $closeTimeout, commitTimeout = $commitTimeout, pollInterval = $pollInterval, pollTimeout = $pollTimeout, commitRecovery = $commitRecovery)"
}

private[this] def create[F[_], K, V](
keyDeserializer: F[Deserializer[F, K]],
valueDeserializer: F[Deserializer[F, V]]
def apply[F[_], K, V](
implicit keyDeserializer: KeyDeserializer[F, K],
valueDeserializer: ValueDeserializer[F, V]
): ConsumerSettings[F, K, V] =
ConsumerSettingsImpl(
customBlockingContext = None,
Expand All @@ -550,43 +550,6 @@ object ConsumerSettings {
maxPrefetchBatches = 2
)

def apply[F[_], K, V](
keyDeserializer: Deserializer[F, K],
valueDeserializer: Deserializer[F, V]
)(implicit F: Applicative[F]): ConsumerSettings[F, K, V] =
create(
keyDeserializer = F.pure(keyDeserializer),
valueDeserializer = F.pure(valueDeserializer)
)

def apply[F[_], K, V](
keyDeserializer: RecordDeserializer[F, K],
valueDeserializer: Deserializer[F, V]
)(implicit F: Applicative[F]): ConsumerSettings[F, K, V] =
create(
keyDeserializer = keyDeserializer.forKey,
valueDeserializer = F.pure(valueDeserializer)
)

def apply[F[_], K, V](
keyDeserializer: Deserializer[F, K],
valueDeserializer: RecordDeserializer[F, V]
)(implicit F: Applicative[F]): ConsumerSettings[F, K, V] =
create(
keyDeserializer = F.pure(keyDeserializer),
valueDeserializer = valueDeserializer.forValue
)

def apply[F[_], K, V](
implicit
keyDeserializer: RecordDeserializer[F, K],
valueDeserializer: RecordDeserializer[F, V]
): ConsumerSettings[F, K, V] =
create(
keyDeserializer = keyDeserializer.forKey,
valueDeserializer = valueDeserializer.forValue
)

implicit def consumerSettingsShow[F[_], K, V]: Show[ConsumerSettings[F, K, V]] =
Show.fromToString
}
Loading