diff --git a/docs/build.sbt b/docs/build.sbt index eda51be..add621d 100644 --- a/docs/build.sbt +++ b/docs/build.sbt @@ -47,22 +47,22 @@ HttpCsvToKafka / paradoxProperties ++= Map( "canonical.base_url" -> s"${homepage.value.get}/${HttpCsvToKafka.name}", "snip.build.base_dir" -> s"${baseDirectory.value}/../pekko-connectors-sample-${HttpCsvToKafka.name}", "github.root.base_dir" -> s"${baseDirectory.value}/..", - // Alpakka - "scaladoc.akka.stream.alpakka.base_url" -> s"https://doc.akka.io/api/alpakka/${Dependencies.HttpCsvToKafka.AlpakkaVersion}", - "javadoc.akka.base_url" -> "", - "extref.alpakka.base_url" -> s"https://doc.akka.io/docs/alpakka/${Dependencies.HttpCsvToKafka.AlpakkaVersion}/%s", + // Pekko Connectors + "scaladoc.pekko.connectors.base_url" -> s"https://pekko.apache.org/api/pekko-connectors/${Dependencies.HttpCsvToKafka.PekkoConnectorsVersion}/org/apache", // TODO: TBC + "javadoc.pekko.connectors.base_url" -> s"https://pekko.apache.org/japi/pekko-connectors/${Dependencies.HttpCsvToKafka.PekkoConnectorsVersion}/org/apache", // TODO: TBC + "extref.pekko-connectors.base_url" -> s"https://pekko.apache.org/docs/pekko-connectors/${Dependencies.HttpCsvToKafka.PekkoConnectorsVersion}/%s", // Alpakka Kafka - "scaladoc.akka.kafka.base_url" -> s"https://doc.akka.io/api/alpakka-kafka/${Dependencies.HttpCsvToKafka.AlpakkaKafkaVersion}", - "javadoc.akka.kafka.base_url" -> "", - "extref.alpakka-kafka.base_url" -> s"https://doc.akka.io/docs/alpakka-kafka/${Dependencies.HttpCsvToKafka.AlpakkaKafkaVersion}/%s", - // Akka - "scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/${Dependencies.HttpCsvToKafka.AkkaVersion}", - "javadoc.akka.base_url" -> s"https://doc.akka.io/japi/akka/${Dependencies.HttpCsvToKafka.AkkaVersion}", - "extref.akka.base_url" -> s"https://doc.akka.io/docs/akka/${Dependencies.HttpCsvToKafka.AkkaVersion}/%s", - // Akka HTTP - "scaladoc.akka.http.base_url" -> s"https://doc.akka.io/api/akka-http/${Dependencies.HttpCsvToKafka.AkkaHttpVersion}", - 
"javadoc.akka.http.base_url" -> s"https://doc.akka.io/japi/akka-http/${Dependencies.HttpCsvToKafka.AkkaHttpVersion}", - "extref.akka-http.base_url" -> s"https://doc.akka.io/docs/akka-http/${Dependencies.HttpCsvToKafka.AkkaHttpVersion}/%s", + "scaladoc.pekko.kafka.base_url" -> s"https://pekko.apache.org/api/pekko-connectors-kafka/${Dependencies.HttpCsvToKafka.PekkoConnectorsKafkaVersion}/org/apache", // TODO: TBC + "javadoc.pekko.kafka.base_url" -> s"https://pekko.apache.org/japi/pekko-connectors-kafka/${Dependencies.HttpCsvToKafka.PekkoConnectorsKafkaVersion}/org/apache", // TODO: TBC + "extref.pekko-connectors-kafka.base_url" -> s"https://pekko.apache.org/docs/pekko-connectors-kafka/${Dependencies.HttpCsvToKafka.PekkoConnectorsKafkaVersion}/%s", + // Pekko + "scaladoc.pekko.base_url" -> s"https://pekko.apache.org/api/pekko/${Dependencies.HttpCsvToKafka.PekkoVersion}/org/apache", + "javadoc.pekko.base_url" -> s"https://pekko.apache.org/japi/pekko/${Dependencies.HttpCsvToKafka.PekkoVersion}/org/apache", + "extref.pekko.base_url" -> s"https://pekko.apache.org/docs/pekko/${Dependencies.HttpCsvToKafka.PekkoVersion}/%s", + // Pekko HTTP + "scaladoc.pekko.http.base_url" -> s"https://pekko.apache.org/api/pekko-http/${Dependencies.HttpCsvToKafka.PekkoHttpVersion}/org/apache", // TODO: TBC + "javadoc.pekko.http.base_url" -> s"https://pekko.apache.org/japi/pekko-http/${Dependencies.HttpCsvToKafka.PekkoHttpVersion}/org/apache", // TODO: TBC + "extref.pekko-http.base_url" -> s"https://pekko.apache.org/docs/pekko-http/${Dependencies.HttpCsvToKafka.PekkoHttpVersion}/%s", ) HttpCsvToKafka / paradoxGroups := Map("Language" -> Seq("Java", "Scala")) diff --git a/docs/project/Dependencies.scala b/docs/project/Dependencies.scala index e7c3293..562cd3b 100644 --- a/docs/project/Dependencies.scala +++ b/docs/project/Dependencies.scala @@ -28,10 +28,10 @@ object Dependencies { val ScalaVersion = versions("scalaVer") val ScalaTestVersion = versions("ScalaTestVersion") - val AkkaVersion = 
versions("AkkaVersion") - val AkkaHttpVersion = versions("AkkaHttpVersion") - val AlpakkaVersion = versions("AlpakkaVersion") - val AlpakkaKafkaVersion = versions("AlpakkaKafkaVersion") + val PekkoVersion = versions("PekkoVersion") + val PekkoHttpVersion = versions("PekkoHttpVersion") + val PekkoConnectorsVersion = versions("PekkoConnectorsVersion") + val PekkoConnectorsKafkaVersion = versions("PekkoConnectorsKafkaVersion") } object JdbcToElasticsearch { diff --git a/pekko-connectors-sample-http-csv-to-kafka/.courseName b/pekko-connectors-sample-http-csv-to-kafka/.courseName index dcfce32..8df0cac 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/.courseName +++ b/pekko-connectors-sample-http-csv-to-kafka/.courseName @@ -1 +1 @@ -Alpakka: HTTP CSV to Kafka +Pekko Connectors: HTTP CSV to Kafka diff --git a/pekko-connectors-sample-http-csv-to-kafka/README.md b/pekko-connectors-sample-http-csv-to-kafka/README.md index 013ccb4..4c418b2 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/README.md +++ b/pekko-connectors-sample-http-csv-to-kafka/README.md @@ -1,9 +1,9 @@ -# Alpakka sample +# Pekko Connectors sample -## Fetch CSV via Akka HTTP and publish the data as JSON to Kafka +## Fetch CSV via Pekko HTTP and publish the data as JSON to Kafka -This example uses @extref[Akka HTTP to send the HTTP request](akka-http:client-side/connection-level.html#opening-http-connections) and Akka HTTPs primary JSON support via @extref[Spray JSON](akka-http:common/json-support.html#spray-json-support) (for Scala) or Jackson JSON (for Java) to convert the map into a JSON structure which gets published to a Kafka topic. 
+This example uses @extref[Pekko HTTP to send the HTTP request](pekko-http:client-side/connection-level.html#opening-http-connections) and Pekko HTTPs primary JSON support via @extref[Spray JSON](pekko-http:common/json-support.html#spray-json-support) (for Scala) or Jackson JSON (for Java) to convert the map into a JSON structure which gets published to a Kafka topic. -Browse the sources at @link:[Github](https://github.com/akka/alpakka-samples/tree/master/alpakka-sample-http-csv-to-kafka) { open=new }. +Browse the sources at @link:[Github](https://github.com/apache/incubator-pekko-connectors-samples/tree/main/pekko-connectors-sample-http-csv-to-kafka) { open=new }. -To try out this project clone @link:[the Alpakka Samples repository](https://github.com/akka/alpakka-samples) { open=new } and find it in the `alpakka-sample-http-csv-to-kafka` directory. +To try out this project clone @link:[the Pekko Connectors Samples repository](https://github.com/apache/incubator-pekko-connectors-samples) { open=new } and find it in the `pekko-connectors-sample-http-csv-to-kafka` directory. 
diff --git a/pekko-connectors-sample-http-csv-to-kafka/build.sbt b/pekko-connectors-sample-http-csv-to-kafka/build.sbt index 2f21af6..8013348 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/build.sbt +++ b/pekko-connectors-sample-http-csv-to-kafka/build.sbt @@ -1,5 +1,5 @@ -lazy val alpakka_sample_master = project +lazy val pekko_connectors_sample_master = project .in(file(".")) .aggregate( common, diff --git a/pekko-connectors-sample-http-csv-to-kafka/common/src/main/resources/application.conf b/pekko-connectors-sample-http-csv-to-kafka/common/src/main/resources/application.conf index f83f9d0..71a28ae 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/common/src/main/resources/application.conf +++ b/pekko-connectors-sample-http-csv-to-kafka/common/src/main/resources/application.conf @@ -1,6 +1,6 @@ -akka { - loggers = ["akka.event.slf4j.Slf4jLogger"] - logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" +pekko { + loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"] + logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter" loglevel = "DEBUG" } diff --git a/pekko-connectors-sample-http-csv-to-kafka/common/src/main/resources/logback.xml b/pekko-connectors-sample-http-csv-to-kafka/common/src/main/resources/logback.xml index cb745b2..7c4096b 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/common/src/main/resources/logback.xml +++ b/pekko-connectors-sample-http-csv-to-kafka/common/src/main/resources/logback.xml @@ -31,8 +31,7 @@ - - + diff --git a/pekko-connectors-sample-http-csv-to-kafka/docs/src/main/paradox/index.md b/pekko-connectors-sample-http-csv-to-kafka/docs/src/main/paradox/index.md index e074950..705af7a 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/docs/src/main/paradox/index.md +++ b/pekko-connectors-sample-http-csv-to-kafka/docs/src/main/paradox/index.md @@ -5,9 +5,9 @@ Dependencies : @@snip [snip](/project/Dependencies.scala) { #dependencies } -### All Alpakka samples +### All Pekko Connectors samples -Show 
[Alpakka samples listing](../index.html). +Show [Pekko Connectors samples listing](../index.html). @@toc diff --git a/pekko-connectors-sample-http-csv-to-kafka/project/CommonSettings.scala b/pekko-connectors-sample-http-csv-to-kafka/project/CommonSettings.scala index ddcaaf5..26bc2d5 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/project/CommonSettings.scala +++ b/pekko-connectors-sample-http-csv-to-kafka/project/CommonSettings.scala @@ -4,8 +4,8 @@ import sbtstudent.AdditionalSettings object CommonSettings { lazy val commonSettings = Seq( - organization := "com.lightbend.training", - version := "1.3.0", + organization := "org.apache.pekko", + version := "1.0.0", scalaVersion := Dependencies.scalaVer, scalacOptions ++= CompileOptions.compileOptions, Compile / unmanagedSourceDirectories := List((Compile / scalaSource).value, (Compile / javaSource).value), @@ -16,9 +16,13 @@ object CommonSettings { ThisBuild / parallelExecution := false, GlobalScope / parallelExecution := false, Test / fork := true, - libraryDependencies ++= Dependencies.dependencies + libraryDependencies ++= Dependencies.dependencies, + + // #TODO: Remove these lines once Pekko Connectors have 1.0.0 + resolvers += "Apache Snapshots" at "https://repository.apache.org/content/repositories/snapshots/", ) ++ AdditionalSettings.initialCmdsConsole ++ AdditionalSettings.initialCmdsTestConsole ++ AdditionalSettings.cmdAliases } + diff --git a/pekko-connectors-sample-http-csv-to-kafka/project/Dependencies.scala b/pekko-connectors-sample-http-csv-to-kafka/project/Dependencies.scala index fc23139..5eee9c4 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/project/Dependencies.scala +++ b/pekko-connectors-sample-http-csv-to-kafka/project/Dependencies.scala @@ -4,26 +4,26 @@ object Dependencies { val scalaVer = "2.13.7" // #dependencies val ScalaTestVersion = "3.1.4" - val AkkaVersion = "2.6.19" - val AkkaHttpVersion = "10.1.12" - val AlpakkaVersion = "4.0.0" - val AlpakkaKafkaVersion = "3.0.1" + val 
PekkoVersion = "1.0.0" + val PekkoHttpVersion = "0.0.0+4469-fb6a5426-SNAPSHOT" // #TODO: Change to release version + val PekkoConnectorsVersion = "0.0.0+131-79ec6fa6-SNAPSHOT" // #TODO: Change to release version + val PekkoConnectorsKafkaVersion = "0.0.0+1761-2291eac2-SNAPSHOT" // #TODO: Change to release version val dependencies = List( - "com.lightbend.akka" %% "akka-stream-alpakka-csv" % AlpakkaVersion, - "com.typesafe.akka" %% "akka-stream-kafka" % AlpakkaKafkaVersion, - "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion, - "com.typesafe.akka" %% "akka-stream" % AkkaVersion, - "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion, + "org.apache.pekko" %% "pekko-connectors-csv" % PekkoConnectorsVersion, + "org.apache.pekko" %% "pekko-connectors-kafka" % PekkoConnectorsKafkaVersion, + "org.apache.pekko" %% "pekko-actor-typed" % PekkoVersion, + "org.apache.pekko" %% "pekko-stream" % PekkoVersion, + "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion, // Used from Scala - "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion, + "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion, // Used from Java - "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.11.4", - "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.11.4", + "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.14.3", + "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.14.3", - "org.testcontainers" % "kafka" % "1.14.3", + "org.testcontainers" % "kafka" % "1.18.3", - "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion, + "org.apache.pekko" %% "pekko-slf4j" % PekkoVersion, "ch.qos.logback" % "logback-classic" % "1.2.3" ) // #dependencies diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_001_http_request/README.md b/pekko-connectors-sample-http-csv-to-kafka/step_001_http_request/README.md index 2fd10dc..879fe9a 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_001_http_request/README.md +++ 
b/pekko-connectors-sample-http-csv-to-kafka/step_001_http_request/README.md @@ -2,12 +2,12 @@ ### Description -This code uses Akka HTTP to request a file containing listed companies from the NASDAQ web site. +This code uses Pekko HTTP to request a file containing listed companies from the NASDAQ web site. It starts the Actor System, imports the Actor System's dispatcher as `ExecutionContext`, and gets a stream materializer from the Actor System. The HTTP request is created as value (it will be sent multiple times in later steps) and sets a specific HTTP request header. -The request is run in an Akka Stream from the single value, issuing the request by Akka HTTP, and printing out the HTTP response. +The request is run in a Pekko Stream from the single value, issuing the request by Pekko HTTP, and printing out the HTTP response. Once the stream completes, the Actor System is terminated and the program exits. diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_001_http_request/src/main/java/samples/javadsl/Main.java b/pekko-connectors-sample-http-csv-to-kafka/step_001_http_request/src/main/java/samples/javadsl/Main.java index 26a6a47..87ca0f3 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_001_http_request/src/main/java/samples/javadsl/Main.java +++ b/pekko-connectors-sample-http-csv-to-kafka/step_001_http_request/src/main/java/samples/javadsl/Main.java @@ -4,14 +4,14 @@ package samples.javadsl; -import akka.Done; -import akka.actor.ActorSystem; -import akka.http.javadsl.Http; -import akka.http.javadsl.model.HttpRequest; -import akka.http.javadsl.model.MediaRanges; -import akka.http.javadsl.model.headers.Accept; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; +import org.apache.pekko.Done; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.http.javadsl.Http; +import org.apache.pekko.http.javadsl.model.HttpRequest; +import org.apache.pekko.http.javadsl.model.MediaRanges; +import 
org.apache.pekko.http.javadsl.model.headers.Accept; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; import java.util.Collections; import java.util.concurrent.CompletionStage; diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_001_http_request/src/main/scala/samples/Main.scala b/pekko-connectors-sample-http-csv-to-kafka/step_001_http_request/src/main/scala/samples/Main.scala index b2a695e..07de110 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_001_http_request/src/main/scala/samples/Main.scala +++ b/pekko-connectors-sample-http-csv-to-kafka/step_001_http_request/src/main/scala/samples/Main.scala @@ -4,21 +4,21 @@ package samples -import akka.Done -import akka.actor._ -import akka.http.scaladsl._ -import akka.http.scaladsl.model.StatusCodes._ -import akka.http.scaladsl.model.headers.Accept -import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges } -import akka.stream._ -import akka.stream.scaladsl.{ Sink, Source } -import akka.util.ByteString +import org.apache.pekko.Done +import org.apache.pekko.actor._ +import org.apache.pekko.http.scaladsl._ +import org.apache.pekko.http.scaladsl.model.StatusCodes._ +import org.apache.pekko.http.scaladsl.model.headers.Accept +import org.apache.pekko.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges } +import org.apache.pekko.stream._ +import org.apache.pekko.stream.scaladsl.{ Sink, Source } +import org.apache.pekko.util.ByteString import scala.concurrent.Future object Main extends App { - implicit val actorSystem = ActorSystem("alpakka-samples") + implicit val actorSystem = ActorSystem("pekko-connectors-samples") import actorSystem.dispatcher diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_002_extract_http_entity/README.md b/pekko-connectors-sample-http-csv-to-kafka/step_002_extract_http_entity/README.md index ad43dd3..348da08 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_002_extract_http_entity/README.md +++ 
b/pekko-connectors-sample-http-csv-to-kafka/step_002_extract_http_entity/README.md @@ -2,4 +2,4 @@ ### Description -The HTTP response with status OK is expected and the contained HTTP entity is extracted. Instead of the HTTP response, the contained entity (page content) continues in the stream in the form of @scaladoc:[ByteString](akka.util.ByteString) elements. +The HTTP response with status OK is expected and the contained HTTP entity is extracted. Instead of the HTTP response, the contained entity (page content) continues in the stream in the form of @scaladoc:[ByteString](pekko.util.ByteString) elements. diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_002_extract_http_entity/src/main/java/samples/javadsl/Main.java b/pekko-connectors-sample-http-csv-to-kafka/step_002_extract_http_entity/src/main/java/samples/javadsl/Main.java index 7b2f5df..75b8172 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_002_extract_http_entity/src/main/java/samples/javadsl/Main.java +++ b/pekko-connectors-sample-http-csv-to-kafka/step_002_extract_http_entity/src/main/java/samples/javadsl/Main.java @@ -4,23 +4,23 @@ package samples.javadsl; -import akka.Done; -import akka.actor.typed.ActorSystem; -import akka.actor.typed.javadsl.Behaviors; -import akka.http.javadsl.Http; -import akka.http.javadsl.model.HttpRequest; -import akka.http.javadsl.model.HttpResponse; -import akka.http.javadsl.model.MediaRanges; -import akka.http.javadsl.model.StatusCodes; -import akka.http.javadsl.model.headers.Accept; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import akka.util.ByteString; +import org.apache.pekko.Done; +import org.apache.pekko.actor.typed.ActorSystem; +import org.apache.pekko.actor.typed.javadsl.Behaviors; +import org.apache.pekko.http.javadsl.Http; +import org.apache.pekko.http.javadsl.model.HttpRequest; +import org.apache.pekko.http.javadsl.model.HttpResponse; +import org.apache.pekko.http.javadsl.model.MediaRanges;
+import org.apache.pekko.http.javadsl.model.StatusCodes; +import org.apache.pekko.http.javadsl.model.headers.Accept; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; +import org.apache.pekko.util.ByteString; import java.util.Collections; import java.util.concurrent.CompletionStage; -import static akka.actor.typed.javadsl.Adapter.toClassic; +import static org.apache.pekko.actor.typed.javadsl.Adapter.toClassic; public class Main { @@ -43,7 +43,7 @@ public static void main(String[] args) throws Exception { } private void run() throws Exception { - ActorSystem system = ActorSystem.create(Behaviors.empty(), "alpakka-samples"); + ActorSystem system = ActorSystem.create(Behaviors.empty(), "pekko-connectors-samples"); Http http = Http.get(toClassic(system)); CompletionStage completion = diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_002_extract_http_entity/src/main/scala/samples/Main.scala b/pekko-connectors-sample-http-csv-to-kafka/step_002_extract_http_entity/src/main/scala/samples/Main.scala index 1aeb231..a2f059a 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_002_extract_http_entity/src/main/scala/samples/Main.scala +++ b/pekko-connectors-sample-http-csv-to-kafka/step_002_extract_http_entity/src/main/scala/samples/Main.scala @@ -4,22 +4,22 @@ package samples -import akka.Done -import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.adapter._ -import akka.http.scaladsl._ -import akka.http.scaladsl.model.StatusCodes._ -import akka.http.scaladsl.model.headers.Accept -import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges } -import akka.stream.scaladsl.{ Sink, Source } -import akka.util.ByteString +import org.apache.pekko.Done +import org.apache.pekko.actor.typed.ActorSystem +import org.apache.pekko.actor.typed.scaladsl.Behaviors +import org.apache.pekko.actor.typed.scaladsl.adapter._ +import org.apache.pekko.http.scaladsl._ 
+import org.apache.pekko.http.scaladsl.model.StatusCodes._ +import org.apache.pekko.http.scaladsl.model.headers.Accept +import org.apache.pekko.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges } +import org.apache.pekko.stream.scaladsl.{ Sink, Source } +import org.apache.pekko.util.ByteString import scala.concurrent.Future object Main extends App { - implicit val actorSystem: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "alpakka-samples") + implicit val actorSystem: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "pekko-connectors-samples") import actorSystem.executionContext diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/README.md b/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/README.md index ca8e0b6..30933ef 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/README.md +++ b/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/README.md @@ -2,4 +2,4 @@ ### Description -The binary data in @scaladoc:[ByteString](akka.util.ByteString)s is passed into @extref:[Alpakka CSV](alpakka:data-transformations/csv.html) to be parsed and converted per line into a Map. The stream elements becomes a @scala[`Map[String, ByteString]`]@java[`Map`], one entry per column using the column headers as keys. +The binary data in @scaladoc:[ByteString](pekko.util.ByteString)s is passed into @extref:[Pekko Connectors CSV](pekko-connectors:data-transformations/csv.html) to be parsed and converted per line into a Map. The stream elements becomes a @scala[`Map[String, ByteString]`]@java[`Map`], one entry per column using the column headers as keys. 
diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/src/main/java/samples/javadsl/Main.java b/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/src/main/java/samples/javadsl/Main.java index 4df410b..f0f58a5 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/src/main/java/samples/javadsl/Main.java +++ b/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/src/main/java/samples/javadsl/Main.java @@ -4,26 +4,26 @@ package samples.javadsl; -import akka.Done; -import akka.actor.typed.ActorSystem; -import akka.actor.typed.javadsl.Behaviors; -import akka.http.javadsl.Http; -import akka.http.javadsl.model.HttpRequest; -import akka.http.javadsl.model.HttpResponse; -import akka.http.javadsl.model.MediaRanges; -import akka.http.javadsl.model.StatusCodes; -import akka.http.javadsl.model.headers.Accept; -import akka.stream.alpakka.csv.javadsl.CsvParsing; -import akka.stream.alpakka.csv.javadsl.CsvToMap; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import akka.util.ByteString; +import org.apache.pekko.Done; +import org.apache.pekko.actor.typed.ActorSystem; +import org.apache.pekko.actor.typed.javadsl.Behaviors; +import org.apache.pekko.http.javadsl.Http; +import org.apache.pekko.http.javadsl.model.HttpRequest; +import org.apache.pekko.http.javadsl.model.HttpResponse; +import org.apache.pekko.http.javadsl.model.MediaRanges; +import org.apache.pekko.http.javadsl.model.StatusCodes; +import org.apache.pekko.http.javadsl.model.headers.Accept; +import org.apache.pekko.stream.connectors.csv.javadsl.CsvParsing; +import org.apache.pekko.stream.connectors.csv.javadsl.CsvToMap; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; +import org.apache.pekko.util.ByteString; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.concurrent.CompletionStage; -import static akka.actor.typed.javadsl.Adapter.toClassic; +import static 
org.apache.pekko.actor.typed.javadsl.Adapter.toClassic; public class Main { @@ -46,7 +46,7 @@ public static void main(String[] args) throws Exception { } private void run() throws Exception { - ActorSystem system = ActorSystem.create(Behaviors.empty(), "alpakka-samples"); + ActorSystem system = ActorSystem.create(Behaviors.empty(), "pekko-connectors-samples"); Http http = Http.get(toClassic(system)); CompletionStage completion = diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/src/main/scala/samples/Main.scala b/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/src/main/scala/samples/Main.scala index a23e20d..7c1465f 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/src/main/scala/samples/Main.scala +++ b/pekko-connectors-sample-http-csv-to-kafka/step_003_parse_csv/src/main/scala/samples/Main.scala @@ -4,23 +4,23 @@ package samples -import akka.Done -import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.adapter._ -import akka.http.scaladsl._ -import akka.http.scaladsl.model.StatusCodes._ -import akka.http.scaladsl.model.headers.Accept -import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges } -import akka.stream.alpakka.csv.scaladsl.{ CsvParsing, CsvToMap } -import akka.stream.scaladsl.{ Sink, Source } -import akka.util.ByteString +import org.apache.pekko.Done +import org.apache.pekko.actor.typed.ActorSystem +import org.apache.pekko.actor.typed.scaladsl.Behaviors +import org.apache.pekko.actor.typed.scaladsl.adapter._ +import org.apache.pekko.http.scaladsl._ +import org.apache.pekko.http.scaladsl.model.StatusCodes._ +import org.apache.pekko.http.scaladsl.model.headers.Accept +import org.apache.pekko.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges } +import org.apache.pekko.stream.connectors.csv.scaladsl.{ CsvParsing, CsvToMap } +import org.apache.pekko.stream.scaladsl.{ Sink, Source } +import 
org.apache.pekko.util.ByteString import scala.concurrent.Future object Main extends App { - implicit val actorSystem: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "alpakka-samples") + implicit val actorSystem: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "pekko-connectors-samples") import actorSystem.executionContext diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_004_producing_json/src/main/java/samples/javadsl/Main.java b/pekko-connectors-sample-http-csv-to-kafka/step_004_producing_json/src/main/java/samples/javadsl/Main.java index 40bf270..2ee28e9 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_004_producing_json/src/main/java/samples/javadsl/Main.java +++ b/pekko-connectors-sample-http-csv-to-kafka/step_004_producing_json/src/main/java/samples/javadsl/Main.java @@ -4,20 +4,20 @@ package samples.javadsl; -import akka.Done; -import akka.actor.typed.ActorSystem; -import akka.actor.typed.javadsl.Behaviors; -import akka.http.javadsl.Http; -import akka.http.javadsl.model.HttpRequest; -import akka.http.javadsl.model.HttpResponse; -import akka.http.javadsl.model.MediaRanges; -import akka.http.javadsl.model.StatusCodes; -import akka.http.javadsl.model.headers.Accept; -import akka.stream.alpakka.csv.javadsl.CsvParsing; -import akka.stream.alpakka.csv.javadsl.CsvToMap; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import akka.util.ByteString; +import org.apache.pekko.Done; +import org.apache.pekko.actor.typed.ActorSystem; +import org.apache.pekko.actor.typed.javadsl.Behaviors; +import org.apache.pekko.http.javadsl.Http; +import org.apache.pekko.http.javadsl.model.HttpRequest; +import org.apache.pekko.http.javadsl.model.HttpResponse; +import org.apache.pekko.http.javadsl.model.MediaRanges; +import org.apache.pekko.http.javadsl.model.StatusCodes; +import org.apache.pekko.http.javadsl.model.headers.Accept; +import org.apache.pekko.stream.connectors.csv.javadsl.CsvParsing; +import 
org.apache.pekko.stream.connectors.csv.javadsl.CsvToMap; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; +import org.apache.pekko.util.ByteString; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; @@ -28,7 +28,7 @@ import java.util.Map; import java.util.concurrent.CompletionStage; -import static akka.actor.typed.javadsl.Adapter.toClassic; +import static org.apache.pekko.actor.typed.javadsl.Adapter.toClassic; public class Main { @@ -70,7 +70,7 @@ private String toJson(Map map) throws Exception { } private void run() throws Exception { - ActorSystem system = ActorSystem.create(Behaviors.empty(), "alpakka-samples"); + ActorSystem system = ActorSystem.create(Behaviors.empty(), "pekko-connectors-samples"); Http http = Http.get(toClassic(system)); CompletionStage completion = diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_004_producing_json/src/main/scala/samples/Main.scala b/pekko-connectors-sample-http-csv-to-kafka/step_004_producing_json/src/main/scala/samples/Main.scala index 24bfe81..15f5344 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_004_producing_json/src/main/scala/samples/Main.scala +++ b/pekko-connectors-sample-http-csv-to-kafka/step_004_producing_json/src/main/scala/samples/Main.scala @@ -4,17 +4,17 @@ package samples -import akka.Done -import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.adapter._ -import akka.http.scaladsl._ -import akka.http.scaladsl.model.StatusCodes._ -import akka.http.scaladsl.model.headers.Accept -import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges } -import akka.stream.alpakka.csv.scaladsl.{ CsvParsing, CsvToMap } -import akka.stream.scaladsl.{ Sink, Source } -import akka.util.ByteString +import org.apache.pekko.Done +import org.apache.pekko.actor.typed.ActorSystem +import org.apache.pekko.actor.typed.scaladsl.Behaviors +import 
org.apache.pekko.actor.typed.scaladsl.adapter._ +import org.apache.pekko.http.scaladsl._ +import org.apache.pekko.http.scaladsl.model.StatusCodes._ +import org.apache.pekko.http.scaladsl.model.headers.Accept +import org.apache.pekko.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges } +import org.apache.pekko.stream.connectors.csv.scaladsl.{ CsvParsing, CsvToMap } +import org.apache.pekko.stream.scaladsl.{ Sink, Source } +import org.apache.pekko.util.ByteString import spray.json.{ DefaultJsonProtocol, JsValue, JsonWriter } import scala.concurrent.Future @@ -23,7 +23,7 @@ object Main extends App with DefaultJsonProtocol { - implicit val actorSystem: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "alpakka-samples") + implicit val actorSystem: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "pekko-connectors-samples") import actorSystem.executionContext diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_005_cleanse_lines/README.md b/pekko-connectors-sample-http-csv-to-kafka/step_005_cleanse_lines/README.md index 25f58ee..d241feb 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_005_cleanse_lines/README.md +++ b/pekko-connectors-sample-http-csv-to-kafka/step_005_cleanse_lines/README.md @@ -2,4 +2,4 @@ ### Description -The CSV data we receive happens to contain an empty column without a header. The `cleanseCsvData` removes that column and turns the column values from @scaladoc:[ByteString](akka.util.ByteString)s into regular `String`s. +The CSV data we receive happens to contain an empty column without a header. The `cleanseCsvData` removes that column and turns the column values from @scaladoc:[ByteString](pekko.util.ByteString)s into regular `String`s. 
diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_005_cleanse_lines/src/main/java/samples/javadsl/Main.java b/pekko-connectors-sample-http-csv-to-kafka/step_005_cleanse_lines/src/main/java/samples/javadsl/Main.java index 2593daf..db5e066 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_005_cleanse_lines/src/main/java/samples/javadsl/Main.java +++ b/pekko-connectors-sample-http-csv-to-kafka/step_005_cleanse_lines/src/main/java/samples/javadsl/Main.java @@ -4,20 +4,20 @@ package samples.javadsl; -import akka.Done; -import akka.actor.typed.ActorSystem; -import akka.actor.typed.javadsl.Behaviors; -import akka.http.javadsl.Http; -import akka.http.javadsl.model.HttpRequest; -import akka.http.javadsl.model.HttpResponse; -import akka.http.javadsl.model.MediaRanges; -import akka.http.javadsl.model.StatusCodes; -import akka.http.javadsl.model.headers.Accept; -import akka.stream.alpakka.csv.javadsl.CsvParsing; -import akka.stream.alpakka.csv.javadsl.CsvToMap; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import akka.util.ByteString; +import org.apache.pekko.Done; +import org.apache.pekko.actor.typed.ActorSystem; +import org.apache.pekko.actor.typed.javadsl.Behaviors; +import org.apache.pekko.http.javadsl.Http; +import org.apache.pekko.http.javadsl.model.HttpRequest; +import org.apache.pekko.http.javadsl.model.HttpResponse; +import org.apache.pekko.http.javadsl.model.MediaRanges; +import org.apache.pekko.http.javadsl.model.StatusCodes; +import org.apache.pekko.http.javadsl.model.headers.Accept; +import org.apache.pekko.stream.connectors.csv.javadsl.CsvParsing; +import org.apache.pekko.stream.connectors.csv.javadsl.CsvToMap; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; +import org.apache.pekko.util.ByteString; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; @@ -28,7 +28,7 @@ import java.util.Map; import 
java.util.concurrent.CompletionStage; -import static akka.actor.typed.javadsl.Adapter.toClassic; +import static org.apache.pekko.actor.typed.javadsl.Adapter.toClassic; public class Main { @@ -79,7 +79,7 @@ private String toJson(Map map) throws Exception { } private void run() throws Exception { - ActorSystem system = ActorSystem.create(Behaviors.empty(), "alpakka-samples"); + ActorSystem system = ActorSystem.create(Behaviors.empty(), "pekko-connectors-samples"); Http http = Http.get(toClassic(system)); CompletionStage completion = diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_005_cleanse_lines/src/main/scala/samples/Main.scala b/pekko-connectors-sample-http-csv-to-kafka/step_005_cleanse_lines/src/main/scala/samples/Main.scala index 35254af..b8a6ab5 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_005_cleanse_lines/src/main/scala/samples/Main.scala +++ b/pekko-connectors-sample-http-csv-to-kafka/step_005_cleanse_lines/src/main/scala/samples/Main.scala @@ -4,17 +4,17 @@ package samples -import akka.Done -import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.adapter._ -import akka.http.scaladsl._ -import akka.http.scaladsl.model.StatusCodes._ -import akka.http.scaladsl.model.headers.Accept -import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges } -import akka.stream.alpakka.csv.scaladsl.{ CsvParsing, CsvToMap } -import akka.stream.scaladsl.{ Sink, Source } -import akka.util.ByteString +import org.apache.pekko.Done +import org.apache.pekko.actor.typed.ActorSystem +import org.apache.pekko.actor.typed.scaladsl.Behaviors +import org.apache.pekko.actor.typed.scaladsl.adapter._ +import org.apache.pekko.http.scaladsl._ +import org.apache.pekko.http.scaladsl.model.StatusCodes._ +import org.apache.pekko.http.scaladsl.model.headers.Accept +import org.apache.pekko.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges } +import 
org.apache.pekko.stream.connectors.csv.scaladsl.{ CsvParsing, CsvToMap } +import org.apache.pekko.stream.scaladsl.{ Sink, Source } +import org.apache.pekko.util.ByteString import spray.json.{ DefaultJsonProtocol, JsValue, JsonWriter } import scala.concurrent.Future @@ -23,7 +23,7 @@ object Main extends App with DefaultJsonProtocol { - implicit val actorSystem: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "alpakka-samples") + implicit val actorSystem: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "pekko-connectors-samples") import actorSystem.executionContext diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_006_coordinated_shutdown/README.md b/pekko-connectors-sample-http-csv-to-kafka/step_006_coordinated_shutdown/README.md index f80b86f..4cbef6c 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_006_coordinated_shutdown/README.md +++ b/pekko-connectors-sample-http-csv-to-kafka/step_006_coordinated_shutdown/README.md @@ -2,4 +2,4 @@ ### Description -To make sure Akka HTTP is shut down in a proper way it is added to Akka's @extref:[Coordinated shutdown](akka:actors.html#coordinated-shutdown). +To make sure Pekko HTTP is shut down in a proper way it is added to Pekko's @extref:[Coordinated shutdown](pekko:actors.html#coordinated-shutdown). 
diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_006_coordinated_shutdown/src/main/java/samples/javadsl/Main.java b/pekko-connectors-sample-http-csv-to-kafka/step_006_coordinated_shutdown/src/main/java/samples/javadsl/Main.java index 8a16ad9..ba46e46 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_006_coordinated_shutdown/src/main/java/samples/javadsl/Main.java +++ b/pekko-connectors-sample-http-csv-to-kafka/step_006_coordinated_shutdown/src/main/java/samples/javadsl/Main.java @@ -4,21 +4,21 @@ package samples.javadsl; -import akka.Done; -import akka.actor.typed.ActorSystem; -import akka.actor.typed.javadsl.Behaviors; -import akka.actor.CoordinatedShutdown; -import akka.http.javadsl.Http; -import akka.http.javadsl.model.HttpRequest; -import akka.http.javadsl.model.HttpResponse; -import akka.http.javadsl.model.MediaRanges; -import akka.http.javadsl.model.StatusCodes; -import akka.http.javadsl.model.headers.Accept; -import akka.stream.alpakka.csv.javadsl.CsvParsing; -import akka.stream.alpakka.csv.javadsl.CsvToMap; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import akka.util.ByteString; +import org.apache.pekko.Done; +import org.apache.pekko.actor.typed.ActorSystem; +import org.apache.pekko.actor.typed.javadsl.Behaviors; +import org.apache.pekko.actor.CoordinatedShutdown; +import org.apache.pekko.http.javadsl.Http; +import org.apache.pekko.http.javadsl.model.HttpRequest; +import org.apache.pekko.http.javadsl.model.HttpResponse; +import org.apache.pekko.http.javadsl.model.MediaRanges; +import org.apache.pekko.http.javadsl.model.StatusCodes; +import org.apache.pekko.http.javadsl.model.headers.Accept; +import org.apache.pekko.stream.connectors.csv.javadsl.CsvParsing; +import org.apache.pekko.stream.connectors.csv.javadsl.CsvToMap; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; +import org.apache.pekko.util.ByteString; import com.fasterxml.jackson.core.JsonFactory; import 
com.fasterxml.jackson.core.JsonGenerator; @@ -29,7 +29,7 @@ import java.util.Map; import java.util.concurrent.CompletionStage; -import static akka.actor.typed.javadsl.Adapter.toClassic; +import static org.apache.pekko.actor.typed.javadsl.Adapter.toClassic; public class Main { @@ -80,7 +80,7 @@ private String toJson(Map map) throws Exception { } private void run() throws Exception { - ActorSystem system = ActorSystem.create(Behaviors.empty(), "alpakka-samples"); + ActorSystem system = ActorSystem.create(Behaviors.empty(), "pekko-connectors-samples"); Http http = Http.get(toClassic(system)); CompletionStage completion = diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_006_coordinated_shutdown/src/main/scala/samples/Main.scala b/pekko-connectors-sample-http-csv-to-kafka/step_006_coordinated_shutdown/src/main/scala/samples/Main.scala index 13c3d44..70c01ff 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_006_coordinated_shutdown/src/main/scala/samples/Main.scala +++ b/pekko-connectors-sample-http-csv-to-kafka/step_006_coordinated_shutdown/src/main/scala/samples/Main.scala @@ -4,18 +4,18 @@ package samples -import akka.Done -import akka.actor.CoordinatedShutdown -import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.adapter._ -import akka.http.scaladsl._ -import akka.http.scaladsl.model.StatusCodes._ -import akka.http.scaladsl.model.headers.Accept -import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges } -import akka.stream.alpakka.csv.scaladsl.{ CsvParsing, CsvToMap } -import akka.stream.scaladsl.{ Sink, Source } -import akka.util.ByteString +import org.apache.pekko.Done +import org.apache.pekko.actor.CoordinatedShutdown +import org.apache.pekko.actor.typed.ActorSystem +import org.apache.pekko.actor.typed.scaladsl.Behaviors +import org.apache.pekko.actor.typed.scaladsl.adapter._ +import org.apache.pekko.http.scaladsl._ +import 
org.apache.pekko.http.scaladsl.model.StatusCodes._ +import org.apache.pekko.http.scaladsl.model.headers.Accept +import org.apache.pekko.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges } +import org.apache.pekko.stream.connectors.csv.scaladsl.{ CsvParsing, CsvToMap } +import org.apache.pekko.stream.scaladsl.{ Sink, Source } +import org.apache.pekko.util.ByteString import spray.json.{ DefaultJsonProtocol, JsValue, JsonWriter } import scala.concurrent.Future @@ -24,7 +24,7 @@ object Main extends App with DefaultJsonProtocol { - implicit val actorSystem: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "alpakka-samples") + implicit val actorSystem: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "pekko-connectors-samples") import actorSystem.executionContext diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/README.md b/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/README.md index e6b6510..25245fc 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/README.md +++ b/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/README.md @@ -4,6 +4,6 @@ [Testcontainers](https://www.testcontainers.org/) starts a Kafka broker in Docker. -@extref:[Alpakka Kafka](alpakka-kafka:producer.html) producer settings specify the broker address and the data types for Kafka's key and value. +@extref:[Pekko Connectors Kafka](pekko-connectors-kafka:producer.html) producer settings specify the broker address and the data types for Kafka's key and value. -@scaladoc:[Producer.plainSink](akka.kafka.scaladsl.Producer$) sends the `ProducerRecord`s stream elements to the specified Kafka topic. +@scaladoc:[Producer.plainSink](pekko.kafka.scaladsl.Producer$) sends the `ProducerRecord`s stream elements to the specified Kafka topic. 
diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/src/main/java/samples/javadsl/Main.java b/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/src/main/java/samples/javadsl/Main.java index 0960ea5..9e04675 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/src/main/java/samples/javadsl/Main.java +++ b/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/src/main/java/samples/javadsl/Main.java @@ -4,27 +4,27 @@ package samples.javadsl; -import akka.Done; -import akka.actor.typed.ActorSystem; -import akka.actor.typed.javadsl.Behaviors; -import akka.actor.CoordinatedShutdown; -import akka.http.javadsl.Http; -import akka.http.javadsl.model.HttpRequest; -import akka.http.javadsl.model.HttpResponse; -import akka.http.javadsl.model.MediaRanges; -import akka.http.javadsl.model.StatusCodes; -import akka.http.javadsl.model.headers.Accept; -import akka.kafka.ConsumerSettings; -import akka.kafka.ProducerSettings; -import akka.kafka.Subscriptions; -import akka.kafka.javadsl.Consumer; -import akka.kafka.javadsl.Producer; -import akka.stream.alpakka.csv.javadsl.CsvParsing; -import akka.stream.alpakka.csv.javadsl.CsvToMap; -import akka.stream.javadsl.Keep; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import akka.util.ByteString; +import org.apache.pekko.Done; +import org.apache.pekko.actor.typed.ActorSystem; +import org.apache.pekko.actor.typed.javadsl.Behaviors; +import org.apache.pekko.actor.CoordinatedShutdown; +import org.apache.pekko.http.javadsl.Http; +import org.apache.pekko.http.javadsl.model.HttpRequest; +import org.apache.pekko.http.javadsl.model.HttpResponse; +import org.apache.pekko.http.javadsl.model.MediaRanges; +import org.apache.pekko.http.javadsl.model.StatusCodes; +import org.apache.pekko.http.javadsl.model.headers.Accept; +import org.apache.pekko.kafka.ConsumerSettings; +import org.apache.pekko.kafka.ProducerSettings; +import 
org.apache.pekko.kafka.Subscriptions; +import org.apache.pekko.kafka.javadsl.Consumer; +import org.apache.pekko.kafka.javadsl.Producer; +import org.apache.pekko.stream.connectors.csv.javadsl.CsvParsing; +import org.apache.pekko.stream.connectors.csv.javadsl.CsvToMap; +import org.apache.pekko.stream.javadsl.Keep; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; +import org.apache.pekko.util.ByteString; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -41,7 +41,7 @@ import java.util.Map; import java.util.concurrent.CompletionStage; -import static akka.actor.typed.javadsl.Adapter.toClassic; +import static org.apache.pekko.actor.typed.javadsl.Adapter.toClassic; public class Main { @@ -96,7 +96,7 @@ private void run() throws Exception { kafkaBroker.start(); final String bootstrapServers = kafkaBroker.getBootstrapServers(); - ActorSystem system = ActorSystem.create(Behaviors.empty(), "alpakka-samples"); + ActorSystem system = ActorSystem.create(Behaviors.empty(), "pekko-connectors-samples"); Http http = Http.get(toClassic(system)); ProducerSettings kafkaProducerSettings = diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/src/main/scala/samples/Main.scala b/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/src/main/scala/samples/Main.scala index b3f6684..a356806 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/src/main/scala/samples/Main.scala +++ b/pekko-connectors-sample-http-csv-to-kafka/step_007_produce_to_kafka/src/main/scala/samples/Main.scala @@ -4,20 +4,20 @@ package samples -import akka.Done -import akka.actor.CoordinatedShutdown -import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.adapter._ -import akka.http.scaladsl._ -import akka.http.scaladsl.model.StatusCodes._ -import 
akka.http.scaladsl.model.headers.Accept -import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges } -import akka.kafka.scaladsl.{ Consumer, Producer } -import akka.kafka.{ ConsumerSettings, ProducerSettings, Subscriptions } -import akka.stream.alpakka.csv.scaladsl.{ CsvParsing, CsvToMap } -import akka.stream.scaladsl.{ Keep, Sink, Source } -import akka.util.ByteString +import org.apache.pekko.Done +import org.apache.pekko.actor.CoordinatedShutdown +import org.apache.pekko.actor.typed.ActorSystem +import org.apache.pekko.actor.typed.scaladsl.Behaviors +import org.apache.pekko.actor.typed.scaladsl.adapter._ +import org.apache.pekko.http.scaladsl._ +import org.apache.pekko.http.scaladsl.model.StatusCodes._ +import org.apache.pekko.http.scaladsl.model.headers.Accept +import org.apache.pekko.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges } +import org.apache.pekko.kafka.scaladsl.{ Consumer, Producer } +import org.apache.pekko.kafka.{ ConsumerSettings, ProducerSettings, Subscriptions } +import org.apache.pekko.stream.connectors.csv.scaladsl.{ CsvParsing, CsvToMap } +import org.apache.pekko.stream.scaladsl.{ Keep, Sink, Source } +import org.apache.pekko.util.ByteString import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.serialization.{ StringDeserializer, StringSerializer } @@ -30,7 +30,7 @@ object Main extends App with DefaultJsonProtocol { - implicit val actorSystem: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "alpakka-samples") + implicit val actorSystem: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "pekko-connectors-samples") import actorSystem.executionContext diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_008_scheduled_download/src/main/java/samples/javadsl/Main.java b/pekko-connectors-sample-http-csv-to-kafka/step_008_scheduled_download/src/main/java/samples/javadsl/Main.java index 73bc515..230c375 
100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_008_scheduled_download/src/main/java/samples/javadsl/Main.java +++ b/pekko-connectors-sample-http-csv-to-kafka/step_008_scheduled_download/src/main/java/samples/javadsl/Main.java @@ -4,29 +4,29 @@ package samples.javadsl; -import akka.Done; -import akka.actor.Cancellable; -import akka.actor.CoordinatedShutdown; -import akka.actor.typed.ActorSystem; -import akka.actor.typed.javadsl.Behaviors; -import akka.http.javadsl.Http; -import akka.http.javadsl.model.HttpRequest; -import akka.http.javadsl.model.HttpResponse; -import akka.http.javadsl.model.MediaRanges; -import akka.http.javadsl.model.StatusCodes; -import akka.http.javadsl.model.headers.Accept; -import akka.japi.Pair; -import akka.kafka.ConsumerSettings; -import akka.kafka.ProducerSettings; -import akka.kafka.Subscriptions; -import akka.kafka.javadsl.Consumer; -import akka.kafka.javadsl.Producer; -import akka.stream.alpakka.csv.javadsl.CsvParsing; -import akka.stream.alpakka.csv.javadsl.CsvToMap; -import akka.stream.javadsl.Keep; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import akka.util.ByteString; +import org.apache.pekko.Done; +import org.apache.pekko.actor.Cancellable; +import org.apache.pekko.actor.CoordinatedShutdown; +import org.apache.pekko.actor.typed.ActorSystem; +import org.apache.pekko.actor.typed.javadsl.Behaviors; +import org.apache.pekko.http.javadsl.Http; +import org.apache.pekko.http.javadsl.model.HttpRequest; +import org.apache.pekko.http.javadsl.model.HttpResponse; +import org.apache.pekko.http.javadsl.model.MediaRanges; +import org.apache.pekko.http.javadsl.model.StatusCodes; +import org.apache.pekko.http.javadsl.model.headers.Accept; +import org.apache.pekko.japi.Pair; +import org.apache.pekko.kafka.ConsumerSettings; +import org.apache.pekko.kafka.ProducerSettings; +import org.apache.pekko.kafka.Subscriptions; +import org.apache.pekko.kafka.javadsl.Consumer; +import 
org.apache.pekko.kafka.javadsl.Producer; +import org.apache.pekko.stream.connectors.csv.javadsl.CsvParsing; +import org.apache.pekko.stream.connectors.csv.javadsl.CsvToMap; +import org.apache.pekko.stream.javadsl.Keep; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; +import org.apache.pekko.util.ByteString; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -45,7 +45,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; -import static akka.actor.typed.javadsl.Adapter.toClassic; +import static org.apache.pekko.actor.typed.javadsl.Adapter.toClassic; public class Main { @@ -100,7 +100,7 @@ private void run() throws Exception { kafkaBroker.start(); final String bootstrapServers = kafkaBroker.getBootstrapServers(); - ActorSystem system = ActorSystem.create(Behaviors.empty(), "alpakka-samples"); + ActorSystem system = ActorSystem.create(Behaviors.empty(), "pekko-connectors-samples"); Http http = Http.get(toClassic(system)); ProducerSettings kafkaProducerSettings = diff --git a/pekko-connectors-sample-http-csv-to-kafka/step_008_scheduled_download/src/main/scala/samples/Main.scala b/pekko-connectors-sample-http-csv-to-kafka/step_008_scheduled_download/src/main/scala/samples/Main.scala index d148ad3..d76af96 100644 --- a/pekko-connectors-sample-http-csv-to-kafka/step_008_scheduled_download/src/main/scala/samples/Main.scala +++ b/pekko-connectors-sample-http-csv-to-kafka/step_008_scheduled_download/src/main/scala/samples/Main.scala @@ -6,20 +6,20 @@ package samples import java.util.concurrent.TimeUnit -import akka.Done -import akka.actor.{ CoordinatedShutdown, Cancellable } -import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.adapter._ -import akka.http.scaladsl._ -import akka.http.scaladsl.model.StatusCodes._ -import 
akka.http.scaladsl.model.headers.Accept -import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges } -import akka.kafka.scaladsl.{ Consumer, Producer } -import akka.kafka.{ ConsumerSettings, ProducerSettings, Subscriptions } -import akka.stream.alpakka.csv.scaladsl.{ CsvParsing, CsvToMap } -import akka.stream.scaladsl.{ Keep, Sink, Source } -import akka.util.ByteString +import org.apache.pekko.Done +import org.apache.pekko.actor.{ CoordinatedShutdown, Cancellable } +import org.apache.pekko.actor.typed.ActorSystem +import org.apache.pekko.actor.typed.scaladsl.Behaviors +import org.apache.pekko.actor.typed.scaladsl.adapter._ +import org.apache.pekko.http.scaladsl._ +import org.apache.pekko.http.scaladsl.model.StatusCodes._ +import org.apache.pekko.http.scaladsl.model.headers.Accept +import org.apache.pekko.http.scaladsl.model.{ HttpRequest, HttpResponse, MediaRanges } +import org.apache.pekko.kafka.scaladsl.{ Consumer, Producer } +import org.apache.pekko.kafka.{ ConsumerSettings, ProducerSettings, Subscriptions } +import org.apache.pekko.stream.connectors.csv.scaladsl.{ CsvParsing, CsvToMap } +import org.apache.pekko.stream.scaladsl.{ Keep, Sink, Source } +import org.apache.pekko.util.ByteString import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.serialization.{ StringDeserializer, StringSerializer } @@ -33,7 +33,7 @@ object Main extends App with DefaultJsonProtocol { - implicit val actorSystem: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "alpakka-samples") + implicit val actorSystem: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "pekko-connectors-samples") import actorSystem.executionContext