diff --git a/.scalafix.conf b/.scalafix.conf index b363c6f..83dcbef 100644 --- a/.scalafix.conf +++ b/.scalafix.conf @@ -1,13 +1,9 @@ rules = [ DisableSyntax - ExplicitResultTypes LeakingImplicitClassVal NoAutoTupling NoValInForComprehension OrganizeImports - ProcedureSyntax - RemoveUnused - MissingFinal ] OrganizeImports { @@ -23,10 +19,6 @@ OrganizeImports { ] } -RemoveUnused { - imports = false # handled by OrganizeImports -} - DisableSyntax.noThrows = true DisableSyntax.noNulls = true DisableSyntax.noReturns = true diff --git a/.scalafix_test.conf b/.scalafix_test.conf index b64b339..fb05c7f 100644 --- a/.scalafix_test.conf +++ b/.scalafix_test.conf @@ -1,23 +1,11 @@ rules = [ - Disable DisableSyntax - ExplicitResultTypes LeakingImplicitClassVal NoAutoTupling NoValInForComprehension OrganizeImports - ProcedureSyntax - RemoveUnused - MissingFinal ] -Disable { - ifSynthetic = [ - "scala/Option.option2Iterable" - "scala/Predef.any2stringadd" - ] -} - OrganizeImports { removeUnused = true targetDialect = Scala3 @@ -31,10 +19,8 @@ OrganizeImports { ] } -RemoveUnused { - imports = false # handled by OrganizeImports -} - +DisableSyntax.noThrows = false +DisableSyntax.noNulls = false DisableSyntax.noReturns = true DisableSyntax.noXml = true DisableSyntax.noFinalize = true diff --git a/build.sbt b/build.sbt index c7c3d10..f203c77 100644 --- a/build.sbt +++ b/build.sbt @@ -1,11 +1,15 @@ -import zio.sbt.githubactions.{Job, Step, Condition, ActionRef} +import zio.sbt.githubactions.{ActionRef, Condition, Job, Step} import _root_.io.circe.Json + +import scala.annotation.tailrec enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin) lazy val _scala2 = "2.13.15" lazy val _scala3 = "3.3.4" +lazy val zioGcpVersion = "0.0.3+9-e55f796a-SNAPSHOT" + inThisBuild( List( name := "ZIO Google Cloud Pub/Sub", @@ -27,11 +31,11 @@ inThisBuild( url = url("https://github.com/qhquanghuy"), ), ), - zioVersion := "2.1.11", + zioVersion := "2.1.13", scala213 := _scala2, scala3 := _scala3, - scalaVersion := _scala2, - crossScalaVersions := Seq(_scala2, _scala3), + scalaVersion := _scala3, + crossScalaVersions := Seq(_scala3), versionScheme := Some("early-semver"), ciEnabledBranches := Seq("master"), ciJvmOptions ++= Seq("-Xms2G", "-Xmx2G", "-Xss4M", "-XX:+UseG1GC"), @@ -116,9 +120,6 @@ inThisBuild( }, scalafmt := true, scalafmtSbtCheck := true, - scalafixDependencies ++= List( - "com.github.vovapolu" %% "scaluzzi" % "0.1.23" - ), ) ) @@ -149,8 +150,12 @@ lazy val ciGenerateGithubWorkflowV2 = Def.task { lazy val commonSettings = List( libraryDependencies ++= { CrossVersion.partialVersion(scalaVersion.value) match { - case Some((2, _)) => Seq(compilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1")) - case _ => Seq() + case Some((2, _)) => + Seq( + compilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1"), + compilerPlugin("org.typelevel" %% "kind-projector" % "0.13.3" cross CrossVersion.full), + ) + case _ => Seq() } }, javacOptions ++= Seq("-source", "17"), @@ -163,7 +168,9 @@ lazy val commonSettings = List( Compile / scalacOptions --= sys.env.get("CI").fold(Seq("-Xfatal-warnings"))(_ => Nil), Test / scalafixConfig := Some(new File(".scalafix_test.conf")), Test / scalacOptions --= Seq("-Xfatal-warnings"), -) ++ scalafixSettings + semanticdbEnabled := true, + semanticdbVersion := scalafixSemanticdb.revision, // use Scalafix compatible version +) val noPublishSettings = List( publish := {}, @@ -177,9 +184,12 @@ lazy val root = .aggregate( zioPubsub.jvm, zioPubsub.native, + zioPubsubHttp.jvm, + zioPubsubHttp.native, zioPubsubGoogle, zioPubsubGoogleTest, - zioPubsubTestkit, + zioPubsubTestkit.jvm, + zioPubsubTestkit.native, zioPubsubSerdeCirce.jvm, zioPubsubSerdeCirce.native, zioPubsubSerdeVulcan, @@ -206,6 +216,28 @@ lazy val zioPubsub = crossProject(JVMPlatform, NativePlatform) ) ) +lazy val zioPubsubHttp = crossProject(JVMPlatform, NativePlatform) + .in(file("zio-pubsub-http")) + .settings(moduleName := "zio-pubsub-http") + .dependsOn(zioPubsub) + .settings(commonSettings) + .settings( + libraryDependencies ++= Seq( + "com.anymindgroup" %%% "zio-gcp-auth" % zioGcpVersion, + "com.anymindgroup" %%% "zio-gcp-pubsub-v1" % zioGcpVersion, + ), + // temprorary until zio-gcp is published + credentials += { + for { + username <- sys.env.get("ARTIFACT_REGISTRY_USERNAME") + apiKey <- sys.env.get("ARTIFACT_REGISTRY_PASSWORD") + } yield Credentials("https://asia-maven.pkg.dev", "asia-maven.pkg.dev", username, apiKey) + }.getOrElse(Credentials(Path.userHome / ".ivy2" / ".credentials")), + resolvers ++= Seq( + "AnyChat Maven" at "https://asia-maven.pkg.dev/anychat-staging/maven" + ), + ) + val vulcanVersion = "1.11.1" lazy val zioPubsubSerdeVulcan = (project in file("zio-pubsub-serde-vulcan")) .settings(moduleName := "zio-pubsub-serde-vulcan") @@ -247,7 +279,7 @@ lazy val zioPubsubGoogle = (project in file("zio-pubsub-google")) lazy val zioPubsubGoogleTest = project .in(file("zio-pubsub-google-test")) - .dependsOn(zioPubsub.jvm, zioPubsubGoogle, zioPubsubTestkit, zioPubsubSerdeCirce.jvm, zioPubsubSerdeVulcan) + .dependsOn(zioPubsub.jvm, zioPubsubGoogle, zioPubsubTestkit.jvm, zioPubsubSerdeCirce.jvm, zioPubsubSerdeVulcan) .settings(moduleName := "zio-pubsub-google-test") .settings(commonSettings) .settings(noPublishSettings) @@ -258,10 +290,10 @@ lazy val zioPubsubGoogleTest = project (Test / fork) := true, ) -// TODO remove dependency on zioPubsubGoogle lazy val zioPubsubTestkit = - (project in file("zio-pubsub-testkit")) - .dependsOn(zioPubsub.jvm, zioPubsubGoogle) + crossProject(JVMPlatform, NativePlatform) + .in(file("zio-pubsub-testkit")) + .dependsOn(zioPubsub, zioPubsubHttp) .settings(moduleName := "zio-pubsub-testkit") .settings(commonSettings) .settings( @@ -274,7 +306,7 @@ lazy val zioPubsubTestkit = lazy val zioPubsubTest = crossProject(JVMPlatform, NativePlatform) .in(file("zio-pubsub-test")) - .dependsOn(zioPubsub, zioPubsubSerdeCirce) + .dependsOn(zioPubsub, zioPubsubSerdeCirce, zioPubsubHttp, zioPubsubTestkit) .settings(moduleName := "zio-pubsub-test") .settings(commonSettings) .settings(noPublishSettings) @@ -282,18 +314,18 @@ lazy val zioPubsubTest = .jvmSettings(coverageEnabled := true) .nativeSettings(coverageEnabled := false) -lazy val examples = (project in file("examples")) - .dependsOn(zioPubsubGoogle) - .settings(noPublishSettings) - .settings( - scalaVersion := _scala3, - crossScalaVersions := Seq(_scala3), - coverageEnabled := false, - fork := true, - libraryDependencies ++= Seq( - "dev.zio" %% "zio-json" % "0.7.1" - ), - ) +//lazy val examples = (project in file("examples")) +// .dependsOn(zioPubsubGoogle) +// .settings(noPublishSettings) +// .settings( +// scalaVersion := _scala3, +// crossScalaVersions := Seq(_scala3), +// coverageEnabled := false, +// fork := true, +// libraryDependencies ++= Seq( +// "dev.zio" %% "zio-json" % "0.7.1" +// ), +// ) lazy val testDeps = Seq( libraryDependencies ++= Seq( diff --git a/docker-compose.yaml b/docker-compose.yaml index 25b3c0a..4aa0355 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,7 +1,7 @@ services: pubsub: # https://console.cloud.google.com/gcr/images/google.com:cloudsdktool/GLOBAL/cloud-sdk - image: gcr.io/google.com/cloudsdktool/cloud-sdk:480.0.0-emulators + image: gcr.io/google.com/cloudsdktool/cloud-sdk:498.0.0-emulators ports: - "8085:8085" command: gcloud beta emulators pubsub start --project=any --host-port=0.0.0.0:8085 diff --git a/examples/src/main/scala/BasicSubscription.scala b/examples/src/main/scala/BasicSubscription.scala index 13b23d8..fa32a76 100644 --- a/examples/src/main/scala/BasicSubscription.scala +++ b/examples/src/main/scala/BasicSubscription.scala @@ -22,8 +22,8 @@ object BasicSubscription extends ZIOAppDefault: ZLayer.scoped( G.Subscriber.makeStreamingPullSubscriber( - connection = G.PubsubConnectionConfig.Emulator( - G.PubsubConnectionConfig.GcpProject("any"), + connection = PubsubConnectionConfig.Emulator( + PubsubConnectionConfig.GcpProject("any"), "localhost:8085", ) ) diff --git a/examples/src/main/scala/ExamplesAdminSetup.scala b/examples/src/main/scala/ExamplesAdminSetup.scala index 98b1508..cadfbd8 100644 --- a/examples/src/main/scala/ExamplesAdminSetup.scala +++ b/examples/src/main/scala/ExamplesAdminSetup.scala @@ -1,4 +1,4 @@ -import com.anymindgroup.pubsub.google.{PubsubAdmin, PubsubConnectionConfig} +import com.anymindgroup.pubsub.google.PubsubAdmin import com.anymindgroup.pubsub.* import zio.* diff --git a/examples/src/main/scala/PubAndSubAndAdminExample.scala b/examples/src/main/scala/PubAndSubAndAdminExample.scala index fd55bb5..cf63457 100644 --- a/examples/src/main/scala/PubAndSubAndAdminExample.scala +++ b/examples/src/main/scala/PubAndSubAndAdminExample.scala @@ -60,9 +60,9 @@ object PubAndSubAndAdminExample extends ZIOAppDefault: } yield () } - val pubsubConnection: G.PubsubConnectionConfig = - G.PubsubConnectionConfig.Emulator( - project = G.PubsubConnectionConfig.GcpProject("any"), + val pubsubConnection: PubsubConnectionConfig = + PubsubConnectionConfig.Emulator( + project = PubsubConnectionConfig.GcpProject("any"), host = "localhost:8085", ) diff --git a/examples/src/main/scala/SamplesPublisher.scala b/examples/src/main/scala/SamplesPublisher.scala index 463360d..581c64e 100644 --- a/examples/src/main/scala/SamplesPublisher.scala +++ b/examples/src/main/scala/SamplesPublisher.scala @@ -25,8 +25,8 @@ object SamplesPublisher extends ZIOAppDefault: ZLayer.scoped( G.Publisher.make( config = G.PublisherConfig( - connection = G.PubsubConnectionConfig.Emulator( - G.PubsubConnectionConfig.GcpProject("any"), + connection = PubsubConnectionConfig.Emulator( + PubsubConnectionConfig.GcpProject("any"), "localhost:8085", ), topicName = "basic_example", diff --git a/project/build.properties b/project/build.properties index cbf878e..7a159c4 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version = 1.10.3 +sbt.version = 1.10.6 diff --git a/project/plugins.sbt b/project/plugins.sbt index 9a6cb5d..26af2b3 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -7,7 +7,7 @@ addSbtPlugin("dev.zio" % "zio-sbt-ci" % zioSbtVersion exclude ("or addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.12.2") addSbtPlugin("dev.zio" % "zio-sbt-ecosystem" % zioSbtVersion) -addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.5.5") +addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.5.6") addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.2") diff --git a/zio-pubsub-google-test/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/AvroPublisherSpec.scala b/zio-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/AvroPublisherSpec.scala similarity index 95% rename from zio-pubsub-google-test/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/AvroPublisherSpec.scala rename to zio-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/AvroPublisherSpec.scala index c1e9d49..120b4d0 100644 --- a/zio-pubsub-google-test/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/AvroPublisherSpec.scala +++ b/zio-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/AvroPublisherSpec.scala @@ -1,7 +1,7 @@ package com.anymindgroup.pubsub.google +import com.anymindgroup.pubsub.PubsubTestSupport.* import com.anymindgroup.pubsub.google -import com.anymindgroup.pubsub.google.PubsubTestSupport.* import com.anymindgroup.pubsub.google.TestSupport.* import com.anymindgroup.pubsub.model.* import com.anymindgroup.pubsub.pub.* @@ -39,7 +39,7 @@ object AvroPublisherSpec extends ZIOSpecDefault { SchemaSettings( schema = Some( SchemaRegistry( - id = s"${topicName.getTopic()}_v1", + id = s"${topicName.topic}_v1", schemaType = SchemaType.Avro, definition = ZIO.succeed(TestEvent.avroCodecSchema), ) @@ -47,14 +47,14 @@ object AvroPublisherSpec extends ZIOSpecDefault { encoding = encoding, ) topic = Topic[Any, TestEvent]( - topicName.getTopic(), + topicName.topic, schema, VulcanSerde.fromAvroCodec(TestEvent.avroCodec, encoding), ) publisherConfig = PublisherConfig.forTopic(conn, topic, enableOrdering = true) subscription = Subscription( topicName = topic.name, - name = subscriptionName.getSubscription(), + name = subscriptionName.subscription, filter = None, enableOrdering = true, expiration = None, @@ -91,6 +91,7 @@ object AvroPublisherSpec extends ZIOSpecDefault { ) publishedOrderingKeys = testMessages.map(_.orderingKey) consumedOrderingKeys = consumed.map(_.meta.orderingKey) + _ <- assert(publishedOrderingKeys)(hasSameElements(consumedOrderingKeys)) _ <- assert(consumedAttr)(hasSameElements(publishedAttrs)) } yield assert(publishedOrderingKeys)(hasSameElements(consumedOrderingKeys)) }) :: diff --git a/zio-pubsub-google-test/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/PubsubAdminSpec.scala b/zio-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/PubsubAdminSpec.scala similarity index 82% rename from zio-pubsub-google-test/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/PubsubAdminSpec.scala rename to zio-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/PubsubAdminSpec.scala index 2b013b0..66574c3 100644 --- a/zio-pubsub-google-test/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/PubsubAdminSpec.scala +++ b/zio-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/PubsubAdminSpec.scala @@ -1,8 +1,9 @@ package com.anymindgroup.pubsub.google +import scala.concurrent.duration.Duration import scala.util.Try -import com.anymindgroup.pubsub.google.PubsubTestSupport.* +import com.anymindgroup.pubsub.PubsubTestSupport.* import com.anymindgroup.pubsub.google.TestSupport.* import com.anymindgroup.pubsub.model.{SchemaRegistry, SchemaType, *} import com.anymindgroup.pubsub.serde.{CirceSerde, VulcanSerde} @@ -31,7 +32,7 @@ object PubsubAdminSpec extends ZIOSpecDefault { case SchemaSettings(Encoding.Json, None) => CirceSerde.fromCirceCodec(TestEvent.jsonCodec) } topicName <- topicNameGen - } yield Topic(topicName.getTopic(), schemaSetting, serde) + } yield Topic(topicName.topic, schemaSetting, serde) override def spec: Spec[TestEnvironment & Scope, Any] = suite("PubsubAdminSpec")( test("crating a subscriptions for non existing topic fails") { @@ -49,16 +50,17 @@ object PubsubAdminSpec extends ZIOSpecDefault { _ <- PubsubAdmin.setup(connection, topics.map(_._1), subscriptions.flatten) _ <- ZIO.foreachDiscard(subscriptions.flatten) { subscription => for { - maybePubsubSub <- findSubscription(subscription.name) + maybePubsubSub <- findSubscription(SubscriptionName(connection.project.name, subscription.name)) _ <- assert(maybePubsubSub)(isSome) pubsubSub = maybePubsubSub.get - pubsubSubName = pubsubSub.getName().split("/").last + pubsubSubName = pubsubSub.name.split("/").last _ <- assert(pubsubSubName)(equalTo(subscription.name)) - _ <- assert(pubsubSub.getEnableMessageOrdering())(equalTo(subscription.enableOrdering)) - _ <- assert(pubsubSub.getFilter())(equalTo(subscription.filter.map(_.value).getOrElse(""))) - _ <- assert(pubsubSub.getExpirationPolicy().getTtl().getSeconds())( - equalTo(subscription.expiration.map(_.getSeconds()).getOrElse(0L)) - ) + _ <- assert(pubsubSub.enableMessageOrdering.getOrElse(false))(equalTo(subscription.enableOrdering)) + _ <- assert(pubsubSub.filter.getOrElse(""))(equalTo(subscription.filter.map(_.value).getOrElse(""))) + _ <- + assert(pubsubSub.expirationPolicy.flatMap(_.ttl.map(Duration(_).toSeconds)))( + equalTo(subscription.expiration.map(_.toSeconds())) + ) } yield assertTrue(true) } exit <- PubsubAdmin.setup(connection, topics.map(_._1), subscriptions.flatten).exit @@ -90,6 +92,6 @@ object PubsubAdminSpec extends ZIOSpecDefault { }, ).provideSomeShared[Scope]( emulatorConnectionConfigLayer(), - SubscriptionAdmin.layer, + emulatorBackendLayer, ) @@ TestAspect.nondeterministic } diff --git a/zio-pubsub-google-test/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala b/zio-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala similarity index 100% rename from zio-pubsub-google-test/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala rename to zio-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriberSpec.scala diff --git a/zio-pubsub-google-test/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/SubscriberSpec.scala b/zio-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/SubscriberSpec.scala similarity index 98% rename from zio-pubsub-google-test/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/SubscriberSpec.scala rename to zio-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/SubscriberSpec.scala index cffb2f4..648a1e3 100644 --- a/zio-pubsub-google-test/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/SubscriberSpec.scala +++ b/zio-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/SubscriberSpec.scala @@ -4,9 +4,9 @@ import java.time.Instant import scala.jdk.CollectionConverters.* -import com.anymindgroup.pubsub.google.PubsubConnectionConfig.GcpProject -import com.anymindgroup.pubsub.google.PubsubTestSupport.* +import com.anymindgroup.pubsub.PubsubTestSupport.* import com.anymindgroup.pubsub.model.* +import com.anymindgroup.pubsub.model.PubsubConnectionConfig.GcpProject import com.anymindgroup.pubsub.serde.VulcanSerde import com.anymindgroup.pubsub.sub.{AckId, DeadLettersSettings, SubscriberFilter, Subscription} import com.google.api.gax.rpc.NotFoundException @@ -192,7 +192,7 @@ object SubscriberSpec extends ZIOSpecDefault { def createRandomTopic: RIO[PubsubConnectionConfig.Emulator & Scope, Topic[Any, Int]] = topicNameGen.runHead .map(_.get) - .map(_.getTopic()) + .map(_.topic) .map(topicName => Topic( topicName, diff --git a/zio-pubsub-google-test/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/TestSupport.scala b/zio-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/TestSupport.scala similarity index 100% rename from zio-pubsub-google-test/zio-gc-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/TestSupport.scala rename to zio-pubsub-google-test/src/test/scala/com/anymindgroup/pubsub/google/TestSupport.scala diff --git a/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/PubsubConnectionConfig.scala b/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/Emulator.scala similarity index 65% rename from zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/PubsubConnectionConfig.scala rename to zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/Emulator.scala index 73c20a3..3b06c11 100644 --- a/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/PubsubConnectionConfig.scala +++ b/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/Emulator.scala @@ -2,6 +2,7 @@ package com.anymindgroup.pubsub.google import java.util.concurrent.TimeUnit +import com.anymindgroup.pubsub.model.PubsubConnectionConfig import com.google.api.gax.core.{CredentialsProvider, NoCredentialsProvider} import com.google.api.gax.grpc.GrpcTransportChannel import com.google.api.gax.rpc.{FixedTransportChannelProvider, TransportChannelProvider} @@ -9,18 +10,13 @@ import io.grpc.{ManagedChannel, ManagedChannelBuilder} import zio.{RIO, Scope, ZIO} -sealed trait PubsubConnectionConfig { - def project: PubsubConnectionConfig.GcpProject - -} - -object PubsubConnectionConfig { - final case class Cloud(project: GcpProject) extends PubsubConnectionConfig - final case class Emulator(project: GcpProject, host: String) extends PubsubConnectionConfig - - def createEmulatorSettings(config: Emulator): RIO[Scope, (TransportChannelProvider, CredentialsProvider)] = for { +object Emulator { + def createEmulatorSettings( + config: PubsubConnectionConfig.Emulator + ): RIO[Scope, (TransportChannelProvider, CredentialsProvider)] = for { channel <- ZIO.acquireRelease(ZIO.attempt { - val channel: ManagedChannel = ManagedChannelBuilder.forTarget(config.host).usePlaintext().build + val channel: ManagedChannel = + ManagedChannelBuilder.forTarget(s"${config.host}:${config.port}").usePlaintext().build GrpcTransportChannel.create(channel) }) { channel => (for { @@ -35,8 +31,4 @@ object PubsubConnectionConfig { channelProvider <- ZIO.attempt(FixedTransportChannelProvider.create(channel)) credentialsProvider = NoCredentialsProvider.create } yield (channelProvider, credentialsProvider) - - final case class GcpProject(name: String) { - override def toString: String = name - } } diff --git a/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/PubSubSchemaRegistryAdmin.scala b/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/PubSubSchemaRegistryAdmin.scala index 64f85fc..28d44e7 100644 --- a/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/PubSubSchemaRegistryAdmin.scala +++ b/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/PubSubSchemaRegistryAdmin.scala @@ -2,8 +2,7 @@ package com.anymindgroup.pubsub.google import java.util.concurrent.TimeUnit -import com.anymindgroup.pubsub.google.PubsubConnectionConfig.Emulator -import com.anymindgroup.pubsub.model.{SchemaRegistry, SchemaType} +import com.anymindgroup.pubsub.model.{PubsubConnectionConfig, SchemaRegistry, SchemaType} import com.google.api.gax.rpc.NotFoundException import com.google.cloud.pubsub.v1.{SchemaServiceClient, SchemaServiceSettings} import com.google.pubsub.v1.{ProjectName, Schema as GSchema, SchemaName} @@ -15,9 +14,9 @@ object PubSubSchemaRegistryAdmin { private[pubsub] def makeClient(connection: PubsubConnectionConfig): RIO[Scope, SchemaServiceClient] = ZIO.acquireRelease( connection match { - case config: Emulator => + case config: PubsubConnectionConfig.Emulator => for { - (channelProvider, credentialsProvider) <- PubsubConnectionConfig.createEmulatorSettings(config) + (channelProvider, credentialsProvider) <- Emulator.createEmulatorSettings(config) s <- ZIO.attempt( SchemaServiceClient.create( SchemaServiceSettings diff --git a/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/Publisher.scala b/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/Publisher.scala index f3cf57c..c9b5e55 100644 --- a/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/Publisher.scala +++ b/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/Publisher.scala @@ -4,7 +4,6 @@ import java.util.concurrent.TimeUnit import scala.jdk.CollectionConverters.* -import com.anymindgroup.pubsub.google.PubsubConnectionConfig.{Cloud, Emulator} import com.anymindgroup.pubsub.model.* import com.anymindgroup.pubsub.pub.* import com.anymindgroup.pubsub.serde.Serializer @@ -12,7 +11,7 @@ import com.google.cloud.pubsub.v1.Publisher as GPublisher import com.google.protobuf.ByteString import com.google.pubsub.v1.PubsubMessage as GPubsubMessage -import zio.{RIO, Scope, ZIO} +import zio.{NonEmptyChunk, RIO, Scope, ZIO} object Publisher { def make[R, E]( @@ -33,10 +32,10 @@ object Publisher { private[pubsub] def makeUnderlyingPublisher(config: PublisherConfig): RIO[Scope, GPublisher] = ZIO.acquireRelease { for { builder <- config.connection match { - case _: Cloud => ZIO.attempt(GPublisher.newBuilder(config.topicId)) - case emulator: Emulator => + case _: PubsubConnectionConfig.Cloud => ZIO.attempt(GPublisher.newBuilder(config.topicId)) + case emulator: PubsubConnectionConfig.Emulator => for { - (channelProvider, credentialsProvider) <- PubsubConnectionConfig.createEmulatorSettings(emulator) + (channelProvider, credentialsProvider) <- Emulator.createEmulatorSettings(emulator) p <- ZIO.attempt( GPublisher .newBuilder(config.topicId) @@ -66,6 +65,16 @@ object Publisher { } class GooglePublisher[R, E](publisher: GPublisher, serde: Serializer[R, E]) extends Publisher[R, E] { + + override def publish(messages: NonEmptyChunk[PublishMessage[E]]): RIO[R, NonEmptyChunk[MessageId]] = + ZIO + .foreach(messages) { message => + toPubsubMessage(message).map(publisher.publish(_)) + } + .flatMap { futures => + ZIO.foreachPar(futures)(f => ZIO.fromFutureJava(f).map(MessageId(_))) + } + override def publish(event: PublishMessage[E]): ZIO[R, Throwable, MessageId] = for { msg <- toPubsubMessage(event) diff --git a/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala b/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala index d03d66f..1089015 100644 --- a/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala +++ b/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/StreamingPullSubscriber.scala @@ -5,6 +5,7 @@ import java.util.concurrent.TimeUnit import scala.jdk.CollectionConverters.* +import com.anymindgroup.pubsub.model.PubsubConnectionConfig import com.anymindgroup.pubsub.sub.* import com.google.api.gax.rpc.{BidiStream as GBidiStream, ClientStream} import com.google.cloud.pubsub.v1.stub.{GrpcSubscriberStub, SubscriberStubSettings} @@ -31,7 +32,7 @@ private[pubsub] object StreamingPullSubscriber { ) ) case c: PubsubConnectionConfig.Emulator => - PubsubConnectionConfig.createEmulatorSettings(c).map { case (channelProvider, credentialsProvider) => + Emulator.createEmulatorSettings(c).map { case (channelProvider, credentialsProvider) => SubscriberStubSettings.newBuilder .setTransportChannelProvider(channelProvider) .setCredentialsProvider(credentialsProvider) diff --git a/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/SubscriptionAdmin.scala b/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/SubscriptionAdmin.scala index 4e32f04..95d098a 100644 --- a/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/SubscriptionAdmin.scala +++ b/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/SubscriptionAdmin.scala @@ -2,6 +2,7 @@ package com.anymindgroup.pubsub.google import java.util.concurrent.TimeUnit +import com.anymindgroup.pubsub.model.PubsubConnectionConfig import com.anymindgroup.pubsub.sub.{DeadLettersSettings, SubscriberFilter, Subscription} import com.google.api.gax.rpc.{AlreadyExistsException, NotFoundException} import com.google.cloud.pubsub.v1.{SubscriptionAdminClient, SubscriptionAdminSettings} @@ -17,7 +18,7 @@ object SubscriptionAdmin { connection match { case config: PubsubConnectionConfig.Emulator => for { - (channelProvider, credentialsProvider) <- PubsubConnectionConfig.createEmulatorSettings(config) + (channelProvider, credentialsProvider) <- Emulator.createEmulatorSettings(config) s <- ZIO.attempt( SubscriptionAdminClient.create( SubscriptionAdminSettings diff --git a/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/TopicAdmin.scala b/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/TopicAdmin.scala index b4b88f4..97a61ac 100644 --- a/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/TopicAdmin.scala +++ b/zio-pubsub-google/src/main/scala/com/anymindgroup/pubsub/google/TopicAdmin.scala @@ -2,6 +2,7 @@ package com.anymindgroup.pubsub.google import java.util.concurrent.TimeUnit +import com.anymindgroup.pubsub.model.PubsubConnectionConfig import com.google.cloud.pubsub.v1.{TopicAdminClient, TopicAdminSettings} import zio.{RIO, RLayer, Scope, ZIO, ZLayer} @@ -12,7 +13,7 @@ object TopicAdmin { connection match { case config: PubsubConnectionConfig.Emulator => for { - (channelProvider, credentialsProvider) <- PubsubConnectionConfig.createEmulatorSettings(config) + (channelProvider, credentialsProvider) <- Emulator.createEmulatorSettings(config) s <- ZIO.attempt( TopicAdminClient.create( TopicAdminSettings diff --git a/zio-pubsub-http/shared/src/main/scala/com/anymindgroup/pubsub/http/EmulatorBackend.scala b/zio-pubsub-http/shared/src/main/scala/com/anymindgroup/pubsub/http/EmulatorBackend.scala new file mode 100644 index 0000000..8f97885 --- /dev/null +++ b/zio-pubsub-http/shared/src/main/scala/com/anymindgroup/pubsub/http/EmulatorBackend.scala @@ -0,0 +1,20 @@ +package com.anymindgroup.pubsub.http + +import com.anymindgroup.pubsub.model.PubsubConnectionConfig +import sttp.capabilities.Effect +import sttp.client4.{Backend, GenericRequest, Response} +import sttp.monad.MonadError + +import zio.Task + +object EmulatorBackend { + def apply(backend: Backend[Task], config: PubsubConnectionConfig.Emulator): Backend[Task] = + new Backend[Task] { + override def send[T](req: GenericRequest[T, Any & Effect[Task]]): Task[Response[T]] = + backend.send(req.method(req.method, req.uri.scheme("http").host(config.host).port(config.port))) + + override def close(): Task[Unit] = backend.close() + + override def monad: MonadError[Task] = backend.monad + } +} diff --git a/zio-pubsub-http/shared/src/main/scala/com/anymindgroup/pubsub/http/Publisher.scala b/zio-pubsub-http/shared/src/main/scala/com/anymindgroup/pubsub/http/Publisher.scala new file mode 100644 index 0000000..76b6242 --- /dev/null +++ b/zio-pubsub-http/shared/src/main/scala/com/anymindgroup/pubsub/http/Publisher.scala @@ -0,0 +1,84 @@ +package com.anymindgroup.pubsub.http + +import java.util.Base64 + +import com.anymindgroup.gcp.auth.{AccessToken, AuthedBackend, Token, TokenProvider} +import com.anymindgroup.gcp.pubsub.v1.resources.projects as p +import com.anymindgroup.gcp.pubsub.v1.schemas as s +import com.anymindgroup.pubsub.* +import com.anymindgroup.pubsub.model.{MessageId, PubsubConnectionConfig, TopicName} +import sttp.client4.Backend + +import zio.{Chunk, NonEmptyChunk, RIO, Task, ZIO} + +class HttpPublisher[R, E]( + serializer: Serializer[R, E], + backend: Backend[Task], + topic: TopicName, +) extends Publisher[R, E] { + + override def publish(events: NonEmptyChunk[PublishMessage[E]]): RIO[R, NonEmptyChunk[MessageId]] = + for { + request <- toRequestBody(events) + response <- request.send(backend) + ids <- ZIO.fromEither { + if (response.isSuccess) { + response.body.map(r => NonEmptyChunk.fromChunk(r.messageIds.getOrElse(Chunk.empty))) match { + case Right(Some(msgIds)) => Right(msgIds.map(MessageId(_))) + case Right(_) => Left(new Throwable("Missing id in response")) + case Left(err) => Left(new Throwable(err)) + } + } else Left(new Throwable(s"Failed with ${response.code} ${response.statusText}")) + } + } yield ids + + override def publish(event: PublishMessage[E]): ZIO[R, Throwable, MessageId] = + publish(NonEmptyChunk.single(event)).map(_.head) + + private def toRequestBody(events: NonEmptyChunk[PublishMessage[E]]) = for { + messages <- ZIO.foreach(events) { event => + for { + data <- serializer.serialize(event.data).map(Base64.getEncoder.encodeToString) + } yield s.PubsubMessage( + data = Some(data), + orderingKey = event.orderingKey.map(_.value), + attributes = if (event.attributes.nonEmpty) Some(event.attributes) else None, + ) + } + } yield p.Topics.publish( + projectsId = topic.projectId, + topicsId = topic.topic, + request = s.PublishRequest(messages), + ) +} + +object HttpPublisher { + def make[R, E]( + connection: PubsubConnectionConfig, + topic: String, + serializer: Serializer[R, E], + backend: Backend[Task], + tokenProvider: TokenProvider[Token], + ): HttpPublisher[R, E] = + connection match { + case PubsubConnectionConfig.Cloud(project) => + new HttpPublisher[R, E]( + serializer = serializer, + topic = TopicName(projectId = project.name, topic = topic), + backend = AuthedBackend(tokenProvider, backend), + ) + case config @ PubsubConnectionConfig.Emulator(project, _, _) => + new HttpPublisher[R, E]( + serializer = serializer, + topic = TopicName(projectId = project.name, topic = topic), + backend = EmulatorBackend(backend, config), + ) + } + + def make[R, E]( + connection: PubsubConnectionConfig, + topic: Topic[R, E], + backend: Backend[Task], + tokenProvider: TokenProvider[AccessToken], + ): HttpPublisher[R, E] = make(connection, topic.name, topic.serde, backend, tokenProvider) +} diff --git a/zio-pubsub-http/shared/src/main/scala/com/anymindgroup/pubsub/http/Subscriber.scala b/zio-pubsub-http/shared/src/main/scala/com/anymindgroup/pubsub/http/Subscriber.scala new file mode 100644 index 0000000..ef61a88 --- /dev/null +++ b/zio-pubsub-http/shared/src/main/scala/com/anymindgroup/pubsub/http/Subscriber.scala @@ -0,0 +1,166 @@ +package com.anymindgroup.pubsub.http + +import java.util.Base64 + +import com.anymindgroup.gcp.auth.{AuthedBackend, Token, TokenProvider} +import com.anymindgroup.gcp.pubsub.v1.resources.projects as p +import com.anymindgroup.gcp.pubsub.v1.schemas as s +import com.anymindgroup.gcp.pubsub.v1.schemas.PubsubMessage +import com.anymindgroup.pubsub.model.{MessageId, OrderingKey} +import com.anymindgroup.pubsub.sub.{AckId, RawReceipt, ReceivedMessage, Subscriber} +import com.anymindgroup.pubsub.{AckReply, PubsubConnectionConfig} +import sttp.client4.Backend + +import zio.stream.ZStream +import zio.{Cause, Chunk, NonEmptyChunk, Queue, Schedule, Scope, Task, UIO, ZIO} + +class HttpSubscriber private[http] ( + backend: Backend[Task], + projectId: String, + maxMessagesPerPull: Int, + ackQueue: Queue[(String, Boolean)], + retrySchedule: Schedule[Any, Throwable, ?], +) extends Subscriber { + private def processAckQueue(chunkSizeLimit: Option[Int], subscriptionId: String): UIO[Option[Cause[Throwable]]] = + chunkSizeLimit + .fold(ackQueue.takeAll)(ackQueue.takeBetween(1, _)) + .flatMap { c => + val (ackIds, nackIds) = c.partitionMap { + case (id, true) => Left(id) + case (id, false) => Right(id) + } + + ( + NonEmptyChunk.fromChunk(ackIds).map(sendAck(_, subscriptionId)), + NonEmptyChunk.fromChunk(nackIds).map(sendNack(_, subscriptionId)), + ) match { + case (Some(sendAckReq), Some(sendNackReq)) => + (sendAckReq <&> sendNackReq).map { + case (Some(c1), Some(c2)) => Some(c1 && c2) + case (c1, c2) => c1.orElse(c2) + } + case (Some(sendAckReq), _) => sendAckReq + case (_, Some(sendNackReq)) => sendNackReq + case _ => ZIO.none + } + } + + private def sendNack(nackIds: NonEmptyChunk[String], subscriptionId: String) = + p.Subscriptions + .modifyAckDeadline( + projectsId = projectId, + subscriptionsId = subscriptionId, + request = s.ModifyAckDeadlineRequest(nackIds, ackDeadlineSeconds = 0), + ) + .send(backend) + .uninterruptible + .as(None) + .catchAllCause(c => ackQueue.offerAll(nackIds.map((_, false))).as(Some(c))) + + private def sendAck(ackIds: NonEmptyChunk[String], subscriptionId: String) = + p.Subscriptions + .acknowledge( + projectsId = projectId, + subscriptionsId = subscriptionId, + request = s.AcknowledgeRequest(ackIds), + ) + .send(backend) + .uninterruptible + .as(None) + .catchAllCause(c => ackQueue.offerAll(ackIds.map((_, true))).as(Some(c))) + + private[http] def pull( + subscriptionName: String, + returnImmediately: Option[Boolean] = None, + ): ZIO[Any, Throwable, Chunk[(ReceivedMessage[Array[Byte]], AckReply)]] = + p.Subscriptions + .pull( + projectsId = projectId, + subscriptionsId = subscriptionName, + request = s.PullRequest(maxMessages = maxMessagesPerPull, returnImmediately), + ) + .send(backend) + .flatMap { res => + res.body match { + case Left(value) => ZIO.fail(new Throwable(value)) + case Right(value) => + ZIO.succeed( + value.receivedMessages + .getOrElse(Chunk.empty) + .collect { + case s.ReceivedMessage( + Some(ackId), + Some(PubsubMessage(data, attrs, Some(mId), Some(ts), orderingKey)), + deliveryAttempt, + ) => + ( + ReceivedMessage( + meta = ReceivedMessage.Metadata( + messageId = MessageId(mId), + ackId = AckId(ackId), + publishTime = ts.toInstant(), + orderingKey = orderingKey.flatMap(OrderingKey.fromString(_)), + attributes = attrs.getOrElse(Map.empty), + deliveryAttempt = deliveryAttempt.getOrElse(0), + ), + data = data match { + case None => Array.empty[Byte] + case Some(value) => Base64.getDecoder().decode(value) + }, + ), + new AckReply { + override def ack(): UIO[Unit] = + ackQueue.offer((ackId, true)).unit + override def nack(): UIO[Unit] = + ackQueue.offer((ackId, false)).unit + }, + ) + } + ) + } + } + + override def subscribeRaw(subscriptionName: String): ZStream[Any, Throwable, RawReceipt] = { + val pullStream = ZStream.repeatZIOChunk(pull(subscriptionName)) + + val ackStream: ZStream[Any, Throwable, Unit] = ZStream + .unfoldZIO(())(_ => + processAckQueue(Some(1024), subscriptionName).flatMap { + case None => ZIO.some(((), ())) + case Some(c) => ZIO.failCause(c) + } + ) + + pullStream.drainFork(ackStream).onError(_ => processAckQueue(None, subscriptionName)).retry(retrySchedule) + } +} + +object HttpSubscriber { + def make[R, E]( + connection: PubsubConnectionConfig, + backend: Backend[Task], + tokenProvider: TokenProvider[Token], + maxMessagesPerPull: Int = 100, + retrySchedule: Schedule[Any, Throwable, ?] = Schedule.recurs(5), + ): ZIO[Scope, Nothing, HttpSubscriber] = + for { + ackQueue <- ZIO.acquireRelease(Queue.unbounded[(String, Boolean)])(_.shutdown) + } yield connection match { + case PubsubConnectionConfig.Cloud(project) => + new HttpSubscriber( + projectId = project.name, + backend = AuthedBackend(tokenProvider, backend), + maxMessagesPerPull = maxMessagesPerPull, + ackQueue = ackQueue, + retrySchedule = retrySchedule, + ) + case config @ PubsubConnectionConfig.Emulator(project, _, _) => + new HttpSubscriber( + projectId = project.name, + backend = EmulatorBackend(backend, config), + maxMessagesPerPull = maxMessagesPerPull, + ackQueue = ackQueue, + retrySchedule = retrySchedule, + ) + } +} diff --git a/zio-pubsub-test/shared/src/test/scala/com/anymindgroup/pubsub/http/PubAndSubSpec.scala b/zio-pubsub-test/shared/src/test/scala/com/anymindgroup/pubsub/http/PubAndSubSpec.scala new file mode 100644 index 0000000..1de4802 --- /dev/null +++ b/zio-pubsub-test/shared/src/test/scala/com/anymindgroup/pubsub/http/PubAndSubSpec.scala @@ -0,0 +1,64 @@ +package com.anymindgroup.pubsub.http + +import scala.util.Random + +import com.anymindgroup.gcp.auth.TokenProvider +import com.anymindgroup.http.httpBackendScoped +import com.anymindgroup.pubsub.PubsubTestSupport.* +import com.anymindgroup.pubsub.model.PubsubConnectionConfig.GcpProject +import com.anymindgroup.pubsub.model.{SubscriptionName, TopicName} +import com.anymindgroup.pubsub.pub.PublishMessage +import com.anymindgroup.pubsub.{Publisher, PubsubConnectionConfig, Serde} +import sttp.client4.Backend + +import zio.test.* +import zio.test.Assertion.* +import zio.{Chunk, Task, ZIO, ZLayer} + +object PubAndSubSpec extends ZIOSpecDefault { + val connection = PubsubConnectionConfig.Emulator(GcpProject("any"), "localhost", 8085) + val testTopic = TopicName("any", s"topic_${Random.alphanumeric.take(10).mkString}") + val testSub = SubscriptionName("any", s"sub_${Random.alphanumeric.take(10).mkString}") + + val pubLayer: ZLayer[Any, Throwable, Publisher[Any, String]] = ZLayer.scoped { + for { + backend <- httpBackendScoped() + publisher = HttpPublisher.make[Any, String]( + connection = connection, + topic = testTopic.topic, + serializer = Serde.utf8String, + backend = backend, + tokenProvider = TokenProvider.noTokenProvider, + ) + } yield publisher + } + + val subLayer: ZLayer[Any, Throwable, HttpSubscriber] = ZLayer.scoped { + for { + backend <- httpBackendScoped() + subscriber <- HttpSubscriber.make[Any, String]( + connection = connection, + backend = backend, + tokenProvider = TokenProvider.noTokenProvider, + ) + } yield subscriber + } + + override def spec: Spec[Any, Any] = suite("PubAndSubSpec")( + test("Publish and consume") { + for { + _ <- createTopicWithSubscription(testTopic, testSub).provide(emulatorBackendLayer(connection)) + p <- ZIO.service[Publisher[Any, String]] + s <- ZIO.service[HttpSubscriber] + samples = Chunk.fromIterable((1 to 200).map(i => s"message $i")) + _ <- ZIO.foreachParDiscard(samples)(sample => p.publish(PublishMessage(sample, None, Map.empty))) + collected <- s.subscribe(testSub.subscription, Serde.utf8String) + .take(samples.length.toLong) + .mapZIO { case (m, ack) => ack.ack().as(m) } + .runCollect + _ <- assert(samples)(hasSameElements(collected.map(_.data))) + _ <- assertZIO(s.pull(testSub.subscription, Some(true)))(isEmpty) // all messages were acknowlegded + } yield assertCompletes + } + ).provide(pubLayer ++ subLayer) +} diff --git a/zio-pubsub-testkit/shared/src/main/scala/com/anymindgroup/pubsub/PubsubTestSupport.scala b/zio-pubsub-testkit/shared/src/main/scala/com/anymindgroup/pubsub/PubsubTestSupport.scala new file mode 100644 index 0000000..46a4cc9 --- /dev/null +++ b/zio-pubsub-testkit/shared/src/main/scala/com/anymindgroup/pubsub/PubsubTestSupport.scala @@ -0,0 +1,165 @@ +package com.anymindgroup.pubsub + +import java.util.Base64 + +import com.anymindgroup.gcp.pubsub.v1.resources.projects as p +import com.anymindgroup.gcp.pubsub.v1.schemas as s +import com.anymindgroup.http.httpBackendLayer +import com.anymindgroup.pubsub.http.EmulatorBackend +import com.anymindgroup.pubsub.model.* +import com.anymindgroup.pubsub.model.PubsubConnectionConfig.GcpProject +import com.anymindgroup.pubsub.sub.* +import sttp.client4.{Backend, GenericBackend} + +import zio.test.Gen +import zio.{Chunk, RIO, Task, ZIO, ZLayer, durationInt} + +object PubsubTestSupport { + def emulatorConnectionConfig( + project: GcpProject = sys.env.get("PUBSUB_EMULATOR_GCP_PROJECT").map(GcpProject(_)).getOrElse(GcpProject("any")), + host: String = sys.env.get("PUBSUB_EMULATOR_HOST").getOrElse("localhost"), + port: Int = sys.env.get("PUBSUB_EMULATOR_PORT").flatMap(_.toIntOption).getOrElse(8085), + ): PubsubConnectionConfig.Emulator = + PubsubConnectionConfig.Emulator(project, host, port) + + def emulatorConnectionConfigLayer( + config: PubsubConnectionConfig.Emulator = emulatorConnectionConfig() + ): ZLayer[Any, Nothing, PubsubConnectionConfig.Emulator & GcpProject] = + ZLayer.succeed(config) ++ ZLayer.succeed(config.project) + + def emulatorBackendLayer: ZLayer[PubsubConnectionConfig.Emulator, Throwable, Backend[Task]] = + httpBackendLayer() >>> ZLayer.fromFunction(EmulatorBackend(_, _)) + + def emulatorBackendLayer( + config: PubsubConnectionConfig.Emulator + ): ZLayer[Any, Throwable, Backend[Task]] = + ZLayer.succeed(config) >>> emulatorBackendLayer + + def createTopicWithSubscription( + topicName: TopicName, + subscriptionName: SubscriptionName, + ): RIO[GenericBackend[Task, Any], Unit] = + createTopic(topicName) *> createSubscription(topicName, subscriptionName) + + def createSubscription( + topicName: TopicName, + subscriptionName: SubscriptionName, + ): RIO[GenericBackend[Task, Any], Unit] = + ZIO + .serviceWithZIO[GenericBackend[Task, Any]]( + _.send( + p.Subscriptions.create( + projectsId = topicName.projectId, + subscriptionsId = subscriptionName.subscription, + request = s.Subscription( + name = subscriptionName.path, + topic = topicName.path, + ), + ) + ).flatMap { res => + if (res.isSuccess) ZIO.unit else ZIO.fail(new Throwable(s"Failed to create subscription $res")) + } + ) + .unit + + def createTopic(topicName: TopicName): RIO[GenericBackend[Task, Any], Unit] = + ZIO + .serviceWithZIO[GenericBackend[Task, Any]]( + _.send( + p.Topics.create( + projectsId = topicName.projectId, + topicsId = topicName.topic, + request = s.Topic(name = topicName.path), + ) + ).flatMap { res => + if (res.isSuccess) ZIO.unit + else ZIO.fail(new Throwable(s"Failed to create topic $res")) + } + ) + .unit + + def topicExists(topicName: TopicName): RIO[GenericBackend[Task, Any], Boolean] = for { + topicAdmin <- ZIO.service[GenericBackend[Task, Any]] + res <- topicAdmin.send(p.Topics.get(projectsId = topicName.projectId, topicsId = topicName.topic)) + } yield res.body.isRight + + def publishEvent[E]( + event: E, + topicName: TopicName, + encode: E => Array[Byte] = (e: E) => e.toString.getBytes, + ): RIO[GenericBackend[Task, Any], Seq[String]] = + publishEvents(Seq(event), topicName, encode) + + def publishEvents[E]( + events: Seq[E], + topicName: TopicName, + encode: E => Array[Byte] = (e: E) => e.toString.getBytes, + ): RIO[GenericBackend[Task, Any], Seq[String]] = + ZIO.serviceWithZIO[GenericBackend[Task, Any]]( + _.send( + p.Topics + .publish( + projectsId = topicName.projectId, + topicsId = topicName.topic, + request = s.PublishRequest( + Chunk.fromIterable( + events + .map(encode) + .map(Base64.getEncoder.encodeToString) + .map(data => s.PubsubMessage(data = Some(data))) + ) + ), + ) + ).flatMap { res => + res.body match { + case Left(value) => ZIO.fail(new Throwable(value)) + case Right(value) => ZIO.succeed(value.messageIds.toList.flatten) + } + } + ) + + val topicNameGen: Gen[PubsubConnectionConfig.Emulator, TopicName] = for { + connection <- Gen.fromZIO(ZIO.service[PubsubConnectionConfig.Emulator]) + topic <- Gen.alphaNumericStringBounded(10, 10).map("topic_" + _) + } yield TopicName(projectId = connection.project.name, topic = topic) + + val subscriptionNameGen: Gen[PubsubConnectionConfig.Emulator, SubscriptionName] = for { + connection <- Gen.fromZIO(ZIO.service[PubsubConnectionConfig.Emulator]) + subscriptionId <- Gen.alphaNumericStringBounded(10, 10).map("sub_" + _) + } yield SubscriptionName(connection.project.name, subscriptionId) + + val topicWithSubscriptionGen: Gen[PubsubConnectionConfig.Emulator, (TopicName, SubscriptionName)] = for { + topicName <- topicNameGen + subscriptionName <- subscriptionNameGen + } yield (topicName, subscriptionName) + + def someTopicWithSubscriptionName: ZIO[PubsubConnectionConfig.Emulator, Nothing, (TopicName, SubscriptionName)] = + topicWithSubscriptionGen.runHead.map(_.get) + + def findSubscription( + subscription: SubscriptionName + ): RIO[GenericBackend[Task, Any], Option[s.Subscription]] = for { + client <- ZIO.service[GenericBackend[Task, Any]] + result <- + client.send(p.Subscriptions.get(projectsId = subscription.projectId, subscriptionsId = subscription.subscription)) + } yield result.body.toOption + + val encodingGen: Gen[Any, Encoding] = Gen.fromIterable(List(Encoding.Binary, Encoding.Json)) + + def subscriptionsConfigsGen(topicName: String): Gen[PubsubConnectionConfig.Emulator, Subscription] = (for { + filter <- Gen + .option( + Gen.mapOf(Gen.alphaNumericString, Gen.alphaNumericString).map(SubscriberFilter.matchingAttributes) + ) + enableOrdering <- Gen.boolean + expiration <- Gen.option(Gen.finiteDuration(24.hours, 30.days)) + name <- subscriptionNameGen + } yield Subscription( + topicName = topicName, + name = name.subscription, + filter = filter, + enableOrdering = enableOrdering, + expiration = expiration, + deadLettersSettings = None, + )) +} diff --git a/zio-pubsub-testkit/zio-gc-pubsub-testkit/src/main/scala/com/anymindgroup/pubsub/google/PubsubTestSupport.scala b/zio-pubsub-testkit/zio-gc-pubsub-testkit/src/main/scala/com/anymindgroup/pubsub/google/PubsubTestSupport.scala deleted file mode 100644 index b2624b4..0000000 --- a/zio-pubsub-testkit/zio-gc-pubsub-testkit/src/main/scala/com/anymindgroup/pubsub/google/PubsubTestSupport.scala +++ /dev/null @@ -1,175 +0,0 @@ -package com.anymindgroup.pubsub.google - -import com.anymindgroup.pubsub.google -import com.anymindgroup.pubsub.google.PubsubConnectionConfig.GcpProject -import com.anymindgroup.pubsub.model.* -import com.anymindgroup.pubsub.sub.* -import com.google.api.gax.rpc.NotFoundException -import com.google.cloud.pubsub.v1.{Publisher as GPublisher, SubscriptionAdminClient, TopicAdminClient} -import com.google.protobuf.ByteString -import com.google.pubsub.v1.{PubsubMessage, Subscription as GSubscription, SubscriptionName, TopicName} - -import zio.stream.ZStream -import zio.test.Gen -import zio.{Duration, RIO, RLayer, Scope, Task, ZIO, ZLayer, durationInt} - -object PubsubTestSupport { - def emulatorConnectionConfig( - project: GcpProject = sys.env.get("PUBSUB_EMULATOR_GCP_PROJECT").map(GcpProject(_)).getOrElse(GcpProject("any")), - host: String = sys.env.get("PUBSUB_EMULATOR_HOST").getOrElse("localhost:8085"), - ): PubsubConnectionConfig.Emulator = - PubsubConnectionConfig.Emulator(project, host) - - def emulatorConnectionConfigLayer( - config: PubsubConnectionConfig.Emulator = emulatorConnectionConfig() - ): ZLayer[Any, Nothing, PubsubConnectionConfig.Emulator & GcpProject] = - ZLayer.succeed(config) ++ ZLayer.succeed(config.project) - - val topicAdminClientLayer: RLayer[Scope & PubsubConnectionConfig.Emulator, TopicAdminClient] = - ZLayer.fromZIO { - for { - config <- ZIO.service[PubsubConnectionConfig.Emulator] - adminClient <- TopicAdmin.makeClient(config) - } yield adminClient - } - - val subscriptionAdminClientLayer: RLayer[Scope & PubsubConnectionConfig.Emulator, SubscriptionAdminClient] = - ZLayer.fromZIO { - for { - config <- ZIO.service[PubsubConnectionConfig.Emulator] - adminClient <- SubscriptionAdmin.makeClient(config) - } yield adminClient - } - - val adminLayer: RLayer[Scope & PubsubConnectionConfig.Emulator, TopicAdminClient & SubscriptionAdminClient] = - topicAdminClientLayer ++ subscriptionAdminClientLayer - - def createTopicWithSubscription( - topicName: TopicName, - subscriptionName: SubscriptionName, - ): RIO[SubscriptionAdminClient & TopicAdminClient, Unit] = - createTopic(topicName) *> createSubscription(topicName, subscriptionName) - - def createSubscription( - topicName: TopicName, - subscriptionName: SubscriptionName, - ): RIO[SubscriptionAdminClient, Unit] = for { - subAdminClient <- ZIO.service[SubscriptionAdminClient] - subscription = GSubscription - .newBuilder() - .setTopic(topicName.toString) - .setName(subscriptionName.toString) - .build() - _ <- ZIO.attempt(subAdminClient.createSubscription(subscription)) - } yield () - - def createTopic(topicName: TopicName): RIO[TopicAdminClient, Unit] = for { - topicAdminClient <- ZIO.service[TopicAdminClient] - _ <- ZIO.attempt(topicAdminClient.createTopic(topicName)) - } yield () - - def topicExists(topicName: TopicName): RIO[TopicAdminClient, Boolean] = for { - topicAdmin <- ZIO.service[TopicAdminClient] - res <- ZIO.attempt(topicAdmin.getTopic(topicName)).as(true).catchSome { - case _: com.google.api.gax.rpc.NotFoundException => ZIO.succeed(false) - } - } yield res - - def publishEvent[E]( - event: E, - publisher: GPublisher, - encode: E => Array[Byte] = (e: E) => e.toString.getBytes, - ): Task[Seq[String]] = - publishEvents(Seq(event), publisher, encode) - - def publishEvents[E]( - events: Seq[E], - publisher: GPublisher, - encode: E => Array[Byte] = (e: E) => e.toString.getBytes, - ): Task[Seq[String]] = { - val messages = events.map { data => - val dataArr = encode(data) - PubsubMessage.newBuilder - .setData(ByteString.copyFrom(dataArr)) - .build - } - ZIO.foreach(messages)(e => ZIO.fromFutureJava(publisher.publish(e))) - } - - def publishBatches[E]( - publisher: GPublisher, - amount: Int, - event: Int => E, - ): ZStream[Any, Throwable, String] = - ZStream - .fromIterable((0 until amount).map(event)) - .mapZIO { e => - publishEvent(e, publisher) - } - .flatMap(ZStream.fromIterable(_)) - - val topicNameGen: Gen[PubsubConnectionConfig.Emulator, TopicName] = for { - connection <- Gen.fromZIO(ZIO.service[PubsubConnectionConfig.Emulator]) - topic <- Gen.alphaNumericStringBounded(10, 10).map("topic_" + _) - } yield TopicName.of(connection.project.name, topic) - - val subscriptionNameGen: Gen[PubsubConnectionConfig.Emulator, SubscriptionName] = for { - connection <- Gen.fromZIO(ZIO.service[PubsubConnectionConfig.Emulator]) - subscriptionId <- Gen.alphaNumericStringBounded(10, 10).map("sub_" + _) - } yield SubscriptionName.of(connection.project.name, subscriptionId) - - val topicWithSubscriptionGen: Gen[PubsubConnectionConfig.Emulator, (TopicName, SubscriptionName)] = for { - topicName <- topicNameGen - subscriptionName <- subscriptionNameGen - } yield (topicName, subscriptionName) - - def someTopicWithSubscriptionName: ZIO[PubsubConnectionConfig.Emulator, Nothing, (TopicName, SubscriptionName)] = - topicWithSubscriptionGen.runHead.map(_.get) - - def createSomeSubscriptionRawStream( - topicName: String, - enableOrdering: Boolean = false, - ): RIO[Scope & PubsubConnectionConfig.Emulator, ZStream[Any, Throwable, ReceivedMessage.Raw]] = - for { - connection <- ZIO.service[PubsubConnectionConfig.Emulator] - randomSubName <- Gen.alphaNumericChar.runCollectN(10).map(_.mkString) - stream <- google.Subscriber - .makeTempRawStreamingPullSubscription( - connection = connection, - topicName = topicName, - subscriptionName = s"test_${randomSubName}", - subscriptionFilter = None, - maxTtl = Duration.Infinity, - enableOrdering = enableOrdering, - ) - } yield stream.via(Pipeline.autoAckPipeline) - - def findSubscription( - subscriptionName: String - ): RIO[GcpProject & SubscriptionAdminClient, Option[GSubscription]] = for { - client <- ZIO.service[SubscriptionAdminClient] - subscriptionId <- ZIO.serviceWith[GcpProject](g => SubscriptionName.of(g.name, subscriptionName)) - result <- ZIO.attempt(client.getSubscription(subscriptionId)).map(Some(_)).catchSome { case _: NotFoundException => - ZIO.none - } - } yield result - - val encodingGen: Gen[Any, Encoding] = Gen.fromIterable(List(Encoding.Binary, Encoding.Json)) - - def subscriptionsConfigsGen(topicName: String): Gen[PubsubConnectionConfig.Emulator, Subscription] = (for { - filter <- Gen - .option( - Gen.mapOf(Gen.alphaNumericString, Gen.alphaNumericString).map(SubscriberFilter.matchingAttributes) - ) - enableOrdering <- Gen.boolean - expiration <- Gen.option(Gen.finiteDuration(24.hours, 30.days)) - name <- subscriptionNameGen - } yield Subscription( - topicName = topicName, - name = name.getSubscription(), - filter = filter, - enableOrdering = enableOrdering, - expiration = expiration, - deadLettersSettings = None, - )) -} diff --git a/zio-pubsub/shared/src/main/scala/com/anymindgroup/pubsub/model/PubsubConnectionConfig.scala b/zio-pubsub/shared/src/main/scala/com/anymindgroup/pubsub/model/PubsubConnectionConfig.scala new file mode 100644 index 0000000..b01455e --- /dev/null +++ b/zio-pubsub/shared/src/main/scala/com/anymindgroup/pubsub/model/PubsubConnectionConfig.scala @@ -0,0 +1,14 @@ +package com.anymindgroup.pubsub.model + +sealed trait PubsubConnectionConfig { + def project: PubsubConnectionConfig.GcpProject +} + +object PubsubConnectionConfig { + final case class Cloud(project: GcpProject) extends PubsubConnectionConfig + final case class Emulator(project: GcpProject, host: String, port: Int = 8085) extends PubsubConnectionConfig + + final case class GcpProject(name: String) extends AnyVal { + override def toString: String = name + } +} diff --git a/zio-pubsub/shared/src/main/scala/com/anymindgroup/pubsub/model/SubscriptionName.scala b/zio-pubsub/shared/src/main/scala/com/anymindgroup/pubsub/model/SubscriptionName.scala new file mode 100644 index 0000000..b33660e --- /dev/null +++ b/zio-pubsub/shared/src/main/scala/com/anymindgroup/pubsub/model/SubscriptionName.scala @@ -0,0 +1,6 @@ +package com.anymindgroup.pubsub.model + +final case class SubscriptionName(projectId: String, subscription: String) { + def path: String = s"projects/$projectId/subscriptions/$subscription" + override def toString: String = path +} diff --git a/zio-pubsub/shared/src/main/scala/com/anymindgroup/pubsub/model/TopicName.scala b/zio-pubsub/shared/src/main/scala/com/anymindgroup/pubsub/model/TopicName.scala new file mode 100644 index 0000000..8e96e6e --- /dev/null +++ b/zio-pubsub/shared/src/main/scala/com/anymindgroup/pubsub/model/TopicName.scala @@ -0,0 +1,6 @@ +package com.anymindgroup.pubsub.model + +final case class TopicName(projectId: String, topic: String) { + def path: String = s"projects/$projectId/topics/$topic" + override def toString: String = path +} diff --git a/zio-pubsub/shared/src/main/scala/com/anymindgroup/pubsub/package.scala b/zio-pubsub/shared/src/main/scala/com/anymindgroup/pubsub/package.scala index 10a829e..b9105d5 100644 --- a/zio-pubsub/shared/src/main/scala/com/anymindgroup/pubsub/package.scala +++ b/zio-pubsub/shared/src/main/scala/com/anymindgroup/pubsub/package.scala @@ -40,4 +40,7 @@ package object pubsub { type DeadLettersSettings = sub.DeadLettersSettings val DeadLettersSettings = sub.DeadLettersSettings + + type PubsubConnectionConfig = model.PubsubConnectionConfig + val PubsubConnectionConfig = model.PubsubConnectionConfig } diff --git a/zio-pubsub/shared/src/main/scala/com/anymindgroup/pubsub/pub/Publisher.scala b/zio-pubsub/shared/src/main/scala/com/anymindgroup/pubsub/pub/Publisher.scala index e6b0f89..c41d189 100644 --- a/zio-pubsub/shared/src/main/scala/com/anymindgroup/pubsub/pub/Publisher.scala +++ b/zio-pubsub/shared/src/main/scala/com/anymindgroup/pubsub/pub/Publisher.scala @@ -2,10 +2,12 @@ package com.anymindgroup.pubsub.pub import com.anymindgroup.pubsub.model.MessageId -import zio.{RIO, Tag, ZIO} +import zio.{NonEmptyChunk, RIO, Tag, ZIO} trait Publisher[R, E] { def publish(message: PublishMessage[E]): RIO[R, MessageId] + + def publish(message: NonEmptyChunk[PublishMessage[E]]): RIO[R, NonEmptyChunk[MessageId]] } object Publisher {