
AnyMindGroup/zio-pubsub


ZIO Google Cloud Pub/Sub

A Google Cloud Pub/Sub client that provides a stream-based, declarative, high-level API built on zio and zio-streams, so that you can concentrate on the business logic.

Modules

  • zio-pubsub: Core components, interfaces and models
  • zio-pubsub-google: Publisher, admin and StreamingPull API based subscriber client implementations using Google's Java library
  • zio-pubsub-serde-circe: JSON serializer/deserializer using the circe codec
  • zio-pubsub-serde-vulcan: Avro schema serializer/deserializer using the vulcan codec

Alternative implementations and codecs may be added later.

Getting Started

To get started with sbt, add the following line to your build.sbt file to use the implementation with the Google Java library:

libraryDependencies += "com.anymindgroup" %% "zio-pubsub-google" % "0.2.2"
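
If you also want one of the serde modules listed above, the dependencies could be declared together. This is a sketch, not a confirmed setup: it assumes the serde modules are published under the same organization and version as zio-pubsub-google, and `zioPubsubVersion` is a helper name introduced here for illustration only.

```scala
// build.sbt (sketch)
// Assumption: the serde modules share the organization and version
// of zio-pubsub-google shown above.
val zioPubsubVersion = "0.2.2" // hypothetical helper value, not from the library

libraryDependencies ++= Seq(
  "com.anymindgroup" %% "zio-pubsub-google"       % zioPubsubVersion, // Google Java library based clients
  "com.anymindgroup" %% "zio-pubsub-serde-circe"  % zioPubsubVersion, // JSON serde via circe
  "com.anymindgroup" %% "zio-pubsub-serde-vulcan" % zioPubsubVersion  // Avro serde via vulcan
)
```

Include only the serde module you actually need; the examples below use the built-in `Serde.int` and require neither.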

Usage examples

Create a stream for an existing subscription:

import com.anymindgroup.pubsub.*, zio.*, zio.ZIO.*

object BasicSubscription extends ZIOAppDefault:
  def run = Subscriber
    .subscribe(subscriptionName = "basic_example", des = Serde.int)
    .mapZIO { (message, ackReply) =>
      for {
        _ <- logInfo(
               s"Received message" +
                 s" with id ${message.meta.messageId.value}" +
                 s" and data ${message.data}"
             )
        _ <- ackReply.ack()
      } yield ()
    }
    .runDrain
    .provide(googleSubscriber)

  // subscriber implementation
  private val googleSubscriber: TaskLayer[Subscriber] = {
    import com.anymindgroup.pubsub.google as G

    ZLayer.scoped(
      G.Subscriber.makeStreamingPullSubscriber(
        connection = G.PubsubConnectionConfig.Emulator(
          G.PubsubConnectionConfig.GcpProject("any"),
          "localhost:8085",
        )
      )
    )
  }

Publish a random integer every 2 seconds:

import com.anymindgroup.pubsub.*, zio.stream.*, zio.*, zio.ZIO.*

object SamplesPublisher extends ZIOAppDefault:
  def run = ZStream
    .repeatZIOWithSchedule(Random.nextInt, Schedule.fixed(2.seconds))
    .mapZIO { sample =>
      for {
        mId <- Publisher.publish[Any, Int](
                 PublishMessage(
                   data = sample,
                   attributes = Map.empty,
                   orderingKey = None,
                 )
               )
        _ <- logInfo(s"Published data $sample with message id ${mId.value}")
      } yield ()
    }
    .runDrain
    .provide(intPublisher)

  // int publisher implementation
  val intPublisher: TaskLayer[Publisher[Any, Int]] = {
    import com.anymindgroup.pubsub.google as G

    ZLayer.scoped(
      G.Publisher.make(
        config = G.PublisherConfig(
          connection = G.PubsubConnectionConfig.Emulator(
            G.PubsubConnectionConfig.GcpProject("any"),
            "localhost:8085",
          ),
          topicName = "basic_example",
          encoding = Encoding.Binary,
          enableOrdering = false,
        ),
        ser = Serde.int,
      )
    )
  }

Set up topics and subscriptions using the admin client:

import com.anymindgroup.pubsub.google.{PubsubAdmin, PubsubConnectionConfig}
import com.anymindgroup.pubsub.*
import zio.*

object ExamplesAdminSetup extends ZIOAppDefault:
  def run: Task[Unit] = PubsubAdmin.setup(
    connection = PubsubConnectionConfig.Emulator(
      PubsubConnectionConfig.GcpProject("any"),
      "localhost:8085",
    ),
    topics = List(exampleTopic, exampleDeadLettersTopic),
    subscriptions = List(exampleSub, exampleDeadLettersSub),
  )

  val exampleTopic: Topic[Any, Int] = Topic(
    name = "basic_example",
    schemaSetting = SchemaSettings(
      encoding = Encoding.Binary,
      schema = None,
    ),
    serde = Serde.int,
  )

  val exampleDeadLettersTopic: Topic[Any, Int] =
    exampleTopic.copy(name = s"${exampleTopic.name}__dead_letters")

  val exampleSub: Subscription = Subscription(
    topicName = exampleTopic.name,
    name = "basic_example",
    filter = None,
    enableOrdering = false,
    expiration = None,
    deadLettersSettings = Some(DeadLettersSettings(exampleDeadLettersTopic.name, 5)),
  )

  val exampleDeadLettersSub: Subscription = exampleSub.copy(
    topicName = exampleDeadLettersTopic.name,
    name = s"${exampleSub.name}__dead_letters",
    deadLettersSettings = None,
  )

To run the examples, start the Google Pub/Sub emulator with docker-compose using the provided docker-compose.yaml:

docker-compose up

Run examples with sbt:

# set up the example topics and subscriptions
sbt '+examples/runMain ExamplesAdminSetup'

# run subscription
sbt '+examples/runMain BasicSubscription'

# run samples publisher
sbt '+examples/runMain SamplesPublisher'

# or choose in sbt which example to run
sbt '+examples/run'

Documentation

Learn more on the ZIO Google Cloud Pub/Sub homepage!

Contributing

If you have any questions or problems, feel free to open an issue or discussion.

Everyone is expected to follow the Code of Conduct when participating in GitHub issues or PRs.

Code of Conduct

See the Code of Conduct

Support

Open an issue or discussion on GitHub.

Credits

Inspired by libraries like zio-kafka and fs2-pubsub to provide a similar experience.

License

License