Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
iravid committed Apr 24, 2020
0 parents commit 0ec202a
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
1 change: 1 addition & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
version = "2.4.2"
9 changes: 9 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
name := "zio-kafka-example-app"

organization := "com.ziverge"

scalaVersion := "2.12.11"

libraryDependencies += "dev.zio" %% "zio-kafka" % "0.8.0"

run / fork := true
1 change: 1 addition & 0 deletions project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=1.3.9
31 changes: 31 additions & 0 deletions src/main/scala/com/ziverge/ExampleApp.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.ziverge

import com.ziverge.chunking._
import zio._
import zio.blocking.Blocking
import zio.clock.Clock
import zio.kafka.consumer._, zio.kafka.serde._

object ExampleApp extends App {
def createRecordChunkingStream(topic: String) =
Consumer
.subscribeAnd(Subscription.topics(topic))
.plainStream(Serde.string, Serde.string)
.flattenChunks
.via(Chunking.writeRecords(_))
.mapM(_.commit)
.provideSomeLayer[Clock with Blocking with Chunking](
Consumer.make(ConsumerSettings(List("localhost:9092")).withGroupId(s"${topic}-group"))
)
.foreachManaged(_ => ZIO.unit)
.fork

def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
(for {
first <- createRecordChunkingStream("first")
second <- createRecordChunkingStream("second")
} yield ZIO.raceAll(first.join, List(second.join)))
.use(identity)
.provideCustomLayer(Chunking.live("/tmp/data"))
.foldM(err => UIO(println(err)).as(1), _ => UIO.succeed(0))
}
38 changes: 38 additions & 0 deletions src/main/scala/com/ziverge/chunking/Live.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.ziverge.chunking

import zio._
import zio.kafka.consumer.{ CommittableRecord, OffsetBatch }
import zio.stream._

import java.nio.charset.StandardCharsets
import java.nio.file.{ Files, Paths }

class Live(filePrefix: String, writtenFiles: Ref[Int]) extends Chunking.Service {
val batchRecords = ZSink.foldWeighted(List[String]())(
(rec: CommittableRecord[String, String]) => rec.value.length,
16384) { (acc, el) =>
el.record.value :: acc
}.map(_.reverse.mkString("\n").getBytes(StandardCharsets.UTF_8))

val batchOffsets = ZSink.foldLeft(OffsetBatch.empty) {
(acc, rec: CommittableRecord[String, String]) =>
acc.merge(rec.offset)
}

def writeRecords[R](
stream: ZStream[R, Throwable, CommittableRecord[String, String]]):
ZStream[R, Throwable, OffsetBatch] =
stream
.aggregate(batchOffsets zipPar batchRecords)
.mapM { case (offsets, data) =>
for {
fileIndex <- writtenFiles.updateAndGet(_ + 1)
_ <- Task {
Files.write(
Paths.get(filePrefix, s"chunk-$fileIndex"),
data
)
}
} yield offsets
}
}
25 changes: 25 additions & 0 deletions src/main/scala/com/ziverge/chunking/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.ziverge

import zio._
import zio.kafka.consumer.{ CommittableRecord, OffsetBatch }
import zio.stream._

package object chunking {
type Chunking = Has[Chunking.Service]

object Chunking {
trait Service {
def writeRecords[R](
stream: ZStream[R, Throwable, CommittableRecord[String, String]]):
ZStream[R, Throwable, OffsetBatch]
}

def writeRecords[R](
stream: ZStream[R, Throwable, CommittableRecord[String, String]]):
ZStream[R with Chunking, Throwable, OffsetBatch] =
ZStream.accessStream(_.get[Service].writeRecords(stream))

def live(filePrefix: String): ZLayer[Any, Nothing, Chunking] =
ZLayer.fromEffect(Ref.make(0).map(new Live(filePrefix, _)))
}
}

0 comments on commit 0ec202a

Please sign in to comment.