Skip to content
This repository has been archived by the owner on Apr 12, 2023. It is now read-only.

Commit

Permalink
Producer (#59)
Browse files Browse the repository at this point in the history
* WIP: Producer
* Validation sample
* Fix avro array and review UI
* Improve UI
* Add stringProducer
* Fix string consumer
* Move objectMapper and avroParser to DI
* Remove generic throwables
* Fix tests
* Improve ProducerViewModelTest
* Add tests StringProducerTest
* Add AvroProducerTest
* Improve JsonToAvroConverterTest
* Small improvement to hints
* Improve ProducerViewModelTest
* Add happy path to ListTopicViewModelTest
  • Loading branch information
andrewinci authored Sep 25, 2020
1 parent cb0c929 commit a8a035b
Show file tree
Hide file tree
Showing 18 changed files with 1,098 additions and 15 deletions.
27 changes: 23 additions & 4 deletions src/main/kotlin/insulator/di/KafkaModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@ import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.koin.core.qualifier.named
import org.koin.dsl.module
import java.util.Properties
Expand All @@ -36,15 +41,27 @@ val kafkaModule = module {
}

// Consumers
factory<Consumer<Any, Any>> {
factory<Consumer<String, String>> {
val properties = get<Properties>()
properties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
KafkaConsumer<Any, Any>(properties)
KafkaConsumer(properties)
}
factory<Consumer<Any, Any>>(named("avroConsumer")) {
factory<Consumer<String, Any>>(named("avroConsumer")) {
val properties = get<Properties>()
properties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaAvroDeserializer::class.java
KafkaConsumer<Any, Any>(properties)
KafkaConsumer(properties)
}

// Producers
scoped<Producer<String, String>> {
val properties = get<Properties>()
properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
KafkaProducer(properties)
}
scoped<Producer<String, Any>>(named("avroProducer")) {
val properties = get<Properties>()
properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java
KafkaProducer(properties)
}

// Properties
Expand Down Expand Up @@ -79,6 +96,8 @@ val kafkaModule = module {
}
}
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
put("auto.register.schemas", false)
}
properties
}
Expand Down
13 changes: 12 additions & 1 deletion src/main/kotlin/insulator/di/LibModule.kt
Original file line number Diff line number Diff line change
@@ -1,24 +1,35 @@
package insulator.di

import com.fasterxml.jackson.databind.ObjectMapper
import insulator.lib.configuration.ConfigurationRepo
import insulator.lib.configuration.model.Cluster
import insulator.lib.jsonhelper.JsonFormatter
import insulator.lib.jsonhelper.JsonToAvroConverter
import insulator.lib.kafka.AdminApi
import insulator.lib.kafka.AvroProducer
import insulator.lib.kafka.Consumer
import insulator.lib.kafka.SchemaRegistry
import insulator.lib.kafka.StringProducer
import kotlinx.serialization.json.Json
import org.apache.avro.Schema
import org.koin.core.qualifier.named
import org.koin.dsl.module

val libModule = module {

// Configurations
// Configurations and helpers
single { Json {} }
single { ConfigurationRepo(get()) }
single { JsonFormatter(get()) }
single { Schema.Parser() }
single { ObjectMapper() }
single { JsonToAvroConverter(get()) }

scope<Cluster> {
factory { AdminApi(get(), get()) }
factory { Consumer(get()) }
factory { AvroProducer(get(named("avroProducer")), get(), get()) }
factory { StringProducer(get()) }
factory { SchemaRegistry(get()) }
}
}
135 changes: 135 additions & 0 deletions src/main/kotlin/insulator/lib/jsonhelper/JsonToAvroConverter.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package insulator.lib.jsonhelper

import arrow.core.Either
import arrow.core.left
import arrow.core.right
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.avro.AvroRuntimeException
import org.apache.avro.Conversions
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.avro.SchemaParseException
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.generic.GenericRecordBuilder
import java.nio.ByteBuffer
import javax.xml.bind.DatatypeConverter

class JsonToAvroConverter(private val objectMapper: ObjectMapper) {

fun convert(jsonString: String, schemaString: String): Either<Throwable, GenericRecord> {
return try {
val jsonMap = objectMapper.readValue(jsonString, Map::class.java)
val schema = Parser().parse(schemaString)
val parsed = parseRecord(schema, jsonMap)
if (GenericData().validate(schema, parsed)) {
parsed.right()
} else JsonToAvroException("Generated record is invalid, check the schema").left()
} catch (jsonException: SchemaParseException) {
InvalidSchemaException().left()
} catch (jsonException: JsonParseException) {
InvalidJsonException().left()
} catch (jsonException: JsonProcessingException) {
InvalidJsonException().left()
} catch (jsonToAvroException: JsonToAvroException) {
jsonToAvroException.left()
} catch (avroRuntime: AvroRuntimeException) {
JsonToAvroException(avroRuntime.message).left()
}
}

private fun parseRecord(schema: Schema, jsonMap: Map<*, *>?): GenericRecord {
if (schema.type != Schema.Type.RECORD || jsonMap == null)
throw JsonToAvroException("Expecting record ${schema.name}")
val recordBuilder = GenericRecordBuilder(schema)
schema.fields.forEach { fieldSchema ->
val fieldName = fieldSchema.name()
if (fieldName !in jsonMap) throw JsonToAvroException("Expecting \"${fieldName}\" with type ${printType(fieldSchema.schema())}", fieldName)
val jsonValue = jsonMap[fieldName]

recordBuilder.set(fieldSchema, parseField(fieldSchema.schema(), jsonValue))
}
return recordBuilder.build()
}

private fun printType(schema: Schema): String {
return when (schema.type) {
Schema.Type.NULL -> "null"
Schema.Type.RECORD -> "record \"${schema.name}\""
Schema.Type.BYTES -> "bytes (eg \"0x00\")"
Schema.Type.ENUM -> "enum [${schema.enumSymbols.joinToString(", ")}]"
Schema.Type.UNION -> "union [${schema.types.joinToString(", ") { it.name }}]"
Schema.Type.ARRAY -> "array of \"${schema.elementType.name}\""
else -> schema.type.name.toLowerCase()
}
}

private fun parseField(fieldSchema: Schema, jsonValue: Any?): Any? {
return when (fieldSchema.type) {
Schema.Type.NULL -> if (jsonValue == null) null else throw JsonToAvroException("$jsonValue should be ${printType(fieldSchema)}")
Schema.Type.RECORD -> parseRecord(fieldSchema, jsonValue as? Map<*, *>)
Schema.Type.FLOAT -> parseFloat(fieldSchema, jsonValue)
Schema.Type.BYTES -> parseBytes(fieldSchema, jsonValue)
Schema.Type.ENUM -> parseEnum(fieldSchema, jsonValue)
Schema.Type.UNION -> parseUnion(fieldSchema, jsonValue)
Schema.Type.ARRAY -> parseArray(fieldSchema, jsonValue)
Schema.Type.LONG -> parseLong(jsonValue)
else -> jsonValue
}
}

private fun parseLong(jsonValue: Any?) =
when (jsonValue) {
is Long -> jsonValue
is Int -> jsonValue.toLong()
else -> throw JsonToAvroException("Expecting long but got ${jsonValue?.javaClass?.simpleName}")
}

private fun parseArray(fieldSchema: Schema, jsonValue: Any?): Any {
if (jsonValue !is ArrayList<*>) throw JsonToAvroException("Expecting ${printType(fieldSchema)} but got $jsonValue")
return jsonValue.map { parseField(fieldSchema.elementType, it) }.toList()
}

private fun parseUnion(fieldSchema: Schema, jsonValue: Any?): Any? {
fieldSchema.types.forEach {
val parsed = kotlin.runCatching { parseField(it, jsonValue) }
if (parsed.isSuccess) return parsed.getOrNull()
}
throw JsonToAvroException("Expecting \"${fieldSchema.fields.first().name()}\" with type Union [${fieldSchema.types.joinToString(", ") { it.name }}]")
}

private fun parseEnum(fieldSchema: Schema, jsonValue: Any?): GenericData.EnumSymbol {
val symbols = fieldSchema.enumSymbols
return if (jsonValue == null || jsonValue.toString() !in symbols)
throw JsonToAvroException("Expecting ${printType(fieldSchema)} but got $jsonValue")
else GenericData.EnumSymbol(fieldSchema, jsonValue)
}

private fun parseBytes(fieldSchema: Schema, jsonValue: Any?): ByteBuffer? {
if (jsonValue == null) throw JsonToAvroException("Expecting ${printType(fieldSchema)} but got \"${jsonValue}\"")
if (jsonValue is Double && fieldSchema.logicalType.name == "decimal")
return Conversions.DecimalConversion().runCatching {
toBytes(jsonValue.toBigDecimal(), fieldSchema, fieldSchema.logicalType)
}.fold({ it }, { throw JsonToAvroException("Invalid $jsonValue ${it.message}") })
return when (jsonValue) {
null -> null
is String ->
if (!jsonValue.toLowerCase().startsWith("0x")) throw JsonToAvroException("Invalid $jsonValue, BYTES value need to start with 0x")
else ByteBuffer.wrap(DatatypeConverter.parseHexBinary(jsonValue.substring(2)))
else -> throw JsonToAvroException("Expecting binary but got $jsonValue")
}
}

private fun parseFloat(fieldSchema: Schema, jsonValue: Any?) =
when (jsonValue) {
is Double -> jsonValue.toFloat()
is Float -> jsonValue
else -> throw JsonToAvroException("Expecting ${printType(fieldSchema)} but got $jsonValue")
}
}

class JsonToAvroException(message: String?, val nextField: String? = null) : Throwable(message)
class InvalidJsonException(message: String? = null) : Throwable(message)
class InvalidSchemaException(message: String? = null) : Throwable(message)
53 changes: 53 additions & 0 deletions src/main/kotlin/insulator/lib/kafka/Producer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package insulator.lib.kafka

import arrow.core.Either
import arrow.core.flatMap
import arrow.core.left
import arrow.core.right
import insulator.lib.jsonhelper.JsonToAvroConverter
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.Producer as KafkaProducer

interface Producer {
fun validate(value: String, topic: String): Either<Throwable, Unit>
fun send(topic: String, key: String, value: String): Either<Throwable, Unit>
}

class AvroProducer(
private val avroProducer: KafkaProducer<String, GenericRecord>,
private val schemaRegistry: SchemaRegistry,
private val jsonAvroConverter: JsonToAvroConverter
) : Producer {

private val schemaCache = HashMap<String, Either<Throwable, String>>()

override fun validate(value: String, topic: String) =
internalValidate(value, topic).flatMap { Unit.right() }

override fun send(topic: String, key: String, value: String) =
internalValidate(value, topic)
.map { ProducerRecord(topic, key, it) }
.flatMap { avroProducer.runCatching { send(it) }.fold({ Unit.right() }, { it.left() }) }

private fun internalValidate(value: String, topic: String) =
getCachedSchema(topic).flatMap { jsonAvroConverter.convert(value, it) }

private fun getCachedSchema(topic: String) =
schemaCache.getOrPut(
topic,
{
schemaRegistry.getSubject("$topic-value")
.map { it.schemas.maxByOrNull { s -> s.version }?.schema }
.flatMap { it?.right() ?: Throwable("Schema not found").left() }
}
)
}

class StringProducer(private val stringProducer: KafkaProducer<String, String>) : Producer {
override fun validate(value: String, topic: String) = Unit.right()
override fun send(topic: String, key: String, value: String): Either<Throwable, Unit> {
val record = ProducerRecord(topic, key, value)
return stringProducer.runCatching { send(record) }.fold({ Unit.right() }, { it.left() })
}
}
1 change: 1 addition & 0 deletions src/main/kotlin/insulator/styles/Controls.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class Controls : Stylesheet() {
val sidebarItem by cssclass()
val iconButton by cssclass()
val alertButton by cssclass()
val blueButton by cssclass()
val view by cssclass()
}

Expand Down
21 changes: 21 additions & 0 deletions src/main/kotlin/insulator/styles/Root.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@ class Root : Stylesheet() {
}
}

textArea {
backgroundRadius = multi(box(0.0.px))
backgroundInsets = multi(box(0.px, (-1).px, (-1).px, (-1).px), box(0.0.px), box(0.px, (-1).px, 0.px, (-1).px))
backgroundColor = multi(Theme.mainColor, Theme.backgroundColor, Theme.backgroundColor)
and(focused) {
backgroundRadius = multi(box(0.0.px))
backgroundInsets = multi(box(0.px, (-2).px, (-2).px, (-2).px), box(0.0.px), box(0.px, (-2).px, 0.px, (-2).px))
backgroundColor = multi(Theme.mainColorDark, Theme.backgroundColor, Theme.backgroundColor)
}
content {
backgroundRadius = multi(box(0.0.px))
backgroundInsets = multi(box(0.px, (-2).px, (-2).px, (-2).px), box(0.0.px), box(0.px, (-2).px, 0.px, (-2).px))
backgroundColor = multi(Theme.mainColorDark, Theme.backgroundColor, Theme.backgroundColor)
}
}

button {
padding = box(5.0.px)
textFill = Theme.mainColor
Expand All @@ -41,6 +57,10 @@ class Root : Stylesheet() {
textFill = Theme.backgroundColor
backgroundColor = multi(Theme.alertColor)
}
and(Controls.blueButton) {
textFill = Theme.backgroundColor
backgroundColor = multi(Theme.blueColor)
}
backgroundRadius = multi(box(2.0.px))
}
and(Controls.iconButton) {
Expand All @@ -49,6 +69,7 @@ class Root : Stylesheet() {
and(hover) { backgroundColor = multi(Theme.mainColorDark) }
}
and(Controls.alertButton) { textFill = Theme.alertColor }
and(Controls.blueButton) { textFill = Theme.blueColor }
}

checkBox {
Expand Down
1 change: 1 addition & 0 deletions src/main/kotlin/insulator/styles/Theme.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ class Theme : Stylesheet() {
val alertColorDark = c("#960017")
val lightGray = c("#ccc")
val darkGray = c("#666")
val blueColor = Color.BLUE
}
}
Loading

0 comments on commit a8a035b

Please sign in to comment.