Google Cloud Pub/Sub client providing stream-based, declarative, high-level API with zio and zio-streams to help to concentrate on the business logic.
zio-pubsub
Core components/interfaces/modelszio-pubsub-google
Provides publisher, admin and StreamingPull API based subscriber client implementations using Google's Java libraryzio-pubsub-serde-circe
Provides Json Serializer/Deserializer using the circe codeczio-pubsub-serde-vulcan
Provides Avro schema Serializer/Deserializer using the vulcan codec
Alternative implementations and codecs may be added later.
To get started with sbt, add the following line to your build.sbt file to use the implementation with the Google Java library:
libraryDependencies += "com.anymindgroup" %% "zio-pubsub-google" % "0.2.2"
Create a stream for existing subscription:
import com.anymindgroup.pubsub.*, zio.*, zio.ZIO.*
object BasicSubscription extends ZIOAppDefault:
def run = Subscriber
.subscribe(subscriptionName = "basic_example", des = Serde.int)
.mapZIO { (message, ackReply) =>
for {
_ <- logInfo(
s"Received message" +
s" with id ${message.meta.messageId.value}" +
s" and data ${message.data}"
)
_ <- ackReply.ack()
} yield ()
}
.runDrain
.provide(googleSubscriber)
// subscriber implementation
private val googleSubscriber: TaskLayer[Subscriber] = {
import com.anymindgroup.pubsub.google as G
ZLayer.scoped(
G.Subscriber.makeStreamingPullSubscriber(
connection = G.PubsubConnectionConfig.Emulator(
G.PubsubConnectionConfig.GcpProject("any"),
"localhost:8085",
)
)
)
}
Publish random integer every 2 seconds
import com.anymindgroup.pubsub.*, zio.stream.*, zio.*, zio.ZIO.*
object SamplesPublisher extends ZIOAppDefault:
def run = ZStream
.repeatZIOWithSchedule(Random.nextInt, Schedule.fixed(2.seconds))
.mapZIO { sample =>
for {
mId <- Publisher.publish[Any, Int](
PublishMessage(
data = sample,
attributes = Map.empty,
orderingKey = None,
)
)
_ <- logInfo(s"Published data $sample with message id ${mId.value}")
} yield ()
}
.runDrain
.provide(intPublisher)
// int publisher implementation
val intPublisher: TaskLayer[Publisher[Any, Int]] = {
import com.anymindgroup.pubsub.google as G
ZLayer.scoped(
G.Publisher.make(
config = G.PublisherConfig(
connection = G.PubsubConnectionConfig.Emulator(
G.PubsubConnectionConfig.GcpProject("any"),
"localhost:8085",
),
topicName = "basic_example",
encoding = Encoding.Binary,
enableOrdering = false,
),
ser = Serde.int,
)
)
}
Setup topics and subscription using the admin client:
import com.anymindgroup.pubsub.google.{PubsubAdmin, PubsubConnectionConfig}
import com.anymindgroup.pubsub.*
import zio.*
object ExamplesAdminSetup extends ZIOAppDefault:
def run: Task[Unit] = PubsubAdmin.setup(
connection = PubsubConnectionConfig.Emulator(
PubsubConnectionConfig.GcpProject("any"),
"localhost:8085",
),
topics = List(exampleTopic, exampleDeadLettersTopic),
subscriptions = List(exampleSub, exampleDeadLettersSub),
)
val exampleTopic: Topic[Any, Int] = Topic(
name = "basic_example",
schemaSetting = SchemaSettings(
encoding = Encoding.Binary,
schema = None,
),
serde = Serde.int,
)
val exampleDeadLettersTopic: Topic[Any, Int] =
exampleTopic.copy(name = s"${exampleTopic.name}__dead_letters")
val exampleSub: Subscription = Subscription(
topicName = exampleTopic.name,
name = "basic_example",
filter = None,
enableOrdering = false,
expiration = None,
deadLettersSettings = Some(DeadLettersSettings(exampleDeadLettersTopic.name, 5)),
)
val exampleDeadLettersSub: Subscription = exampleSub.copy(
topicName = exampleDeadLettersTopic.name,
name = s"${exampleSub.name}__dead_letters",
deadLettersSettings = None,
)
To run the example start Google Pub/Sub emulator with docker-compose unsing provided docker-compose.yaml
docker-compose up
Run examples with sbt:
# run to setup example topics + subscription
sbt '+examples/runMain ExamplesAdminSetup'
# run subscription
sbt '+examples/runMain BasicSubscription'
# run samples publisher
sbt '+examples/runMain SamplesPublisher'
# or choose in sbt which example to run
sbt '+examples/run'
Learn more on the ZIO Google Cloud Pub/Sub homepage!
If you have any question or problem feel free to open an issue or discussion.
People are expected to follow the Code of Conduct when discussing on the GitHub issues or PRs.
See the Code of Conduct
Open an issue or discussion on GitHub
Inspired by libraries like zio-kafka and fs2-pubsub to provide a similar experience.