Skip to content
This repository has been archived by the owner on Apr 12, 2023. It is now read-only.

Commit

Permalink
Improve producer (#63)
Browse files Browse the repository at this point in the history
* Add missing partition count from the subtitle binding

* Replace jsontoavro converter

* Add array parser test

* Add byte parser test

* Add unit tests for simple type parsers

* Add record parser test

* Reduce MCC for SimpleTypeParsersFactory

* Reduce MCC of FieldParser

* Improve unit tests

* Add shortcut to search a schema in list schema view
  • Loading branch information
andrewinci authored Sep 28, 2020
1 parent fb8d1de commit 346c51d
Show file tree
Hide file tree
Showing 36 changed files with 921 additions and 635 deletions.
3 changes: 2 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
org.gradle.parallel=true
org.gradle.parallel=true
org.gradle.jvmargs=-Xmx4096m -XX:MaxMetaspaceSize=512m
13 changes: 9 additions & 4 deletions src/main/kotlin/insulator/di/LibModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import com.fasterxml.jackson.databind.ObjectMapper
import insulator.lib.configuration.ConfigurationRepo
import insulator.lib.configuration.model.Cluster
import insulator.lib.jsonhelper.JsonFormatter
import insulator.lib.jsonhelper.JsonToAvroConverter
import insulator.lib.jsonhelper.jsontoavro.FieldParser
import insulator.lib.jsonhelper.jsontoavro.JsonToAvroConverter
import insulator.lib.jsonhelper.jsontoavro.fieldparser.ComplexTypeParsersFactory
import insulator.lib.jsonhelper.jsontoavro.fieldparser.SimpleTypeParsersFactory
import insulator.lib.kafka.AdminApi
import insulator.lib.kafka.AvroProducer
import insulator.lib.kafka.Consumer
import insulator.lib.kafka.SchemaRegistry
import insulator.lib.kafka.StringProducer
import kotlinx.serialization.json.Json
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.koin.core.qualifier.named
import org.koin.dsl.module

Expand All @@ -21,9 +24,11 @@ val libModule = module {
single { Json {} }
single { ConfigurationRepo(get()) }
single { JsonFormatter(get()) }
single { Schema.Parser() }

// JsonToAvro
single { ObjectMapper() }
single { JsonToAvroConverter(get()) }
single { FieldParser(SimpleTypeParsersFactory(), ComplexTypeParsersFactory()) }
single { JsonToAvroConverter(get(), get(), GenericData.get()) }

scope<Cluster> {
factory { AdminApi(get(), get()) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package insulator.lib.configuration
import arrow.core.Either
import arrow.core.extensions.fx
import arrow.core.flatMap
import arrow.core.left
import arrow.core.right
import insulator.lib.configuration.model.Cluster
import insulator.lib.configuration.model.Configuration
import insulator.lib.helpers.toEither
import kotlinx.serialization.json.Json
import java.io.File

Expand All @@ -17,10 +16,10 @@ class ConfigurationRepo(private val json: Json, private val configPath: String =
fun getConfiguration(): Either<ConfigurationRepoException, Configuration> {
if (!File(configPath).exists()) store(Configuration(emptyList()))
return kotlin.runCatching { File(configPath).readText() }
.fold({ it.right() }, { ConfigurationRepoException("Unable to load the file", it).left() })
.toEither { ConfigurationRepoException("Unable to load the file", it) }
.flatMap {
json.runCatching { decodeFromString(Configuration.serializer(), it) }
.fold({ it.right() }, { ConfigurationRepoException("Unable to load the configurations", it).left() })
.toEither { ConfigurationRepoException("Unable to load the configurations", it) }
}
}

Expand All @@ -44,7 +43,7 @@ class ConfigurationRepo(private val json: Json, private val configPath: String =

private fun store(configuration: Configuration) = kotlin.runCatching {
File(configPath).writeText(json.encodeToString(Configuration.serializer(), configuration))
}.fold({ right() }, { ConfigurationRepoException("Unable to store the configuration", it).left() })
}.toEither { ConfigurationRepoException("Unable to store the configuration", it) }

fun addNewClusterCallback(callback: (Configuration) -> Unit) {
callbacks.add(callback)
Expand Down
13 changes: 13 additions & 0 deletions src/main/kotlin/insulator/lib/helpers/ExceptionHandlingHelper.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
package insulator.lib.helpers

import arrow.core.Either
import arrow.core.flatMap
import arrow.core.left
import arrow.core.right

fun <R> Result<R>.toEither() = this.fold({ it.right() }, { it.left() })
fun <R, T : Throwable> Result<R>.toEither(f: (Throwable) -> T) = this.fold({ it.right() }, { it.left() }).mapLeft { f(it) }

fun <A, B> List<Either<A, B>>.toEitherOfList() =
this.fold(emptyList<B>().right() as Either<A, List<B>>) { lst, v ->
lst.flatMap {
when (v) {
is Either.Left<A> -> v
is Either.Right<B> -> it.plus(v.b).right()
}
}
}
5 changes: 2 additions & 3 deletions src/main/kotlin/insulator/lib/jsonhelper/JsonFormatter.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package insulator.lib.jsonhelper

import arrow.core.left
import arrow.core.right
import insulator.lib.helpers.toEither
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonArray
import kotlinx.serialization.json.JsonElement
Expand All @@ -14,7 +13,7 @@ class JsonFormatter(private val json: Json) {

fun formatJsonString(jsonString: String, indent: Boolean = true) = json.runCatching { parseToJsonElement(jsonString) }
.map { InternalFormatter(indent).format(it, 1) }
.fold({ it.right() }, { it.left() })
.toEither()

private class InternalFormatter(private val indent: Boolean = true) {
@OptIn(ExperimentalStdlibApi::class)
Expand Down
135 changes: 0 additions & 135 deletions src/main/kotlin/insulator/lib/jsonhelper/JsonToAvroConverter.kt

This file was deleted.

39 changes: 39 additions & 0 deletions src/main/kotlin/insulator/lib/jsonhelper/jsontoavro/FieldParser.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package insulator.lib.jsonhelper.jsontoavro

import arrow.core.left
import insulator.lib.jsonhelper.jsontoavro.fieldparser.ComplexTypeParsersFactory
import insulator.lib.jsonhelper.jsontoavro.fieldparser.SimpleTypeParsersFactory
import org.apache.avro.Schema

class FieldParser(
simpleTypeParsersFactory: SimpleTypeParsersFactory,
complexTypeParsersFactory: ComplexTypeParsersFactory
) {

private val simpleTypeParsers = simpleTypeParsersFactory.build()
private val complexTypeParsers = complexTypeParsersFactory.build(this)
private val parsersLookup = mapOf(
// composed types
Schema.Type.RECORD to complexTypeParsers.recordParser,
Schema.Type.ARRAY to complexTypeParsers.arrayParser,
Schema.Type.UNION to complexTypeParsers.unionParser,
Schema.Type.BYTES to complexTypeParsers.byteParser,
Schema.Type.FIXED to jsonFieldParser { _, _ -> JsonFieldParsingException("Avro FIXED type not supported").left() },
Schema.Type.MAP to jsonFieldParser { _, _ -> JsonFieldParsingException("Avro MAP type not supported").left() },

// simple types
Schema.Type.STRING to simpleTypeParsers.stringParser,
Schema.Type.ENUM to simpleTypeParsers.enumParser,
Schema.Type.INT to simpleTypeParsers.intParser,
Schema.Type.LONG to simpleTypeParsers.longParser,
Schema.Type.FLOAT to simpleTypeParsers.floatParser,
Schema.Type.DOUBLE to simpleTypeParsers.doubleParser,
Schema.Type.BOOLEAN to simpleTypeParsers.booleanParser,
Schema.Type.NULL to simpleTypeParsers.nullParser,
)

fun parseField(jsonValue: Any?, fieldSchema: Schema) = (
parsersLookup[fieldSchema.type]
?: jsonFieldParser { _, _ -> JsonFieldParsingException("Null schema type").left() }
).parse(jsonValue, fieldSchema)
}
20 changes: 20 additions & 0 deletions src/main/kotlin/insulator/lib/jsonhelper/jsontoavro/Helpers.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package insulator.lib.jsonhelper.jsontoavro

import arrow.core.Either
import org.apache.avro.Schema

fun <O> jsonFieldParser(parseFn: (fieldValue: Any?, schema: Schema) -> Either<JsonFieldParsingException, O>): JsonFieldParser<O> = object : JsonFieldParser<O> {
override fun parse(fieldValue: Any?, schema: Schema) = parseFn(fieldValue, schema)
}

internal fun Schema.printType(): String {
return when (this.type) {
Schema.Type.NULL -> "null"
Schema.Type.RECORD -> "record \"${this.toString(true)}\""
Schema.Type.BYTES -> "bytes (eg \"0x00\")"
Schema.Type.ENUM -> "enum [${this.enumSymbols.joinToString(", ")}]"
Schema.Type.UNION -> "union [${this.types.joinToString(", ") { it.toString(true) }}]"
Schema.Type.ARRAY -> "array of \"${this.elementType.toString(true)}\""
else -> this.type.name.toLowerCase()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package insulator.lib.jsonhelper.jsontoavro

import arrow.core.Either
import arrow.core.extensions.fx
import arrow.core.left
import arrow.core.right
import com.fasterxml.jackson.databind.ObjectMapper
import insulator.lib.helpers.toEither
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord

class JsonToAvroConverter(private val objectMapper: ObjectMapper, private val fieldParser: FieldParser, private val genericData: GenericData) {
fun parse(jsonString: String, schemaString: String) = Either.fx<JsonToAvroException, GenericRecord> {
val jsonMap = !objectMapper.runCatching { readValue(jsonString, Map::class.java) }.toEither { JsonParsingException("Invalid json", it) }
val schema = !Schema.Parser().runCatching { parse(schemaString) }.toEither { SchemaParsingException("Invalid AVRO schema", it) }
val record = !fieldParser.parseField(jsonMap, schema).flatMap {
(it as? GenericRecord)?.right() ?: JsonToAvroException("Invalid record").left()
}
!(
if (genericData.validate(schema, record)) record.right()
else JsonToAvroException("Unable to parse the json into a valid ${schema.name}. Final validation failed.").left()
)
}
}

interface JsonFieldParser<out O> {
fun parse(fieldValue: Any?, schema: Schema): Either<JsonFieldParsingException, O>
}

open class JsonToAvroException(message: String? = null, cause: Throwable? = null) : Exception(message, cause)
class SchemaParsingException(message: String?, cause: Throwable? = null) : JsonToAvroException(message, cause)
class JsonParsingException(message: String?, cause: Throwable? = null) : JsonToAvroException(message, cause)

open class JsonFieldParsingException(message: String?) : JsonToAvroException(message)
class JsonInvalidFieldException(expectedSchema: Schema, fieldName: Any?) : JsonFieldParsingException("Invalid field \"$fieldName\". Expected ${expectedSchema.printType()}")
class JsonMissingFieldException(expectedSchema: Schema, val fieldName: String? = null) : JsonFieldParsingException("Missing field \"$fieldName\". Expected ${expectedSchema.printType()}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package insulator.lib.jsonhelper.jsontoavro.fieldparser

import arrow.core.Either
import arrow.core.left
import arrow.core.right
import insulator.lib.helpers.toEitherOfList
import insulator.lib.jsonhelper.jsontoavro.FieldParser
import insulator.lib.jsonhelper.jsontoavro.JsonFieldParser
import insulator.lib.jsonhelper.jsontoavro.JsonFieldParsingException
import insulator.lib.jsonhelper.jsontoavro.JsonInvalidFieldException
import org.apache.avro.Schema

class ArrayParser(private val fieldParser: FieldParser) : JsonFieldParser<List<Any?>> {
override fun parse(fieldValue: Any?, schema: Schema): Either<JsonFieldParsingException, List<Any?>> {
if (fieldValue !is ArrayList<*>) return JsonInvalidFieldException(schema, fieldValue).left()
if (fieldValue.size == 0) return emptyList<Any?>().right()
// field value is a non-empty list
return fieldValue.map { fieldParser.parseField(it, schema.elementType) }.toEitherOfList()
}
}
Loading

0 comments on commit 346c51d

Please sign in to comment.