diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml
index f182d0c..816e8a4 100644
--- a/.github/workflows/scala.yml
+++ b/.github/workflows/scala.yml
@@ -2,9 +2,9 @@ name: Scala CI
on:
push:
- branches: [ master ]
+ branches: [ master, series/0.x ]
pull_request:
- branches: [ master ]
+ branches: [ master, series/0.x ]
jobs:
build:
diff --git a/.gitignore b/.gitignore
index 6350957..f5460db 100644
--- a/.gitignore
+++ b/.gitignore
@@ -44,4 +44,5 @@ revision.txt
.DS_Store
local.sbt
site/build
-kafkamate.json
\ No newline at end of file
+kafkamate.json
+null
\ No newline at end of file
diff --git a/.scalafix.conf b/.scalafix.conf
new file mode 100644
index 0000000..7a01bbf
--- /dev/null
+++ b/.scalafix.conf
@@ -0,0 +1,20 @@
+rules = [
+ OrganizeImports
+]
+
+OrganizeImports {
+ blankLines = Auto
+ coalesceToWildcardImportThreshold = 5
+ expandRelative = true
+ groupExplicitlyImportedImplicitsSeparately = false
+ groupedImports = AggressiveMerge
+ groups = [
+ "re:(javax?|scala)\\.", # language
+ "re:^(?!javax?\\.|scala\\.|com\\.investsuite).*", # library
+ "*"
+ ]
+ importSelectorsOrder = Ascii
+ importsOrder = Ascii
+ preset = DEFAULT
+ removeUnused = true
+}
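
For reference, a hypothetical sketch of the import grouping this OrganizeImports config produces (assuming the negative lookahead in the library group is meant to exclude the project's own `io.kafkamate` packages so they fall through to the `*` group; the imports below are illustrative only):

```scala
// group 1: java / javax / scala packages
import java.util.UUID
import scala.util.Try

// group 2: third-party libraries
import io.grpc.Status
import zio._

// group 3: everything else, e.g. the project's own packages
import io.kafkamate.config._
```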
diff --git a/.scalafmt.conf b/.scalafmt.conf
index 1b1bcc8..dd31866 100644
--- a/.scalafmt.conf
+++ b/.scalafmt.conf
@@ -1,23 +1,28 @@
-version = "2.7.5"
+version = "3.5.9"
+runner.dialect = scala213
+align.preset = some
maxColumn = 120
-align.preset = most
-align.multiline = false
-continuationIndent.defnSite = 2
-assumeStandardLibraryStripMargin = true
-docstrings = JavaDoc
-lineEndings = preserve
-includeCurlyBraceInSelectChains = false
-danglingParentheses.preset = true
-optIn.annotationNewlines = true
-newlines.alwaysBeforeMultilineDef = false
-rewrite.rules = [RedundantBraces]
+align.tokens = [
+ {code = "%", owner = "Term.ApplyInfix"},
+ {code = "%%", owner = "Term.ApplyInfix"},
+ {code = "%%%", owner = "Term.ApplyInfix"},
+ {code = "=>", owner = "Case"}
+]
+
+align.openParenCallSite = false
+
+newlines.source = keep
+newlines.topLevelStatements = [before]
+newlines.beforeMultiline = keep
+newlines.topLevelStatementsMinBreaks = 1
+
+optIn.breakChainOnFirstMethodDot = true
+
+continuationIndent.defnSite = 2
-project.excludeFilters = []
+verticalMultiline.atDefnSite = true
+verticalMultiline.arityThreshold = 3
+verticalMultiline.newlineAfterOpenParen = true
-rewrite.redundantBraces.generalExpressions = false
-rewriteTokens = {
- "⇒": "=>"
- "→": "->"
- "←": "<-"
-}
\ No newline at end of file
+danglingParentheses.callSite = false
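
To make the new `verticalMultiline` settings concrete, here is a rough sketch of the definition-site layout they enforce for three or more parameters, mirroring the `ClusterSettings` reformatting later in this diff (one parameter per line, newline after the open paren, closing paren attached to the last parameter):

```scala
// before: the old single-line style
case class ClusterSettings(id: String, name: String, kafkaHosts: List[String], schemaRegistryUrl: Option[String])

// after: verticalMultiline.atDefnSite = true with arityThreshold = 3
case class ClusterSettings(
  id: String,
  name: String,
  kafkaHosts: List[String],
  schemaRegistryUrl: Option[String])
```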
diff --git a/README.md b/README.md
index 3dbf402..24cffeb 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,7 @@ This mount `-v /your/path/to/kafkamate.json:/kafkamate.json` is needed if you wa
If this is skipped then it will start with no configuration.
### Run locally
-Start the site with (make sure to have already installed `npm`):
+Start the site with (make sure `npm` is already installed; if errors pop up, set `export NODE_OPTIONS=--openssl-legacy-provider` first):
```bash
➜ sbt dev
```
@@ -22,17 +22,28 @@ Start the site with (make sure to have already installed `npm`):
In another shell tab start the service:
```bash
➜ sbt service/run
+➜ # or: sbt service/reStart
```
We need also to start the Envoy proxy to forward the browser's gRPC-Web requests to the backend:
```bash
+➜ # if you're on Linux
➜ docker run --rm -d --net host -v "$(pwd)"/build/envoy.yaml:/etc/envoy/envoy.yaml:ro envoyproxy/envoy:v1.15.0
+➜ # if you're on macOS, try the following
+➜ docker run --platform linux/amd64 --rm -p 61234:61234 --add-host host.docker.internal:192.168.0.114 -v "$(pwd)"/build/envoy.yaml:/etc/envoy/envoy.yaml:ro envoyproxy/envoy:v1.15.0 -c /etc/envoy/envoy.yaml -l debug
+➜ # if you're on Windows, try the following
+➜ docker run --rm -d -p 61234:61234 --add-host host.docker.internal:192.168.0.114 -v %cd%\build\envoy.yaml:/etc/envoy/envoy.yaml:ro envoyproxy/envoy:v1.15.0 -c /etc/envoy/envoy.yaml -l debug
```
### Build docker image
```bash
➜ sbt dockerize
```
+Steps to build multi-architecture images on a MacBook (M2) using QEMU:
+ - `docker buildx version`
+ - `docker buildx create --use`
+ - `docker buildx inspect --bootstrap`
+ - `docker buildx build --platform linux/amd64 -t csofronia/kafkamate:latest -f Dockerfile . --load` (run from `target/docker`)
### KAFKA TRADEMARK DISCLAIMER
diff --git a/build.sbt b/build.sbt
index 77c4ec7..9cdb0e6 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,227 +1,233 @@
-ThisBuild / resolvers += Resolver.sonatypeRepo("snapshots")
-
-lazy val ProjectName = "kafkamate"
-lazy val ProjectOrganization = "csofronia"
-lazy val ProjectVersion = "0.1.0"
-lazy val ProjectScalaVersion = "2.13.6"
-
-lazy val ZIOVersion = "1.0.9"
-lazy val GrpcVersion = "1.38.1"
-lazy val SlinkyVersion = "0.6.7"
-
-lazy val kafkamate = project
- .in(file("."))
- .aggregate(service, site)
- .settings(
- name := ProjectName,
- organization := ProjectOrganization,
- version := ProjectVersion
- )
- .enablePlugins(DockerPlugin)
- .disablePlugins(RevolverPlugin)
- .settings(
- docker := (docker dependsOn (assembly in service)).value,
- dockerfile in docker := {
- val artifact: File = (assemblyOutputPath in assembly in service).value
- val artifactTargetPath = s"/app/${artifact.name}"
-
- new Dockerfile {
- from("openjdk:8-jre")
- maintainer("Ciprian Sofronia", "ciprian.sofronia@gmail.com")
-
- env("KAFKAMATE_ENV", "prod")
- expose(8080, 61234, 61235)
-
- runRaw(
- "apt-get update && apt-get install -y dumb-init nginx nodejs apt-transport-https ca-certificates curl gnupg2 software-properties-common"
- )
- runRaw("""curl -sL 'https://getenvoy.io/gpg' | apt-key add -""")
- runRaw("""apt-key fingerprint 6FF974DB | grep "5270 CEAC" """)
- runRaw(
- """add-apt-repository "deb [arch=amd64] https://dl.bintray.com/tetrate/getenvoy-deb $(lsb_release -cs) stable" """
- )
- runRaw("apt-get update && apt-get install -y getenvoy-envoy=1.15.1.p0.g670a4a6-1p69.ga5345f6")
-
- runRaw("rm -v /etc/nginx/nginx.conf")
- copy(baseDirectory(_ / "build" / "nginx").value, "/etc/nginx/")
- copy(baseDirectory(_ / "build" / "envoy.yaml").value, "envoy.yaml")
- copy(baseDirectory(_ / "build" / "start.sh").value, "start.sh")
-
- add(artifact, artifactTargetPath)
- copy(baseDirectory(_ / "site" / "build").value, "/usr/share/nginx/html/")
-
- entryPoint("/usr/bin/dumb-init", "--")
- cmd("./start.sh", artifactTargetPath)
- }
- },
- imageNames in docker := Seq(
- ImageName(s"${organization.value}/${name.value}:latest"),
- ImageName(
- repository = s"${organization.value}/${name.value}",
- tag = Some(version.value)
- )
- )
- )
- .settings(
- addCommandAlias("dockerize", ";compile;test;build;docker")
- )
-
-lazy val service = project
- .in(file("service"))
- .settings(sharedSettings)
- .settings(
- name := "kafkamate-service",
- scalacOptions ++= Seq(
- "-unchecked",
- "-deprecation",
- "-encoding",
- "utf8",
- "-target:jvm-1.8",
- "-feature",
- "-language:_",
- "-Ywarn-dead-code",
- "-Ywarn-macros:after",
- "-Ywarn-numeric-widen",
- "-Ywarn-value-discard",
- "-Xlint",
- //"-Xfatal-warnings",
- "-Xlint:-byname-implicit",
- "-Xlog-reflective-calls"
- ),
- libraryDependencies ++= Seq(
- "dev.zio" %% "zio-kafka" % "0.15.0",
- "dev.zio" %% "zio-json" % "0.1.5",
- "dev.zio" %% "zio-logging-slf4j" % "0.5.11",
- "com.lihaoyi" %% "os-lib" % "0.7.8",
- "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion,
- "io.confluent" % "kafka-protobuf-serializer" % "6.2.0",
- //"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.10.0",
- "net.logstash.logback" % "logstash-logback-encoder" % "6.6",
- "ch.qos.logback" % "logback-classic" % "1.2.3",
- "io.github.embeddedkafka" %% "embedded-kafka" % "2.8.0" % Test
- ),
- dependencyOverrides ++= Seq(
- "org.apache.kafka" % "kafka-clients" % "2.8.0"
- ),
- resolvers ++= Seq(
- "Confluent" at "https://packages.confluent.io/maven/"
- ),
- PB.targets in Compile := Seq(
- scalapb.gen(grpc = true) -> (sourceManaged in Compile).value,
- scalapb.zio_grpc.ZioCodeGenerator -> (sourceManaged in Compile).value
- )
- )
- .dependsOn(common.jvm)
- .settings(
- assemblyMergeStrategy in assembly := {
- case x if x endsWith "io.netty.versions.properties" => MergeStrategy.concat
- case x if x endsWith "module-info.class" => MergeStrategy.discard
- case x if x endsWith ".proto" => MergeStrategy.discard
- case x =>
- val oldStrategy = (assemblyMergeStrategy in assembly).value
- oldStrategy(x)
- },
- test in assembly := {}
- )
-
-lazy val site = project
- .in(file("site"))
- .enablePlugins(ScalaJSBundlerPlugin)
- .disablePlugins(RevolverPlugin)
- .settings(sharedSettings)
- .settings(
- name := "kafkamate-site",
- scalacOptions ++= {
- if (scalaJSVersion.startsWith("0.6.")) Seq("-P:scalajs:sjsDefinedByDefault")
- else Nil
- },
- version in webpack := "4.43.0",
- version in startWebpackDevServer := "3.11.0",
- libraryDependencies ++= Seq(
- "me.shadaj" %%% "slinky-core" % SlinkyVersion,
- "me.shadaj" %%% "slinky-web" % SlinkyVersion,
- "me.shadaj" %%% "slinky-native" % SlinkyVersion,
- "me.shadaj" %%% "slinky-hot" % SlinkyVersion,
- "me.shadaj" %%% "slinky-react-router" % SlinkyVersion,
- "me.shadaj" %%% "slinky-scalajsreact-interop" % SlinkyVersion,
- "org.scalatest" %%% "scalatest" % "3.2.9" % Test
- //"com.github.oen9" %%% "slinky-bridge-react-konva" % "0.1.1",
- ),
- npmDependencies in Compile ++= Seq(
- "react" -> "16.13.1",
- "react-dom" -> "16.13.1",
- "react-proxy" -> "1.1.8",
- "react-router-dom" -> "5.2.0",
- "path-to-regexp" -> "3.0.0",
- "use-image" -> "1.0.6"
- //"react-konva" -> "16.13.0-3",
- //"konva" -> "4.2.2",
- ),
- npmDevDependencies in Compile ++= Seq(
- "file-loader" -> "6.0.0",
- "style-loader" -> "1.2.1",
- "css-loader" -> "3.5.3",
- "html-webpack-plugin" -> "4.3.0",
- "copy-webpack-plugin" -> "5.1.1",
- "webpack-merge" -> "4.2.2"
- ),
- webpackResources := baseDirectory.value / "webpack" * "*",
- webpackConfigFile in fastOptJS := Some(baseDirectory.value / "webpack" / "webpack-fastopt.config.js"),
- webpackConfigFile in fullOptJS := Some(baseDirectory.value / "webpack" / "webpack-opt.config.js"),
- webpackConfigFile in Test := Some(baseDirectory.value / "webpack" / "webpack-core.config.js"),
- webpackDevServerExtraArgs in fastOptJS := Seq("--inline", "--hot"),
- webpackBundlingMode in fastOptJS := BundlingMode.LibraryOnly(),
- requireJsDomEnv in Test := true,
- addCommandAlias("dev", ";fastOptJS::startWebpackDevServer;~fastOptJS"),
- addCommandAlias("build", "fullOptJS::webpack"),
- test in Compile := {} //disable site tests for now
- )
- .dependsOn(common.js)
-
-lazy val common = crossProject(JSPlatform, JVMPlatform)
- .crossType(CrossType.Pure)
- .in(file("common"))
- .settings(sharedSettings)
- .disablePlugins(RevolverPlugin)
- .settings(
- libraryDependencies += "com.thesamet.scalapb" %%% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion,
- PB.protoSources in Compile := Seq(
- (baseDirectory in ThisBuild).value / "common" / "src" / "main" / "protobuf"
- )
- )
- .jvmSettings(
- libraryDependencies ++= Seq(
- "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion,
- "io.grpc" % "grpc-netty" % GrpcVersion
- ),
- PB.targets in Compile := Seq(
- scalapb.gen(grpc = true) -> (sourceManaged in Compile).value,
- scalapb.zio_grpc.ZioCodeGenerator -> (sourceManaged in Compile).value
- )
- )
- .jsSettings(
- // publish locally and update the version for test
- libraryDependencies += "com.thesamet.scalapb.grpcweb" %%% "scalapb-grpcweb" % scalapb.grpcweb.BuildInfo.version,
- PB.targets in Compile := Seq(
- scalapb.gen(grpc = false) -> (sourceManaged in Compile).value,
- scalapb.grpcweb.GrpcWebCodeGenerator -> (sourceManaged in Compile).value
- )
- )
-
-lazy val sharedSettings = Seq(
- version := ProjectVersion,
- scalaVersion := ProjectScalaVersion,
- scalacOptions ++= Seq(
- "-Ymacro-annotations"
- ),
- libraryDependencies ++= Seq(
- "dev.zio" %%% "zio" % ZIOVersion,
- "dev.zio" %%% "zio-macros" % ZIOVersion,
- "io.circe" %%% "circe-generic" % "0.14.1",
- "dev.zio" %%% "zio-test" % ZIOVersion % Test,
- "dev.zio" %%% "zio-test-sbt" % ZIOVersion % Test
- ),
- addCompilerPlugin("org.typelevel" % "kind-projector" % "0.13.0" cross CrossVersion.full),
- testFrameworks ++= Seq(new TestFramework("zio.test.sbt.ZTestFramework"))
- //,bloopExportJarClassifiers in Global := Some(Set("sources"))
-)
+ThisBuild / resolvers += Resolver.sonatypeRepo("snapshots")
+
+lazy val ProjectName = "kafkamate"
+lazy val ProjectOrganization = "csofronia"
+lazy val ProjectVersion = "0.2.0"
+lazy val ProjectScalaVersion = "2.13.10"
+
+// make sure to align the ZIO versions with the Scala.js versions in plugins.sbt
+lazy val zioVersion = "1.0.18"
+lazy val zioKafkaVersion = "0.17.8"
+lazy val kafkaVersion = "3.1.0"
+lazy val grpcVersion = "1.38.1"
+lazy val slinkyVersion = "0.6.8"
+
+lazy val kafkamate = project
+ .in(file("."))
+ .aggregate(service, site)
+ .settings(
+ name := ProjectName,
+ organization := ProjectOrganization,
+ version := ProjectVersion
+ )
+ .enablePlugins(DockerPlugin)
+ .disablePlugins(RevolverPlugin)
+ .settings(
+ docker := (docker dependsOn (assembly in service)).value,
+ dockerfile in docker := {
+ val artifact: File = (assemblyOutputPath in assembly in service).value
+ val artifactTargetPath = s"/app/${artifact.name}"
+
+ new Dockerfile {
+ from("openjdk:8-jre")
+ maintainer("Ciprian Sofronia")
+
+ env("KAFKAMATE_ENV", "prod")
+ expose(8080, 61234, 61235)
+
+ runRaw(
+ "apt-get update && apt-get install -y dumb-init nginx nodejs apt-transport-https ca-certificates curl gnupg2 software-properties-common lsb-release"
+ )
+ copy(
+ baseDirectory(_ / "build" / "getenvoy-envoy_1.15.1.p0.g670a4a6-1p69.ga5345f6_amd64.deb").value,
+ "/tmp/getenvoy-envoy_1.15.1.p0.g670a4a6-1p69.ga5345f6_amd64.deb"
+ )
+ runRaw("dpkg -i /tmp/getenvoy-envoy_1.15.1.p0.g670a4a6-1p69.ga5345f6_amd64.deb")
+
+ runRaw("rm -v /etc/nginx/nginx.conf")
+ copy(baseDirectory(_ / "build" / "nginx").value, "/etc/nginx/")
+ copy(baseDirectory(_ / "build" / "envoy.yaml").value, "envoy.yaml")
+ copy(baseDirectory(_ / "build" / "start.sh").value, "start.sh")
+
+ add(artifact, artifactTargetPath)
+ copy(baseDirectory(_ / "site" / "build").value, "/usr/share/nginx/html/")
+
+ entryPoint("/usr/bin/dumb-init", "--")
+ cmd("./start.sh", artifactTargetPath)
+ }
+ },
+ imageNames in docker := Seq(
+ ImageName(s"${organization.value}/${name.value}:latest"),
+ ImageName(
+ repository = s"${organization.value}/${name.value}",
+ tag = Some(version.value)
+ )
+ )
+ )
+ .settings(
+ addCommandAlias("dockerize", ";compile;test;build;docker")
+ )
+
+lazy val service = project
+ .in(file("service"))
+ .settings(sharedSettings)
+ .settings(
+ name := "kafkamate-service",
+ scalacOptions ++= Seq(
+ "-unchecked",
+ "-deprecation",
+ "-encoding",
+ "utf8",
+ "-target:jvm-1.8",
+ "-feature",
+ "-language:_",
+ "-Ywarn-dead-code",
+ "-Ywarn-macros:after",
+ "-Ywarn-numeric-widen",
+ "-Ywarn-value-discard",
+ "-Xlint",
+ // "-Xfatal-warnings",
+ "-Xlint:-byname-implicit",
+ "-Xlog-reflective-calls"
+ ),
+ libraryDependencies ++= Seq(
+ "dev.zio" %% "zio-kafka" % zioKafkaVersion,
+ "dev.zio" %% "zio-json" % "0.1.5",
+ "dev.zio" %% "zio-logging-slf4j" % "0.5.11",
+ "io.github.kitlangton" %% "zio-magic" % "0.3.2",
+ "com.lihaoyi" %% "os-lib" % "0.7.8",
+ "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion,
+ "io.confluent" % "kafka-protobuf-serializer" % "7.2.1",
+ "net.logstash.logback" % "logstash-logback-encoder" % "6.6",
+ "ch.qos.logback" % "logback-classic" % "1.2.3",
+ "io.github.embeddedkafka" %% "embedded-kafka" % kafkaVersion % Test
+ ),
+ dependencyOverrides ++= Seq(
+ "org.apache.kafka" % "kafka-clients" % kafkaVersion
+ ),
+ resolvers ++= Seq(
+ "Confluent" at "https://packages.confluent.io/maven/"
+ ),
+ PB.targets in Compile := Seq(
+ scalapb.gen(grpc = true) -> (sourceManaged in Compile).value,
+ scalapb.zio_grpc.ZioCodeGenerator -> (sourceManaged in Compile).value
+ )
+ )
+ .dependsOn(common.jvm)
+ .settings(
+ assemblyMergeStrategy in assembly := {
+ case x if x endsWith "io.netty.versions.properties" => MergeStrategy.concat
+ case x if x endsWith "module-info.class" => MergeStrategy.discard
+ case x if x endsWith "okio.kotlin_module" => MergeStrategy.discard
+ case x if x endsWith ".proto" => MergeStrategy.discard
+ case x =>
+ val oldStrategy = (assemblyMergeStrategy in assembly).value
+ oldStrategy(x)
+ },
+ test in assembly := {}
+ )
+
+lazy val site = project
+ .in(file("site"))
+ .enablePlugins(ScalaJSBundlerPlugin)
+ .disablePlugins(RevolverPlugin)
+ .settings(sharedSettings)
+ .settings(
+ name := "kafkamate-site",
+ scalacOptions ++= {
+ if (scalaJSVersion.startsWith("0.6.")) Seq("-P:scalajs:sjsDefinedByDefault")
+ else Nil
+ },
+ version in webpack := "4.43.0",
+ version in startWebpackDevServer := "3.11.0",
+ libraryDependencies ++= Seq(
+ "me.shadaj" %%% "slinky-core" % slinkyVersion,
+ "me.shadaj" %%% "slinky-web" % slinkyVersion,
+ "me.shadaj" %%% "slinky-native" % slinkyVersion,
+ "me.shadaj" %%% "slinky-hot" % slinkyVersion,
+ "me.shadaj" %%% "slinky-react-router" % slinkyVersion,
+ "me.shadaj" %%% "slinky-scalajsreact-interop" % slinkyVersion,
+ "org.scalatest" %%% "scalatest" % "3.2.9" % Test
+ // "com.github.oen9" %%% "slinky-bridge-react-konva" % "0.1.1",
+ ),
+ npmDependencies in Compile ++= Seq(
+ "react" -> "16.13.1",
+ "react-dom" -> "16.13.1",
+ "react-proxy" -> "1.1.8",
+ "react-router-dom" -> "5.2.0",
+ "path-to-regexp" -> "3.0.0",
+ "use-image" -> "1.0.6"
+ // "react-konva" -> "16.13.0-3",
+ // "konva" -> "4.2.2",
+ ),
+ npmDevDependencies in Compile ++= Seq(
+ "file-loader" -> "6.0.0",
+ "style-loader" -> "1.2.1",
+ "css-loader" -> "3.5.3",
+ "html-webpack-plugin" -> "4.3.0",
+ "copy-webpack-plugin" -> "5.1.1",
+ "webpack-merge" -> "4.2.2"
+ ),
+ webpackResources := baseDirectory.value / "webpack" * "*",
+ webpackConfigFile in fastOptJS := Some(baseDirectory.value / "webpack" / "webpack-fastopt.config.js"),
+ webpackConfigFile in fullOptJS := Some(baseDirectory.value / "webpack" / "webpack-opt.config.js"),
+ webpackConfigFile in Test := Some(baseDirectory.value / "webpack" / "webpack-core.config.js"),
+ webpackDevServerExtraArgs in fastOptJS := Seq("--inline", "--hot"),
+ webpackBundlingMode in fastOptJS := BundlingMode.LibraryOnly(),
+ requireJsDomEnv in Test := true,
+ addCommandAlias("dev", ";fastOptJS::startWebpackDevServer;~fastOptJS"),
+ addCommandAlias("build", "fullOptJS::webpack"),
+ test in Compile := {} // disable site tests for now
+ )
+ .dependsOn(common.js)
+
+lazy val common = crossProject(JSPlatform, JVMPlatform)
+ .crossType(CrossType.Pure)
+ .in(file("common"))
+ .settings(sharedSettings)
+ .disablePlugins(RevolverPlugin)
+ .settings(
+ libraryDependencies += "com.thesamet.scalapb" %%% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion,
+ PB.protoSources in Compile := Seq(
+ (baseDirectory in ThisBuild).value / "common" / "src" / "main" / "protobuf"
+ )
+ )
+ .jvmSettings(
+ libraryDependencies ++= Seq(
+ "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion,
+ "io.grpc" % "grpc-netty" % grpcVersion
+ ),
+ PB.targets in Compile := Seq(
+ scalapb.gen(grpc = true) -> (sourceManaged in Compile).value,
+ scalapb.zio_grpc.ZioCodeGenerator -> (sourceManaged in Compile).value
+ )
+ )
+ .jsSettings(
+ // publish locally and update the version for test
+ libraryDependencies += "com.thesamet.scalapb.grpcweb" %%% "scalapb-grpcweb" % scalapb.grpcweb.BuildInfo.version,
+ PB.targets in Compile := Seq(
+ scalapb.gen(grpc = false) -> (sourceManaged in Compile).value,
+ scalapb.grpcweb.GrpcWebCodeGenerator -> (sourceManaged in Compile).value
+ )
+ )
+
+lazy val sharedSettings = Seq(
+ version := ProjectVersion,
+ scalaVersion := ProjectScalaVersion,
+ scalacOptions ++= Seq(
+ "-Ymacro-annotations",
+ "-Wunused"
+ ),
+ Global / useCoursier := false,
+ libraryDependencies ++= Seq(
+ "dev.zio" %%% "zio" % zioVersion,
+ "dev.zio" %%% "zio-macros" % zioVersion,
+ "io.circe" %%% "circe-generic" % "0.14.1",
+ "dev.zio" %%% "zio-test" % zioVersion % Test,
+ "dev.zio" %%% "zio-test-sbt" % zioVersion % Test
+ ),
+ testFrameworks ++= Seq(new TestFramework("zio.test.sbt.ZTestFramework")),
+ Global / scalafixDependencies += "com.github.liancheng" %% "organize-imports" % "0.6.0",
+ semanticdbEnabled := true,
+ semanticdbVersion := scalafixSemanticdb.revision
+) ++ addCommandAlias("fmt", "all scalafmtSbt scalafmtAll test:scalafmt;scalafixAll")
diff --git a/build/envoy.yaml b/build/envoy.yaml
index 011d14f..1cb2ebd 100644
--- a/build/envoy.yaml
+++ b/build/envoy.yaml
@@ -36,4 +36,5 @@ static_resources:
type: logical_dns
http2_protocol_options: {}
lb_policy: round_robin
- hosts: [{ socket_address: { address: 0.0.0.0, port_value: 61235 }}]
\ No newline at end of file
+ hosts: [{ socket_address: { address: 0.0.0.0, port_value: 61235 }}]
+ #hosts: [{ socket_address: { address: host.docker.internal, port_value: 61235 }}] # use docker host for local dev
\ No newline at end of file
diff --git a/build/getenvoy-envoy_1.15.1.p0.g670a4a6-1p69.ga5345f6_amd64.deb b/build/getenvoy-envoy_1.15.1.p0.g670a4a6-1p69.ga5345f6_amd64.deb
new file mode 100644
index 0000000..d834dde
Binary files /dev/null and b/build/getenvoy-envoy_1.15.1.p0.g670a4a6-1p69.ga5345f6_amd64.deb differ
diff --git a/common/src/main/protobuf/messages.proto b/common/src/main/protobuf/messages.proto
index 3ef32e4..df2666e 100644
--- a/common/src/main/protobuf/messages.proto
+++ b/common/src/main/protobuf/messages.proto
@@ -4,41 +4,81 @@ option java_package = "io.kafkamate";
package messages;
+enum OffsetStrategy {
+ LATEST = 0;
+ FROM_BEGINNING = 1;
+ REAL_TIME = 2;
+}
+
message ConsumeRequest {
string clusterId = 1;
string topicName = 2;
uint64 maxResults = 3;
- string offsetStrategy = 4;
+ OffsetStrategy offsetStrategy = 4;
string filterKeyword = 5;
MessageFormat messageFormat = 6;
}
enum MessageFormat {
- STRING = 0;
- PROTOBUF = 1;
+ AUTO = 0;
+ STRING = 1;
+ PROTOBUF = 2;
}
message ProduceRequest {
+ oneof request {
+ ProduceProtoRequest protoRequest = 1;
+ ProduceStringRequest stringRequest = 2;
+ }
+}
+
+message ProduceStringRequest {
+ string clusterId = 1;
+ string topicName = 2;
+ string key = 3;
+ string value = 4;
+}
+
+message ProduceProtoRequest {
string clusterId = 1;
string topicName = 2;
string key = 3;
string value = 4;
+ int32 schemaId = 5;
+ string valueSubject = 6;
+ optional string valueDescriptor = 7;
}
message ProduceResponse {
string status = 1;
}
-message Message {
+message GetSchemaSubjectRequest {
+ string clusterId = 1;
+ string topicName = 2;
+}
+
+message SchemaSubject {
+ int32 id = 1;
+ string url = 2;
+}
+
+message SchemaSubjectResponse {
+ repeated SchemaSubject versions = 1;
+}
+
+message LogicMessage {
uint64 offset = 1;
int32 partition = 2;
uint64 timestamp = 3;
- string key = 4;
- string value = 5;
+ optional string key = 4;
+ MessageFormat valueFormat = 5;
+ optional int32 valueSchemaId = 6;
+ optional string value = 7;
}
service MessagesService {
rpc ProduceMessage(ProduceRequest) returns (ProduceResponse);
-
- rpc ConsumeMessages(ConsumeRequest) returns (stream Message);
+ rpc ConsumeMessages(ConsumeRequest) returns (stream LogicMessage);
+ rpc GetSchemaSubject(GetSchemaSubjectRequest) returns (SchemaSubjectResponse);
}
\ No newline at end of file
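
A minimal sketch of how the reworked `ProduceRequest` oneof is used from Scala, assuming ScalaPB's standard oneof codegen (each branch is wrapped in `ProduceRequest.Request.*`); the cluster, topic, key, and value strings are hypothetical:

```scala
import io.kafkamate.messages._

// plain string produce request, using the StringRequest branch of the oneof
val stringReq: ProduceRequest =
  ProduceRequest(
    ProduceRequest.Request.StringRequest(
      ProduceStringRequest(clusterId = "local", topicName = "test_topic", key = "key1", value = "value1")
    )
  )

// the service side can branch on the oneof case when producing
def describe(req: ProduceRequest): String =
  req.request match {
    case ProduceRequest.Request.ProtoRequest(r)  => s"proto produce to ${r.topicName}, schemaId=${r.schemaId}"
    case ProduceRequest.Request.StringRequest(r) => s"string produce to ${r.topicName}"
    case ProduceRequest.Request.Empty            => "no request set"
  }
```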
diff --git a/docker-compose/compose-kafkamate.json b/docker-compose/compose-kafkamate.json
index 810a968..264561c 100644
--- a/docker-compose/compose-kafkamate.json
+++ b/docker-compose/compose-kafkamate.json
@@ -4,5 +4,11 @@
"name" : "local-docker-compose",
"kafkaHosts" : ["kafka:29092"],
"schemaRegistryUrl" : "http://schema-registry:8081"
+ },
+ {
+ "id" : "localhost-bIqpR9",
+ "name" : "localhost",
+ "kafkaHosts" : ["localhost:9092"],
+ "schemaRegistryUrl" : "http://localhost:8081"
}]
}
\ No newline at end of file
diff --git a/docker-compose/kafka-schema-registry.yml b/docker-compose/kafka-schema-registry.yml
index d99cb21..96273c2 100644
--- a/docker-compose/kafka-schema-registry.yml
+++ b/docker-compose/kafka-schema-registry.yml
@@ -2,6 +2,7 @@ version: "2"
services:
schema-registry:
+ platform: linux/amd64
image: confluentinc/cp-schema-registry:5.5.1
hostname: schema-registry
ports:
@@ -14,6 +15,7 @@ services:
- kafka
kafka:
+ platform: linux/amd64
image: obsidiandynamics/kafka
restart: "no"
ports:
diff --git a/project/build.properties b/project/build.properties
index 57f0579..a4804cf 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version = 1.5.4
\ No newline at end of file
+sbt.version = 1.8.3
\ No newline at end of file
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 1c4b7f5..9c614ba 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,32 +1,23 @@
ThisBuild / resolvers += Resolver.sonatypeRepo("snapshots")
-addSbtPlugin("ch.epfl.scala" % "sbt-missinglink" % "0.3.2")
-
-addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.4.8-63-80fdb462")
-
-addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
-
-addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1")
-
-addSbtPlugin("org.jmotor.sbt" % "sbt-dependency-updates" % "1.2.2")
-
-addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.4")
-
-addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.8.2")
-
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.0.0")
-
-addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2")
+addSbtPlugin("ch.epfl.scala" % "sbt-missinglink" % "0.3.2")
+addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.4.8-63-80fdb462")
+addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")
+addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1")
+addSbtPlugin("org.jmotor.sbt" % "sbt-dependency-updates" % "1.2.2")
+addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.6")
+addSbtPlugin("se.marcuslonnberg" % "sbt-docker" % "1.8.2")
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.0.0")
+addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.0")
+addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.10.4")
libraryDependencies ++= Seq(
- "com.thesamet.scalapb" %% "compilerplugin" % "0.11.3",
- "com.thesamet.scalapb.grpcweb" %% "scalapb-grpcweb-code-gen" % "0.6.4",
- "com.thesamet.scalapb.zio-grpc" %% "zio-grpc-codegen" % "0.5.0"
+ "com.thesamet.scalapb" %% "compilerplugin" % "0.11.13",
+ "com.thesamet.scalapb.grpcweb" %% "scalapb-grpcweb-code-gen" % "0.6.6",
+ "com.thesamet.scalapb.zio-grpc" %% "zio-grpc-codegen" % "0.5.3"
)
// For Scala.js:
-addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.6.0")
-
+addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.13.2")
addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.0.0")
-
-addSbtPlugin("ch.epfl.scala" % "sbt-scalajs-bundler" % "0.20.0")
+addSbtPlugin("ch.epfl.scala" % "sbt-scalajs-bundler" % "0.20.0")
diff --git a/service/src/main/resources/logback.xml b/service/src/main/resources/logback.xml
new file mode 100644
index 0000000..de18c5d
--- /dev/null
+++ b/service/src/main/resources/logback.xml
@@ -0,0 +1,23 @@
+<configuration>
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+</configuration>
diff --git a/service/src/main/scala/io/kafkamate/Main.scala b/service/src/main/scala/io/kafkamate/Main.scala
index be8479e..9ce4fad 100644
--- a/service/src/main/scala/io/kafkamate/Main.scala
+++ b/service/src/main/scala/io/kafkamate/Main.scala
@@ -1,30 +1,41 @@
package io.kafkamate
-import scalapb.zio_grpc.{ServerMain, ServiceList}
-import zio.ZEnv
+import io.grpc.ServerBuilder
+import io.grpc.protobuf.services.ProtoReflectionService
+import io.kafkamate.config._
+import io.kafkamate.grpc._
+import io.kafkamate.kafka._
+import io.kafkamate.utils._
+import scalapb.zio_grpc.{ManagedServer, ServiceList}
+import zio._
-import config._
-import grpc._
-import kafka._
-import utils._
+object Main extends App {
-object Main extends ServerMain {
+ private val port = 61235
- override def port: Int = 61235
-
- override def services: ServiceList[ZEnv] =
- ServiceList
- .add(ClustersService.GrpcService)
- .add(BrokersService.GrpcService)
- .add(TopicsService.GrpcService)
- .add(MessagesService.GrpcService)
+ def run(args: List[String]): URIO[ZEnv, ExitCode] = {
+ val builder =
+ ServerBuilder
+ .forPort(port)
+ .addService(ProtoReflectionService.newInstance())
+ val services =
+ ServiceList
+ .add(ClustersService.GrpcService)
+ .add(BrokersService.GrpcService)
+ .add(TopicsService.GrpcService)
+ .add(MessagesService.GrpcService)
+ ManagedServer
+ .fromServiceList(builder, services)
.provideLayer(
ZEnv.live >+>
Logger.liveLayer >+>
ConfigPathService.liveLayer >+>
ClustersConfig.liveLayer >+>
- MessagesService.liveLayer >+>
- KafkaExplorer.liveLayer
+ KafkaExplorer.liveLayer >+>
+ MessagesService.liveLayer
)
+ .useForever
+ .exitCode
+ }
}
diff --git a/service/src/main/scala/io/kafkamate/config/ClustersConfig.scala b/service/src/main/scala/io/kafkamate/config/ClustersConfig.scala
index 049830b..1555f8c 100644
--- a/service/src/main/scala/io/kafkamate/config/ClustersConfig.scala
+++ b/service/src/main/scala/io/kafkamate/config/ClustersConfig.scala
@@ -17,16 +17,22 @@ import zio.macros.accessible
_configs ++ Map("schema.registry.url" -> schemaRegistryUrl)
}
- case class ClusterSettings(id: String, name: String, kafkaHosts: List[String], schemaRegistryUrl: Option[String]) {
- val kafkaHosts_ : String = kafkaHosts.mkString(",")
+ case class ClusterSettings(
+ id: String,
+ name: String,
+ kafkaHosts: List[String],
+ schemaRegistryUrl: Option[String]) {
+ val kafkaHosts_ : String = kafkaHosts.mkString(",")
val protoSerdeSettings: Option[ProtoSerdeSettings] = schemaRegistryUrl.map(ProtoSerdeSettings(_))
}
+
object ClusterSettings {
implicit val decoder: JsonDecoder[ClusterSettings] = DeriveJsonDecoder.gen[ClusterSettings]
implicit val encoder: JsonEncoder[ClusterSettings] = DeriveJsonEncoder.gen[ClusterSettings]
}
case class ClusterProperties(clusters: List[ClusterSettings])
+
object ClusterProperties {
implicit val decoder: JsonDecoder[ClusterProperties] = DeriveJsonDecoder.gen[ClusterProperties]
implicit val encoder: JsonEncoder[ClusterProperties] = DeriveJsonEncoder.gen[ClusterProperties]
@@ -41,16 +47,16 @@ import zio.macros.accessible
for {
all <- readClusters
cs <- ZIO
- .fromOption(all.clusters.find(_.id == clusterId))
- .orElseFail(new Exception(s"Cluster ($clusterId) doesn't exist!"))
+ .fromOption(all.clusters.find(_.id == clusterId))
+ .orElseFail(new Exception(s"Cluster ($clusterId) doesn't exist!"))
} yield cs
}
lazy val liveLayer: URLayer[HasConfigPath with Logging, ClustersConfigService] =
ZLayer.fromServices[ConfigPath, Logger[String], Service] { case (configPath, log) =>
new Service {
- private val configFilepath = configPath.path
- private def emptyProperties = ClusterProperties(List.empty)
+ private val configFilepath = configPath.path
+ private def emptyProperties = ClusterProperties(List.empty)
private def emptyPropertiesJson = emptyProperties.toJsonPretty
private def writeJson(json: => String) =
@@ -62,27 +68,27 @@ import zio.macros.accessible
_ <- writeJson(emptyPropertiesJson).unless(b)
s <- Task(os.read(configFilepath))
r <- ZIO
- .fromEither(s.fromJson[ClusterProperties])
- .catchAll { err =>
- log.warn(s"Parsing error: $err") *>
- writeJson(emptyPropertiesJson).as(emptyProperties)
- }
+ .fromEither(s.fromJson[ClusterProperties])
+ .catchAll { err =>
+ log.warn(s"Parsing error: $err") *>
+ writeJson(emptyPropertiesJson).as(emptyProperties)
+ }
} yield r
def writeClusters(cluster: ClusterSettings): Task[Unit] =
for {
- c <- readClusters
+ c <- readClusters
json = c.copy(clusters = c.clusters :+ cluster).toJsonPretty
- _ <- writeJson(json)
+ _ <- writeJson(json)
} yield ()
def deleteCluster(clusterId: String): Task[ClusterProperties] =
for {
- c <- readClusters
- ls <- ZIO.filterNotPar(c.clusters)(s => Task(s.id == clusterId))
+ c <- readClusters
+ ls <- ZIO.filterNotPar(c.clusters)(s => Task(s.id == clusterId))
json = ClusterProperties(ls).toJsonPretty
- _ <- writeJson(json)
- r <- readClusters
+ _ <- writeJson(json)
+ r <- readClusters
} yield r
}
}
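
For context, `ClusterProperties` and `ClusterSettings` are (de)serialized with zio-json, so `kafkamate.json` is simply their JSON form. A small sketch of parsing it, reusing the localhost entry added to the docker-compose example later in this diff:

```scala
import zio.json._

import io.kafkamate.config.ClustersConfig._

val json: String =
  """{
    |  "clusters" : [{
    |    "id" : "localhost-bIqpR9",
    |    "name" : "localhost",
    |    "kafkaHosts" : ["localhost:9092"],
    |    "schemaRegistryUrl" : "http://localhost:8081"
    |  }]
    |}""".stripMargin

// Right(ClusterProperties(List(ClusterSettings("localhost-bIqpR9", "localhost", List("localhost:9092"), Some("http://localhost:8081")))))
val parsed: Either[String, ClusterProperties] = json.fromJson[ClusterProperties]
```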
diff --git a/service/src/main/scala/io/kafkamate/config/ConfigPathService.scala b/service/src/main/scala/io/kafkamate/config/ConfigPathService.scala
index e19d8d8..b4eb3b5 100644
--- a/service/src/main/scala/io/kafkamate/config/ConfigPathService.scala
+++ b/service/src/main/scala/io/kafkamate/config/ConfigPathService.scala
@@ -2,12 +2,11 @@ package io.kafkamate
package config
import zio._
-import zio.system
-import system.System
+import zio.system.System
object ConfigPathService {
- lazy val EnvKey = "KAFKAMATE_ENV"
+ lazy val EnvKey = "KAFKAMATE_ENV"
lazy val FileName = "kafkamate.json"
case class ConfigPath(path: os.Path)
diff --git a/service/src/main/scala/io/kafkamate/grpc/BrokersService.scala b/service/src/main/scala/io/kafkamate/grpc/BrokersService.scala
index b60f1bc..709bad5 100644
--- a/service/src/main/scala/io/kafkamate/grpc/BrokersService.scala
+++ b/service/src/main/scala/io/kafkamate/grpc/BrokersService.scala
@@ -2,21 +2,21 @@ package io.kafkamate
package grpc
import io.grpc.Status
-import zio.{ZEnv, ZIO}
+import io.kafkamate.brokers._
+import io.kafkamate.kafka._
+import io.kafkamate.utils._
import zio.logging._
-
-import brokers._
-import kafka._
-import utils._
+import zio.{ZEnv, ZIO}
object BrokersService {
type Env = ZEnv with KafkaExplorer.HasKafkaExplorer with Logging
object GrpcService extends ZioBrokers.RBrokersService[Env] {
+
def getBrokers(request: BrokerRequest): ZIO[Env, Status, BrokerResponse] =
KafkaExplorer
.listBrokers(request.clusterId)
- .tapError(e => log.error(s"Get brokers error: ${e.getMessage}"))
- .bimap(GRPCStatus.fromThrowable, BrokerResponse(_))
+ .tapError(e => log.throwable(s"Get brokers error: ${e.getMessage}", e))
+ .mapBoth(GRPCStatus.fromThrowable, BrokerResponse(_))
}
}
diff --git a/service/src/main/scala/io/kafkamate/grpc/ClustersService.scala b/service/src/main/scala/io/kafkamate/grpc/ClustersService.scala
index f746d6d..6f01d10 100644
--- a/service/src/main/scala/io/kafkamate/grpc/ClustersService.scala
+++ b/service/src/main/scala/io/kafkamate/grpc/ClustersService.scala
@@ -4,29 +4,30 @@ package grpc
import scala.util.Random
import io.grpc.Status
-import zio.{UIO, ZEnv, ZIO}
+import io.kafkamate.clusters._
+import io.kafkamate.config.ClustersConfig._
+import io.kafkamate.config._
+import io.kafkamate.utils._
import zio.logging._
-
-import config._, ClustersConfig._
-import clusters._
-import utils._
+import zio.{UIO, ZEnv, ZIO}
object ClustersService {
type Env = ZEnv with ClustersConfigService with Logging
object GrpcService extends ZioClusters.RClustersService[Env] {
+
def genRandStr(length: Int): UIO[String] =
UIO(Random.alphanumeric.take(length).mkString)
def addCluster(request: ClusterDetails): ZIO[Env, Status, ClusterDetails] =
for {
- clusterId <- genRandStr(6).map(str => s"${request.name.trim}-$str")
- hosts = request.kafkaHosts.split(",").toList
+ clusterId <- genRandStr(6).map(str => s"${request.name.trim.replaceAll(" ", "-")}-$str")
+ hosts = request.kafkaHosts.split(",").toList
schemaRegistry = Option.unless(request.schemaRegistryUrl.isEmpty)(request.schemaRegistryUrl)
c <- ClustersConfig
- .writeClusters(ClusterSettings(clusterId, request.name, hosts, schemaRegistry))
- .tapError(e => log.error(s"Add cluster error: ${e.getMessage}"))
- .bimap(GRPCStatus.fromThrowable, _ => request)
+ .writeClusters(ClusterSettings(clusterId, request.name, hosts, schemaRegistry))
+ .tapError(e => log.throwable(s"Add cluster error: ${e.getMessage}", e))
+ .mapBoth(GRPCStatus.fromThrowable, _ => request)
} yield c
private def toClusterResponse(r: ClusterProperties) =
@@ -35,14 +36,15 @@ object ClustersService {
)
def getClusters(request: ClusterRequest): ZIO[Env, Status, ClusterResponse] =
- ClustersConfig.readClusters
- .tapError(e => log.error(s"Get clusters error: ${e.getMessage}"))
- .bimap(GRPCStatus.fromThrowable, toClusterResponse)
+ log.debug("Received getClusters request") *>
+ ClustersConfig.readClusters
+ .tapError(e => log.throwable(s"Get clusters error: ${e.getMessage}", e))
+ .mapBoth(GRPCStatus.fromThrowable, toClusterResponse)
def deleteCluster(request: ClusterDetails): ZIO[Env, Status, ClusterResponse] =
ClustersConfig
.deleteCluster(request.id)
- .tapError(e => log.error(s"Delete cluster error: ${e.getMessage}"))
- .bimap(GRPCStatus.fromThrowable, toClusterResponse)
+ .tapError(e => log.throwable(s"Delete cluster error: ${e.getMessage}", e))
+ .mapBoth(GRPCStatus.fromThrowable, toClusterResponse)
}
}
diff --git a/service/src/main/scala/io/kafkamate/grpc/MessagesService.scala b/service/src/main/scala/io/kafkamate/grpc/MessagesService.scala
index b0c8a0b..84bb2f6 100644
--- a/service/src/main/scala/io/kafkamate/grpc/MessagesService.scala
+++ b/service/src/main/scala/io/kafkamate/grpc/MessagesService.scala
@@ -2,36 +2,43 @@ package io.kafkamate
package grpc
import io.grpc.Status
-import zio.{URLayer, ZEnv, ZIO, ZLayer}
-import zio.stream.ZStream
+import io.kafkamate.config.ClustersConfig._
+import io.kafkamate.kafka.KafkaExplorer.HasKafkaExplorer
+import io.kafkamate.kafka.{KafkaConsumer, KafkaProducer}
+import io.kafkamate.messages._
+import io.kafkamate.utils._
import zio.logging._
-
-import config.ClustersConfig._
-import kafka.KafkaConsumer
-import kafka.KafkaProducer
-import messages._
-import utils._
+import zio.magic._
+import zio.stream.ZStream
+import zio.{URLayer, ZEnv, ZIO, ZLayer}
object MessagesService {
- type Env = ZEnv with KafkaConsumer.KafkaConsumer with KafkaProducer.KafkaProducer with Logging
+ type Env = ZEnv with KafkaConsumer.KafkaConsumer with KafkaProducer.KafkaProducer with HasKafkaExplorer with Logging
- lazy val liveLayer: URLayer[ZEnv with Logging with ClustersConfigService, Env] =
- ZEnv.any ++
- ZLayer.requires[Logging] ++
- ZLayer.requires[ClustersConfigService] >+>
- KafkaProducer.liveLayer ++ KafkaConsumer.liveLayer
+ lazy val liveLayer: URLayer[ZEnv with Logging with ClustersConfigService with HasKafkaExplorer, Env] =
+ ZLayer.wireSome[ZEnv with Logging with ClustersConfigService with HasKafkaExplorer, Env](
+ KafkaProducer.liveLayer,
+ KafkaConsumer.liveLayer
+ )
object GrpcService extends ZioMessages.RMessagesService[Env] {
+
override def produceMessage(request: ProduceRequest): ZIO[Env, Status, ProduceResponse] =
KafkaProducer
- .produce(request.topicName, request.key, request.value)(request.clusterId)
- .tapError(e => log.error(s"Producer error: ${e.getMessage}"))
- .bimap(GRPCStatus.fromThrowable, _ => ProduceResponse("OK"))
+ .produce(request)
+ .tapError(e => log.throwable(s"Producer error: ${e.getMessage}", e))
+ .mapBoth(GRPCStatus.fromThrowable, _ => ProduceResponse("OK"))
- override def consumeMessages(request: ConsumeRequest): ZStream[Env, Status, Message] =
+ override def consumeMessages(request: ConsumeRequest): ZStream[Env, Status, LogicMessage] =
KafkaConsumer
.consume(request)
- .onError(e => log.error("Consumer error: \n" + e.prettyPrint))
+ .onError(e => log.error("Consumer error: \n" + e.prettyPrint, e))
+ .mapError(GRPCStatus.fromThrowable)
+
+ override def getSchemaSubject(request: GetSchemaSubjectRequest): ZIO[Env, Status, SchemaSubjectResponse] =
+ KafkaProducer
+ .getSchemaSubjects(request)
+ .tapError(e => log.throwable(s"Error retrieving schemas: ${e.getMessage}", e))
.mapError(GRPCStatus.fromThrowable)
}
}
diff --git a/service/src/main/scala/io/kafkamate/grpc/TopicsService.scala b/service/src/main/scala/io/kafkamate/grpc/TopicsService.scala
index f3dbe6f..a25180d 100644
--- a/service/src/main/scala/io/kafkamate/grpc/TopicsService.scala
+++ b/service/src/main/scala/io/kafkamate/grpc/TopicsService.scala
@@ -2,33 +2,33 @@ package io.kafkamate
package grpc
import io.grpc.Status
-import zio.{ZEnv, ZIO}
+import io.kafkamate.kafka.KafkaExplorer
+import io.kafkamate.topics._
+import io.kafkamate.utils._
import zio.logging._
-
-import kafka.KafkaExplorer
-import topics._
-import utils._
+import zio.{ZEnv, ZIO}
object TopicsService {
type Env = ZEnv with KafkaExplorer.HasKafkaExplorer with Logging
object GrpcService extends ZioTopics.RTopicsService[Env] {
+
def getTopics(request: GetTopicsRequest): ZIO[Env, Status, TopicResponse] =
KafkaExplorer
.listTopics(request.clusterId)
- .tapError(e => log.error(s"Get topics error: ${e.getMessage}"))
- .bimap(GRPCStatus.fromThrowable, r => TopicResponse(r))
+ .tapError(e => log.throwable(s"Get topics error: ${e.getMessage}", e))
+ .mapBoth(GRPCStatus.fromThrowable, r => TopicResponse(r))
def addTopic(request: AddTopicRequest): ZIO[Env, Status, TopicDetails] =
KafkaExplorer
.addTopic(request)
- .tapError(e => log.error(s"Add topic error: ${e.getMessage}"))
+ .tapError(e => log.throwable(s"Add topic error: ${e.getMessage}", e))
.mapError(GRPCStatus.fromThrowable)
def deleteTopic(request: DeleteTopicRequest): ZIO[Env with Any, Status, DeleteTopicResponse] =
KafkaExplorer
.deleteTopic(request)
- .tapError(e => log.error(s"Delete topic error: ${e.getMessage}"))
+ .tapError(e => log.throwable(s"Delete topic error: ${e.getMessage}", e))
.mapError(GRPCStatus.fromThrowable)
}
}
diff --git a/service/src/main/scala/io/kafkamate/kafka/KafkaConsumer.scala b/service/src/main/scala/io/kafkamate/kafka/KafkaConsumer.scala
index 716e231..48a8e05 100644
--- a/service/src/main/scala/io/kafkamate/kafka/KafkaConsumer.scala
+++ b/service/src/main/scala/io/kafkamate/kafka/KafkaConsumer.scala
@@ -1,33 +1,37 @@
package io.kafkamate
package kafka
+import java.nio.ByteBuffer
import java.util.UUID
-import scala.jdk.CollectionConverters._
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
+import com.google.protobuf.util.JsonFormat
+import com.google.protobuf.{Message, MessageOrBuilder}
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
-import com.google.protobuf.{Message => GMessage}
+import io.kafkamate.config.ClustersConfig._
+import io.kafkamate.config._
+import io.kafkamate.kafka.KafkaExplorer._
+import io.kafkamate.messages._
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.SerializationException
import zio._
import zio.blocking.Blocking
import zio.clock.Clock
import zio.duration._
-import zio.logging._
-import zio.stream.ZStream
-import zio.kafka.consumer._
import zio.kafka.consumer.Consumer._
+import zio.kafka.consumer._
import zio.kafka.serde.Deserializer
+import zio.logging._
import zio.macros.accessible
-
-import io.kafkamate.messages.MessageFormat.PROTOBUF
-import config._, ClustersConfig._
-import messages._
+import zio.stream.ZStream
@accessible object KafkaConsumer {
type KafkaConsumer = Has[Service]
- type Env = Clock with Blocking with Logging
+ type Env = Clock with Blocking with Logging with HasKafkaExplorer
trait Service {
- def consume(request: ConsumeRequest): ZStream[Env, Throwable, Message]
+ def consume(request: ConsumeRequest): ZStream[Env, Throwable, LogicMessage]
}
lazy val liveLayer: URLayer[ClustersConfigService, KafkaConsumer] =
@@ -35,63 +39,169 @@ import messages._
private def createService(clustersConfigService: ClustersConfig.Service): Service =
new Service {
- private def extractOffsetStrategy(offsetValue: String): AutoOffsetStrategy =
- offsetValue match {
- case "earliest" => AutoOffsetStrategy.Earliest
- case _ => AutoOffsetStrategy.Latest
- }
- private def protobufDeserializer(settings: ProtoSerdeSettings): Deserializer[Any, Try[GMessage]] =
- Deserializer {
- val protoDeser = new KafkaProtobufDeserializer()
- protoDeser.configure(settings.configs.asJava, false)
- protoDeser
- }.asTry
+ private def safeOffset(maxResults: Long)(offset: Long): Long = {
+ val r = offset - maxResults
+ if (r < 0L) 0L else r
+ }
+
+ private def extractOffsetStrategy(request: ConsumeRequest): RIO[Env, OffsetRetrieval] =
+ request.offsetStrategy match {
+ case OffsetStrategy.REAL_TIME => UIO(OffsetRetrieval.Auto(AutoOffsetStrategy.Latest))
+ case OffsetStrategy.FROM_BEGINNING => UIO(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
+ case OffsetStrategy.LATEST =>
+ for {
+ b <- ZIO.service[Blocking.Service]
+ c <- ZIO.service[Clock.Service]
+ k <- ZIO.service[KafkaExplorer.Service]
+ dep = Has.allOf(b, c, k)
+ f = (tps: Set[TopicPartition]) =>
+ KafkaExplorer
+ .getLatestOffset(request.clusterId, tps)
+ .map(_.view.mapValues(safeOffset(request.maxResults)).toMap)
+ .provide(dep)
+ } yield OffsetRetrieval.Manual(f)
+ case v => ZIO.fail(new IllegalArgumentException(s"Unrecognized OffsetStrategy: $v"))
+ }
- private def consumerSettings(config: ClusterSettings, offsetStrategy: String): Task[ConsumerSettings] =
- Task {
+ private def consumerSettings(
+ config: ClusterSettings,
+ request: ConsumeRequest
+ ): RIO[Env, ConsumerSettings] =
+ for {
+ offsetRetrieval <- extractOffsetStrategy(request)
+ manualResetOffset = offsetRetrieval match {
+ case _: OffsetRetrieval.Manual => Map(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")
+ case _ => Map.empty[String, AnyRef]
+ }
+ } yield {
val uuid = UUID.randomUUID().toString
ConsumerSettings(config.kafkaHosts)
.withGroupId(s"group-kafkamate-$uuid")
.withClientId(s"client-kafkamate-$uuid")
.withProperties(config.protoSerdeSettings.map(_.configs).getOrElse(Map.empty))
- .withOffsetRetrieval(OffsetRetrieval.Auto(extractOffsetStrategy(offsetStrategy)))
+ .withOffsetRetrieval(offsetRetrieval)
+ .withProperties(manualResetOffset)
.withCloseTimeout(10.seconds)
}
- private def makeConsumerLayer(clusterId: String, offsetStrategy: String): RLayer[Clock with Blocking, Consumer] =
+ private def makeConsumerLayer(request: ConsumeRequest) =
ZLayer.fromManaged {
for {
- cs <- clustersConfigService.getCluster(clusterId).toManaged_
- settings <- consumerSettings(cs, offsetStrategy).toManaged_
+ cs <- clustersConfigService.getCluster(request.clusterId).toManaged_
+ settings <- consumerSettings(cs, request).toManaged_
consumer <- Consumer.make(settings)
} yield consumer
}
- def consume(request: ConsumeRequest): ZStream[Env, Throwable, Message] = {
- def consumer[T](valueDeserializer: Deserializer[Any, Try[T]]) = Consumer
- .subscribeAnd(Subscription.topics(request.topicName))
- .plainStream(Deserializer.string, valueDeserializer)
- .collect {
- case v if v.value.isSuccess =>
- Message(v.offset.offset, v.partition, v.timestamp, v.key, v.value.get.toString)
- }
+ private def toJson(messageOrBuilder: MessageOrBuilder): Task[String] =
+ Task(JsonFormat.printer.print(messageOrBuilder))
+
+ private def protobufDeserializer(settings: ProtoSerdeSettings): Task[Deserializer[Any, Try[Message]]] =
+ Deserializer
+ .fromKafkaDeserializer(new KafkaProtobufDeserializer[Message](), settings.configs, false)
+ .map(_.asTry)
- val stream = request.messageFormat match {
- case PROTOBUF =>
- val protoSettings = clustersConfigService
- .getCluster(request.clusterId)
- .flatMap(c =>
- ZIO
- .fromOption(c.protoSerdeSettings)
- .orElseFail(new Exception("SchemaRegistry url was not provided!"))
- )
- ZStream
- .fromEffect(protoSettings)
- .flatMap(p => consumer(protobufDeserializer(p)))
- case _ => consumer(Deserializer.string.asTry)
+ private def getSchemaId(record: CommittableRecord[String, Array[Byte]]): URIO[Logging, Option[Int]] =
+ Task {
+ val buffer = ByteBuffer.wrap(record.value)
+ if (buffer.get != 0x0) throw new SerializationException("Unknown magic byte for schema id!")
+ buffer.getInt()
+ }.asSome.catchAll { t =>
+ log
+ .throwable(
+ s"Failed extracting schema id with key: '${record.key}'; will ignore; error message: ${t.getMessage}",
+ t
+ )
+ .as(None)
}
+ private def asLogicMessage(
+ record: CommittableRecord[String, Array[Byte]],
+ valueFormat: MessageFormat,
+ valueSchemaId: Option[Int],
+ valueStr: Option[String]
+ ) =
+ LogicMessage(
+ offset = record.offset.offset,
+ partition = record.partition,
+ timestamp = record.timestamp,
+ key = Option(record.key),
+ valueFormat = valueFormat,
+ valueSchemaId = valueSchemaId,
+ value = valueStr
+ )
+
+ private def deserializeAuto(
+ messageFormat: MessageFormat,
+ cachedDeserializer: RIO[Logging, Deserializer[Any, Try[Message]]],
+ record: CommittableRecord[String, Array[Byte]],
+ fallback: Throwable => Task[LogicMessage]
+ ) =
+ for {
+ d <- cachedDeserializer
+ tmp = for {
+ pair <- d.deserialize(record.record.topic, record.record.headers, record.value) <&> getSchemaId(record)
+ r <- pair match {
+ case (Success(message), schemaId) =>
+ toJson(message).map(jsonStr => asLogicMessage(record, MessageFormat.PROTOBUF, schemaId, Some(jsonStr)))
+ case (Failure(e), _) =>
+ Task.fail(e)
+ }
+ } yield r
+ r <- tmp.catchAll { t =>
+ log.throwable(
+ s"Failed auto deserializing $messageFormat with key: '${record.key}'; error message: ${t.getMessage}",
+ t) *> fallback(t)
+ }
+ } yield r
+
+ private def deserializeString(record: CommittableRecord[String, Array[Byte]]) =
+ Deserializer
+ .string
+ .deserialize(record.record.topic, record.record.headers, record.value)
+ .map(str => asLogicMessage(record, MessageFormat.STRING, None, Some(str)))
+
+ def consume(request: ConsumeRequest): ZStream[Env, Throwable, LogicMessage] = {
+ val stream =
+ for {
+ _ <- ZStream.fromEffect(log.debug(s"Consuming request: $request"))
+ cachedProtoDeserializer <- ZStream.fromEffect(
+ clustersConfigService
+ .getCluster(request.clusterId)
+ .flatMap(c =>
+ ZIO
+ .fromOption(c.protoSerdeSettings)
+ .orElseFail(new RuntimeException("SchemaRegistry url was not provided!")))
+ .flatMap(protobufDeserializer)
+ .zipLeft(log.debug(s"Created proto deserializer"))
+ .memoize
+ )
+ response <- Consumer
+ .subscribeAnd(Subscription.topics(request.topicName))
+ .plainStream(Deserializer.string, Deserializer.byteArray)
+ .mapM { record =>
+ request.messageFormat match {
+ case MessageFormat.AUTO =>
+ deserializeAuto(
+ MessageFormat.PROTOBUF,
+ cachedProtoDeserializer,
+ record,
+ _ => deserializeString(record)
+ )
+ case MessageFormat.PROTOBUF =>
+ deserializeAuto(
+ MessageFormat.PROTOBUF,
+ cachedProtoDeserializer,
+ record,
+ e => UIO(asLogicMessage(record, MessageFormat.PROTOBUF, None, Option(e.getMessage)))
+ )
+ case _ =>
+ deserializeString(record)
+ }
+ }
+ } yield response
+
val withFilter = {
val trimmed = request.filterKeyword.trim
if (trimmed.isEmpty) stream
@@ -103,7 +213,7 @@ import messages._
else withFilter.take(request.maxResults)
withFilterLimit.provideSomeLayer[Env](
- makeConsumerLayer(request.clusterId, request.offsetStrategy)
+ makeConsumerLayer(request)
)
}
}
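
Worth spelling out: with the new `OffsetStrategy.LATEST` branch the consumer switches to manual offset retrieval and seeks every partition to its end offset minus `maxResults`, floored at 0 by `safeOffset`, so only the most recent records are replayed. The same arithmetic restated with hypothetical numbers:

```scala
// equivalent to safeOffset in the consumer above
def safeOffset(maxResults: Long)(latestOffset: Long): Long =
  math.max(latestOffset - maxResults, 0L)

safeOffset(100L)(2500L) // 2400L: start 100 records before the end of the partition
safeOffset(100L)(40L)   // 0L: the partition holds fewer than maxResults records, start from 0
```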
diff --git a/service/src/main/scala/io/kafkamate/kafka/KafkaExplorer.scala b/service/src/main/scala/io/kafkamate/kafka/KafkaExplorer.scala
index ad118c2..19e6d20 100644
--- a/service/src/main/scala/io/kafkamate/kafka/KafkaExplorer.scala
+++ b/service/src/main/scala/io/kafkamate/kafka/KafkaExplorer.scala
@@ -1,29 +1,38 @@
package io.kafkamate
package kafka
+import scala.concurrent.TimeoutException
+import scala.jdk.CollectionConverters._
+
+import io.kafkamate.brokers.BrokerDetails
+import io.kafkamate.config.ClustersConfig._
+import io.kafkamate.topics._
+import org.apache.kafka.clients.admin.{AdminClient => JAdminClient, OffsetSpec}
+import org.apache.kafka.common.TopicPartition
import zio._
-import zio.blocking.Blocking
+import zio.blocking._
import zio.clock.Clock
import zio.duration._
-import zio.kafka.admin._
import zio.kafka.admin.AdminClient.{ConfigResource, ConfigResourceType}
+import zio.kafka.admin._
import zio.macros.accessible
-import config._, ClustersConfig._
-import topics._
-import brokers.BrokerDetails
-
@accessible object KafkaExplorer {
type HasKafkaExplorer = Has[Service]
- type HasAdminClient = Has[AdminClient]
+ type HasAdminClient = Has[AdminClient] with Has[JAdminClient]
- val CleanupPolicyKey = "cleanup.policy"
- val RetentionMsKey = "retention.ms"
+ private val CleanupPolicyKey = "cleanup.policy"
+ private val RetentionMsKey = "retention.ms"
trait Service {
def listBrokers(clusterId: String): RIO[Blocking with Clock, List[BrokerDetails]]
def listTopics(clusterId: String): RIO[Blocking with Clock, List[TopicDetails]]
+
+ def getLatestOffset(
+ clusterId: String,
+ tps: Set[TopicPartition]
+ ): RIO[Blocking with Clock, Map[TopicPartition, Long]]
def addTopic(req: AddTopicRequest): RIO[Blocking with Clock, TopicDetails]
def deleteTopic(req: DeleteTopicRequest): RIO[Blocking with Clock, DeleteTopicResponse]
}
@@ -31,17 +40,21 @@ import brokers.BrokerDetails
lazy val liveLayer: URLayer[ClustersConfigService, HasKafkaExplorer] =
ZLayer.fromService { clustersConfigService =>
new Service {
- private def adminClientLayer(clusterId: String): RLayer[Blocking, HasAdminClient] =
- ZLayer.fromManaged {
- for {
- cs <- clustersConfigService.getCluster(clusterId).toManaged_
- client <- AdminClient.make(AdminClientSettings(cs.kafkaHosts, 2.seconds, Map.empty))
- } yield client
- }
+ private def adminClientLayer(clusterId: String): RLayer[Blocking, HasAdminClient] = {
+ def services = for {
+ cs <- clustersConfigService.getCluster(clusterId).toManaged_
+ s = AdminClientSettings(cs.kafkaHosts, 2.seconds, Map.empty)
+ managed = Task(JAdminClient.create(s.driverSettings.asJava)).toManaged(ja => UIO(ja.close(s.closeTimeout)))
+ client <- AdminClient.fromManagedJavaClient(managed)
+ jAdmin <- managed
+ } yield Has.allOf(client, jAdmin)
+
+ ZLayer.fromManagedMany(services)
+ }
private implicit class AdminClientProvider[A](eff: RIO[HasAdminClient with Blocking, A]) {
def withAdminClient(clusterId: String): RIO[Blocking with Clock, A] = eff
- .timeoutFail(new Exception("Timed out"))(6.seconds)
+ .timeoutFail(new TimeoutException("Timed out after 15 seconds!"))(15.seconds)
.provideSomeLayer[Blocking with Clock](adminClientLayer(clusterId))
}
@@ -50,14 +63,14 @@ import brokers.BrokerDetails
.accessM[HasAdminClient with Blocking] { env =>
val ac = env.get[AdminClient]
for {
- (nodes, controllerId) <- ac.describeClusterNodes() <&> ac.describeClusterController().map(_.id)
+ (nodes, controllerId) <- ac.describeClusterNodes() <&> ac.describeClusterController().map(_.map(_.id))
brokers = nodes.map { n =>
- val nodeId = n.id
- if (controllerId != nodeId) BrokerDetails(nodeId)
- else BrokerDetails(nodeId, isController = true)
- }
- //resources = nodes.map(n => new ConfigResource(ConfigResource.Type.BROKER, n.idString()))
- //_ <- ac.describeConfigs(resources)
+ val nodeId = n.id
+ if (!controllerId.contains(nodeId)) BrokerDetails(nodeId)
+ else BrokerDetails(nodeId, isController = true)
+ }
+ // resources = nodes.map(n => new ConfigResource(ConfigResource.Type.BROKER, n.idString()))
+ // _ <- ac.describeConfigs(resources)
} yield brokers
}
.withAdminClient(clusterId)
@@ -67,28 +80,63 @@ import brokers.BrokerDetails
.accessM[HasAdminClient with Blocking] { env =>
val ac = env.get[AdminClient]
ac.listTopics()
- .map(_.keys.toList)
- .flatMap(ls => ZIO.filterNotPar(ls)(t => UIO(t.startsWith("_"))))
+ .map(_.keys.toList.filterNot(_.startsWith("_")))
.flatMap(ls =>
- ac.describeTopics(ls) <&> ac.describeConfigs(ls.map(ConfigResource(ConfigResourceType.Topic, _)))
- )
- .map { case (nameDescriptionMap, topicConfigMap) =>
+ ZIO.tupledPar(
+ ac.describeTopics(ls).flatMap(r => getTopicsSize(r).map((r, _))),
+ ac.describeConfigs(ls.map(ConfigResource(ConfigResourceType.Topic, _)))
+ ))
+ .map { case ((nameDescriptionMap, sizes), topicConfigMap) =>
val configs = topicConfigMap.map { case (res, conf) => (res.name, conf) }
- nameDescriptionMap.map { case (name, description) =>
- val conf = configs.get(name).map(_.entries)
+ nameDescriptionMap.map { case (topicName, description) =>
+ val conf = configs.get(topicName).map(_.entries)
def getConfig(key: String) = conf.flatMap(_.get(key).map(_.value())).getOrElse("unknown")
TopicDetails(
- name = name,
+ name = topicName,
partitions = description.partitions.size,
replication = description.partitions.headOption.map(_.replicas.size).getOrElse(0),
cleanupPolicy = getConfig(CleanupPolicyKey),
- retentionMs = getConfig(RetentionMsKey)
+ retentionMs = getConfig(RetentionMsKey),
+ size = sizes.getOrElse(topicName, 0L)
)
}.toList.sortBy(_.name)
}
}
.withAdminClient(clusterId)
+ private def getTopicsSize(
+ topicDescription: Map[String, AdminClient.TopicDescription]
+ ): RIO[Blocking with HasAdminClient, Map[String, Long]] = {
+ val brokerIds = topicDescription.values.flatMap(_.partitions).flatMap(_.leader.map(_.id)).toSet
+ aggregateTopicSizes(brokerIds)
+ }
+
+ private def aggregateTopicSizes(brokerIds: Set[Int]): RIO[HasAdminClient with Blocking, Map[String, Long]] =
+ for {
+ jAdmin <- ZIO.service[JAdminClient]
+ ids = brokerIds.map(Integer.valueOf).asJavaCollection
+ asJavaFuture <- effectBlocking(jAdmin.describeLogDirs(ids).descriptions())
+ logDirInfo <- ZIO.foreachPar(asJavaFuture.asScala.toMap) { case (brokerId, kafkaFuture) =>
+ AdminClient.fromKafkaFuture(effectBlocking(kafkaFuture)).map(brokerId -> _.asScala)
+ }
+ } yield {
+ logDirInfo.values
+ .flatMap(_.values)
+ .flatMap(_.replicaInfos.asScala)
+ .groupMapReduce({ case (tp, _) => tp.topic })({ case (_, info) => info.size })(_ + _)
+ }
+
+ override def getLatestOffset(
+ clusterId: String,
+ tps: Set[TopicPartition]
+ ): RIO[Blocking with Clock, Map[TopicPartition, Long]] = {
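+ // request the latest (end) offset for each of the given topic partitions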
+ val topicPartitionOffsets = tps.map(tp => (tp, OffsetSpec.latest())).toMap.asJava
+ for {
+ result <- ZIO.service[JAdminClient].mapEffect(_.listOffsets(topicPartitionOffsets))
+ map <- AdminClient.fromKafkaFuture(UIO(result.all())).map(_.asScala.view.mapValues(_.offset()).toMap)
+ } yield map
+ }.withAdminClient(clusterId)
+
def addTopic(req: AddTopicRequest): RIO[Blocking with Clock, TopicDetails] =
ZIO
.accessM[HasAdminClient with Blocking] { env =>
@@ -101,7 +149,7 @@ import brokers.BrokerDetails
req.replication.toShort,
Map(
CleanupPolicyKey -> req.cleanupPolicy,
- RetentionMsKey -> req.retentionMs
+ RetentionMsKey -> req.retentionMs
)
)
)
diff --git a/service/src/main/scala/io/kafkamate/kafka/KafkaProducer.scala b/service/src/main/scala/io/kafkamate/kafka/KafkaProducer.scala
index 912d845..cb0d0cc 100644
--- a/service/src/main/scala/io/kafkamate/kafka/KafkaProducer.scala
+++ b/service/src/main/scala/io/kafkamate/kafka/KafkaProducer.scala
@@ -1,42 +1,198 @@
package io.kafkamate
package kafka
+import scala.jdk.CollectionConverters._
+
+import com.google.protobuf.util.JsonFormat
+import com.google.protobuf.{Descriptors, DynamicMessage, Message}
+import io.confluent.kafka.formatter.SchemaMessageSerializer
+import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
+import io.confluent.kafka.schemaregistry.protobuf.{ProtobufSchema, ProtobufSchemaProvider}
+import io.confluent.kafka.schemaregistry.{ParsedSchema, SchemaProvider}
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
+import io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufSerializer
+import io.kafkamate.config.ClustersConfig._
+import io.kafkamate.config._
+import io.kafkamate.messages._
import zio._
import zio.blocking._
-import zio.kafka.serde._
import zio.kafka.producer._
+import zio.kafka.serde._
+import zio.logging._
import zio.macros.accessible
-
-import config._, ClustersConfig._
+import zio.magic._
@accessible object KafkaProducer {
type KafkaProducer = Has[Service]
trait Service {
- def produce(topic: String, key: String, value: String)(clusterId: String): RIO[Blocking, Unit]
+ def getSchemaSubjects(request: GetSchemaSubjectRequest): Task[SchemaSubjectResponse]
+ def produce(request: ProduceRequest): RIO[Blocking, Unit]
}
- lazy val liveLayer: URLayer[ClustersConfigService, KafkaProducer] =
- ZLayer.fromService { clusterConfigService =>
- new Service {
- lazy val serdeLayer: ULayer[Has[Serializer[Any, String]]] =
- ZLayer.succeed(Serde.string)
-
- def settingsLayer(clusterId: String): TaskLayer[Has[ProducerSettings]] =
- clusterConfigService
- .getCluster(clusterId)
- .map(c => ProducerSettings(c.kafkaHosts))
- .toLayer
-
- def producerLayer(clusterId: String): RLayer[Blocking, Producer[Any, String, String]] =
- Blocking.any ++ serdeLayer ++ settingsLayer(clusterId) >>> Producer.live[Any, String, String]
-
- def produce(topic: String, key: String, value: String)(clusterId: String): RIO[Blocking, Unit] =
- Producer
- .produce[Any, String, String](topic, key, value)
- .unit
- .provideSomeLayer[Blocking](producerLayer(clusterId))
- }
+ lazy val liveLayer: URLayer[ClustersConfigService with Logging, KafkaProducer] =
+ ZLayer.fromServices[ClustersConfig.Service, Logger[String], KafkaProducer.Service] {
+ (clusterConfigService, logging) =>
+ new Service {
+ lazy val providers: List[SchemaProvider] =
+ List(new ProtobufSchemaProvider())
+
+ lazy val producerConfig: Map[String, String] =
+ Map("auto.register.schema" -> "true")
+
+ def custerSettingsLayer(clusterId: String): TaskLayer[Has[ClusterSettings]] =
+ clusterConfigService.getCluster(clusterId).toLayer
+
+ def settingsLayer: URLayer[Has[ClusterSettings], Has[ProducerSettings]] =
+ ZIO
+ .service[ClusterSettings]
+ .map(c => ProducerSettings(c.kafkaHosts))
+ .toLayer
+
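+ // build a cached schema registry client for the cluster; fails if no schema registry url is configured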
+ def getSchemaRegistryClient: RIO[Has[ClusterSettings], CachedSchemaRegistryClient] =
+ ZIO
+ .service[ClusterSettings]
+ .map(_.schemaRegistryUrl)
+ .someOrFail(new RuntimeException("Schema registry url not provided!"))
+ .mapEffect { schemaRegistryUrl =>
+ new CachedSchemaRegistryClient(
+ List(schemaRegistryUrl).asJava,
+ AbstractKafkaSchemaSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT,
+ providers.asJava,
+ producerConfig.asJava
+ )
+ }
+
+ def getSchema(registry: CachedSchemaRegistryClient, id: Int): Task[ParsedSchema] =
+ Task(registry.getSchemaById(id))
+
+ def getSerializer(registry: CachedSchemaRegistryClient): Task[KM8ProtobufMessageSerializer] =
+ Task(KM8ProtobufMessageSerializer(registry))
+
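+ // merge the JSON payload into a DynamicMessage builder for the given protobuf descriptor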
+ private def parseSchema(valueString: String, schemaDescriptor: Descriptors.Descriptor): Task[Message] =
+ Task {
+ val builder = DynamicMessage.newBuilder(schemaDescriptor)
+ JsonFormat.parser.merge(valueString, builder)
+ builder.build
+ }.tapBoth(
+ e =>
+ logging.throwable(
+ s"Error while parsing schema with descriptor ${schemaDescriptor.getName}: ${e.getMessage}",
+ e
+ ),
+ _ => logging.debug(s"Success parsing schema with descriptor: ${schemaDescriptor.getName}")
+ )
+
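+ // when no descriptor name is given, fall back to the schema's root descriptor plus every message type it declares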
+ private def extractDescriptors(valueDescriptor: Option[String], protobufSchema: ProtobufSchema) =
+ Task {
+ valueDescriptor match {
+ case Some(name) => List(protobufSchema.toDescriptor(name))
+ case None =>
+ List(protobufSchema.toDescriptor) ++
+ protobufSchema.toDynamicSchema.getMessageTypes.asScala
+ .map(protobufSchema.toDescriptor)
+ }
+ }
+
+ def readFrom(
+ valueString: String,
+ valueSchema: ParsedSchema,
+ valueDescriptor: Option[String]
+ ): Task[Message] = {
+ val process = for {
+ protobufSchema <- Task(valueSchema.asInstanceOf[ProtobufSchema])
+ descriptors <- extractDescriptors(valueDescriptor, protobufSchema)
+ ios = descriptors.map(parseSchema(valueString, _))
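+ // try each candidate descriptor until one parses the payload successfully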
+ r <- ZIO.firstSuccessOf(ios.head, ios.tail)
+ } yield r
+
+ process.tapError(e =>
+ logging.throwable(s"Error while reading from ($valueString) and schema ($valueSchema)", e))
+ }
+
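+ // resolve the schema by id, parse the JSON value against it and serialize it with the registry-aware serializer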
+ def readMessage(request: ProduceProtoRequest): RIO[Has[ClusterSettings], Array[Byte]] =
+ for {
+ _ <- logging.debug(s"request: $request")
+ registry <- getSchemaRegistryClient
+ valueSchema <- getSchema(registry, request.schemaId)
+ value <- readFrom(request.value, valueSchema, request.valueDescriptor)
+ _ <- logging.debug(s"value message: $value")
+ serializer <- getSerializer(registry)
+ bytes <-
+ Task(serializer.serialize(request.valueSubject, request.topicName, isKey = false, value, valueSchema))
+ } yield bytes
+
+ def producerLayer(
+ clusterId: String
+ ): RLayer[Blocking, Has[Producer] with Has[ClusterSettings]] =
+ ZLayer.wireSome[Blocking, Has[Producer] with Has[ClusterSettings]](
+ custerSettingsLayer(clusterId),
+ settingsLayer,
+ Producer.live
+ )
+
+ override def produce(produceRequest: ProduceRequest): RIO[Blocking, Unit] =
+ produceRequest.request match {
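+ // proto requests go through the schema registry serializer; string requests are sent as plain bytes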
+ case ProduceRequest.Request.ProtoRequest(r) =>
+ readMessage(r).flatMap { bytes =>
+ Producer
+ .produce(r.topicName, r.key, bytes, Serde.string, Serde.byteArray)
+ .unit
+ }.provideSomeLayer[Blocking](producerLayer(r.clusterId))
+ case ProduceRequest.Request.StringRequest(r) =>
+ Producer
+ .produce(r.topicName, r.key, r.value.getBytes, Serde.string, Serde.byteArray)
+ .unit
+ .provideSomeLayer[Blocking](producerLayer(r.clusterId))
+ case _ => ZIO.fail(new RuntimeException("Not implemented!"))
+ }
+
+ override def getSchemaSubjects(request: GetSchemaSubjectRequest): Task[SchemaSubjectResponse] = {
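+ // list every schema version registered under the "<topic>-value" subject and link to it in the registry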
+ val process =
+ for {
+ baseUrl <- ZIO
+ .service[ClusterSettings]
+ .map(_.schemaRegistryUrl)
+ .someOrFail(new RuntimeException("Schema registry url not provided!"))
+ subject = s"${request.topicName}-value"
+ registry <- getSchemaRegistryClient
+ versions <- Task(registry.getAllVersions(subject).asScala.toList)
+ metas <- ZIO.foreachPar(versions)(v => Task(registry.getSchemaMetadata(subject, v)))
+ schemaUrl = (id: Int) => s"$baseUrl/schemas/ids/$id"
+ } yield SchemaSubjectResponse(
+ metas.map { meta =>
+ SchemaSubject(meta.getId, schemaUrl(meta.getId))
+ }.sortBy(-_.id)
+ )
+
+ process.provideLayer(custerSettingsLayer(request.clusterId))
+ }
+
+ }
}
+ case class KM8ProtobufMessageSerializer(
+ schemaRegistryClient: SchemaRegistryClient,
+ autoRegister: Boolean = true,
+ useLatest: Boolean = true) extends AbstractKafkaProtobufSerializer[Message]
+ with SchemaMessageSerializer[Message] {
+
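+ // wire the inherited Confluent serializer state to our registry client and registration flags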
+ this.schemaRegistry = schemaRegistryClient
+ this.autoRegisterSchema = autoRegister
+ this.useLatestVersion = useLatest
+
+ override def getKeySerializer = ???
+
+ override def serializeKey(topic: String, payload: Object) = ???
+
+ override def serialize(
+ subject: String,
+ topic: String,
+ isKey: Boolean,
+ `object`: Message,
+ schema: ParsedSchema
+ ): Array[Byte] =
+ super.serializeImpl(subject, topic, isKey, `object`, schema.asInstanceOf[ProtobufSchema])
+ }
+
}
diff --git a/service/src/main/scala/io/kafkamate/utils/GRPCStatus.scala b/service/src/main/scala/io/kafkamate/utils/GRPCStatus.scala
index 911af3a..1d9a772 100644
--- a/service/src/main/scala/io/kafkamate/utils/GRPCStatus.scala
+++ b/service/src/main/scala/io/kafkamate/utils/GRPCStatus.scala
@@ -6,6 +6,6 @@ import io.grpc.Status
object GRPCStatus {
def fromThrowable(e: Throwable): Status =
- Status.UNKNOWN.withDescription(e.getMessage).withCause(e)
+ Status.INTERNAL.withCause(e).withDescription(e.getMessage)
}
diff --git a/service/src/main/scala/io/kafkamate/utils/Logger.scala b/service/src/main/scala/io/kafkamate/utils/Logger.scala
index 53faf6f..88192e3 100644
--- a/service/src/main/scala/io/kafkamate/utils/Logger.scala
+++ b/service/src/main/scala/io/kafkamate/utils/Logger.scala
@@ -9,8 +9,9 @@ import zio.logging._
object Logger {
lazy val liveLayer: URLayer[Console with Clock, Logging] =
+ // Slf4jLogger.make((_, message) => message)
Logging.console(
- logLevel = LogLevel.Info,
+ logLevel = LogLevel.Debug,
format = LogFormat.ColoredLogFormat()
) >>> Logging.withRootLoggerName("kafkamate")
diff --git a/service/src/test/scala/io/kafkamate/kafka/KafkaConsumerSpec.scala b/service/src/test/scala/io/kafkamate/kafka/KafkaConsumerSpec.scala
new file mode 100644
index 0000000..d165c07
--- /dev/null
+++ b/service/src/test/scala/io/kafkamate/kafka/KafkaConsumerSpec.scala
@@ -0,0 +1,51 @@
+package io.kafkamate.kafka
+
+import io.kafkamate.kafka.KafkaConsumer._
+import io.kafkamate.kafka.KafkaExplorer.HasKafkaExplorer
+import io.kafkamate.messages._
+import io.kafkamate.util.{HelperSpec, KafkaEmbedded}
+import io.kafkamate.utils.Logger
+import zio._
+import zio.blocking.Blocking
+import zio.clock.Clock
+import zio.console._
+import zio.duration._
+import zio.kafka.producer.Producer
+import zio.logging._
+import zio.magic._
+import zio.test.TestAspect._
+import zio.test._
+import zio.test.environment._
+
+object KafkaConsumerSpec extends DefaultRunnableSpec with HelperSpec {
+
+ type TestEnv = Clock with Blocking with Logging with KafkaConsumer with HasKafkaExplorer with Has[Producer]
+
+ val testLayer: ULayer[TestEnv] =
+ ZLayer.wire[TestEnv](
+ Clock.live,
+ Console.live,
+ Blocking.live,
+ KafkaExplorer.liveLayer,
+ KafkaEmbedded.Kafka.embedded.orDie,
+ testConfigLayer,
+ Logger.liveLayer,
+ producerSettings,
+ Producer.live.orDie,
+ KafkaConsumer.liveLayer
+ )
+
+ override def spec: ZSpec[TestEnvironment, Throwable] =
+ suite("Kafka Consumer")(
+ testM("consume messages from kafka") {
+ val io = for {
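+ // produce five string messages and expect to consume the same key/value pairs back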
+ topic <- UIO("topic150")
+ kvs = (1 to 5).toList.map(i => (s"key$i", s"msg$i"))
+ _ <- produceMany(topic, kvs)
+ req = ConsumeRequest("test-id", topic, 5, OffsetStrategy.LATEST, "", MessageFormat.STRING)
+ records <- KafkaConsumer.consume(req).runCollect
+ } yield assertTrue(records.map(v => (v.key.get, v.value.get)).toList == kvs.map(v => (v._1, v._2)))
+ io
+ }
+ ).provideLayerShared(testLayer) @@ timeout(30.seconds)
+}
diff --git a/service/src/test/scala/io/kafkamate/kafka/KafkaProducerSpec.scala b/service/src/test/scala/io/kafkamate/kafka/KafkaProducerSpec.scala
new file mode 100644
index 0000000..0c1d768
--- /dev/null
+++ b/service/src/test/scala/io/kafkamate/kafka/KafkaProducerSpec.scala
@@ -0,0 +1,22 @@
+package io.kafkamate.kafka
+
+import io.kafkamate.util.HelperSpec
+import zio._
+import zio.duration._
+import zio.test._
+
+object KafkaProducerSpec extends DefaultRunnableSpec with HelperSpec {
+
+ // add tests for KafkaProducer using KafkaEmbedded, SchemaRegistryEmbedded and zio-test
+ // one test should exercise proto serialization, consume the messages from kafka and check the deserialization
+ // another test should exercise json serialization, consume the messages from kafka and check the deserialization
+
+ override def spec = suite("Kafka Producer")(
+ testM("produce proto messages to kafka") {
+ for {
+ _ <- ZIO.unit
+ } yield assertCompletes
+ }
+ ) @@ TestAspect.timeout(30.seconds)
+
+}
diff --git a/service/src/test/scala/io/kafkamate/kafka/consumer/ConsumerSpec.scala b/service/src/test/scala/io/kafkamate/kafka/consumer/ConsumerSpec.scala
deleted file mode 100644
index 477c8d8..0000000
--- a/service/src/test/scala/io/kafkamate/kafka/consumer/ConsumerSpec.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-package io.kafkamate
-package kafka
-package consumer
-
-import zio._
-import zio.blocking.Blocking
-import zio.clock.Clock
-import zio.duration._
-import zio.test.Assertion._
-import zio.test.TestAspect._
-import zio.test.environment._
-import zio.logging._
-import zio.console._
-import zio.test.{DefaultRunnableSpec, _}
-
-import util.{HelperSpec, KafkaEmbedded}
-
-object ConsumerSpec extends DefaultRunnableSpec with HelperSpec {
- import KafkaConsumer._
- import utils.Logger
- import messages._
-
- val testLayer
- : Layer[TestFailure[Throwable], Clock with Blocking with Logging with StringProducer with KafkaConsumer] =
- (Clock.live >+>
- Console.live >+>
- Blocking.live >+>
- KafkaEmbedded.Kafka.embedded >+>
- stringProducer >+>
- testConfigLayer >+>
- Logger.liveLayer >+>
- KafkaConsumer.liveLayer).mapError(TestFailure.fail)
-
- override def spec: ZSpec[TestEnvironment, Throwable] =
- suite("Kafka Consumer")(
- testM("consume messages from kafka") {
- for {
- topic <- UIO("topic150")
- kvs = (1 to 5).toList.map(i => (s"key$i", s"msg$i"))
- _ <- produceMany(topic, kvs)
- records <- KafkaConsumer.consume(ConsumeRequest("test-id", topic, 5, "earliest", "")).runCollect
- } yield assert(records.map(v => (v.key, v.value)).toList)(equalTo(kvs.map(v => (v._1, v._2))))
- }
- ).provideLayerShared(testLayer) @@ timeout(30.seconds)
-}
diff --git a/service/src/test/scala/io/kafkamate/util/HelperSpec.scala b/service/src/test/scala/io/kafkamate/util/HelperSpec.scala
index d84729e..0c0a640 100644
--- a/service/src/test/scala/io/kafkamate/util/HelperSpec.scala
+++ b/service/src/test/scala/io/kafkamate/util/HelperSpec.scala
@@ -1,44 +1,39 @@
package io.kafkamate
package util
+import io.kafkamate.config.ClustersConfig._
+import io.kafkamate.config._
+import io.kafkamate.util.KafkaEmbedded.Kafka
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import zio._
import zio.blocking.Blocking
-import zio.clock.Clock
import zio.kafka.producer.{Producer, ProducerSettings}
-import zio.kafka.serde.{Serde, Serializer}
-
-import config._, ClustersConfig._
-import KafkaEmbedded.Kafka
+import zio.kafka.serde.Serde
trait HelperSpec {
- type StringProducer = Producer[Any, String, String]
-
- val producerSettings: URIO[Kafka, ProducerSettings] =
- ZIO.access[Kafka](_.get.bootstrapServers).map(ProducerSettings(_))
- val stringProducer: ZLayer[Kafka with Blocking, Throwable, StringProducer] =
- (Blocking.any ++ producerSettings.toLayer ++ ZLayer.succeed(Serde.string: Serializer[Any, String])) >>>
- Producer.live[Any, String, String]
+ val producerSettings: URLayer[Kafka, Has[ProducerSettings]] =
+ ZIO.service[Kafka.Service].map(s => ProducerSettings(s.bootstrapServers)).toLayer
- val testConfigLayer: URLayer[Clock with Blocking with Kafka, Clock with Blocking with ClustersConfigService] =
- ZLayer.requires[Clock] ++
- ZLayer.requires[Blocking] ++
- ZLayer.fromService[Kafka.Service, ClustersConfig.Service] { kafka =>
- new ClustersConfig.Service {
- def readClusters: Task[ClusterProperties] =
- Task(ClusterProperties(List(ClusterSettings("test-id", "test", kafka.bootstrapServers, None))))
+ val testConfigLayer: URLayer[Kafka, ClustersConfigService] =
+ ZLayer.fromService[Kafka.Service, ClustersConfig.Service] { kafka =>
+ new ClustersConfig.Service {
+ def readClusters: Task[ClusterProperties] =
+ Task(ClusterProperties(List(ClusterSettings("test-id", "test", kafka.bootstrapServers, None))))
- def writeClusters(cluster: ClusterSettings): Task[Unit] = ???
+ def writeClusters(cluster: ClusterSettings): Task[Unit] = ???
- def deleteCluster(clusterId: String): Task[ClusterProperties] = ???
- }
+ def deleteCluster(clusterId: String): Task[ClusterProperties] = ???
}
+ }
def produceMany(
topic: String,
kvs: Iterable[(String, String)]
- ): RIO[Blocking with StringProducer, Chunk[RecordMetadata]] =
+ ): RIO[Blocking with Has[Producer], Chunk[RecordMetadata]] =
Producer
- .produceChunk[Any, String, String](Chunk.fromIterable(kvs.map { case (k, v) => new ProducerRecord(topic, k, v) }))
+ .produceChunk(
+ Chunk.fromIterable(kvs.map { case (k, v) => new ProducerRecord(topic, k, v) }),
+ Serde.string,
+ Serde.string)
}
diff --git a/service/src/test/scala/io/kafkamate/util/KafkaEmbedded.scala b/service/src/test/scala/io/kafkamate/util/KafkaEmbedded.scala
index e74f277..761c3ad 100644
--- a/service/src/test/scala/io/kafkamate/util/KafkaEmbedded.scala
+++ b/service/src/test/scala/io/kafkamate/util/KafkaEmbedded.scala
@@ -8,6 +8,7 @@ object KafkaEmbedded {
type Kafka = Has[Kafka.Service]
object Kafka {
+
trait Service {
def bootstrapServers: List[String]
def stop(): UIO[Unit]
@@ -15,12 +16,12 @@ object KafkaEmbedded {
case class EmbeddedKafkaService(embeddedK: EmbeddedK) extends Service {
override def bootstrapServers: List[String] = List(s"localhost:${embeddedK.config.kafkaPort}")
- override def stop(): UIO[Unit] = ZIO.effectTotal(embeddedK.stop(true))
+ override def stop(): UIO[Unit] = ZIO.effectTotal(embeddedK.stop(true))
}
case object DefaultLocal extends Service {
override def bootstrapServers: List[String] = List(s"localhost:9092")
- override def stop(): UIO[Unit] = UIO.unit
+ override def stop(): UIO[Unit] = UIO.unit
}
lazy val embedded: TaskLayer[Kafka] = ZLayer.fromManaged {
diff --git a/site/public/index.html b/site/public/index.html
index ce70c56..48d4010 100644
--- a/site/public/index.html
+++ b/site/public/index.html
@@ -28,6 +28,13 @@
+
+
+