diff --git a/kafka-hbase-pipeline/src/main/scala/io/gearpump/examples/kafka_hbase_pipeline/PipeLine.scala b/kafka-hbase-pipeline/src/main/scala/io/gearpump/examples/kafka_hbase_pipeline/PipeLine.scala index ced06d0..daef567 100644 --- a/kafka-hbase-pipeline/src/main/scala/io/gearpump/examples/kafka_hbase_pipeline/PipeLine.scala +++ b/kafka-hbase-pipeline/src/main/scala/io/gearpump/examples/kafka_hbase_pipeline/PipeLine.scala @@ -64,7 +64,7 @@ object PipeLine extends AkkaApp with ArgumentsParser { | } |} """.stripMargin - val pipelineConfig = PipeLineConfig(ConfigFactory.parseFile(new java.io.File(pipelineString))) + val pipelineConfig = PipeLineConfig(ConfigFactory.parseString(pipelineString)) val processors = config.getInt("processors") val persistors = config.getInt("persistors") val topic = config.getString("topic") diff --git a/project/Build.scala b/project/Build.scala index 219a8d5..8a4d6f8 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -24,6 +24,7 @@ object Build extends sbt.Build { val clouderaVersion = "2.6.0-cdh5.4.2" val clouderaHBaseVersion = "1.0.0-cdh5.4.2" val gearpumpVersion = "0.6.2-SNAPSHOT" + val gearpumpTapVersion = "0.0.1-SNAPSHOT" val junitVersion = "4.12" val kafkaVersion = "0.8.2.1" val mockitoVersion = "1.10.17" @@ -46,7 +47,8 @@ object Build extends sbt.Build { "sonatype" at "https://oss.sonatype.org/content/repositories/releases", "bintray/non" at "http://dl.bintray.com/non/maven", "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos", - "clockfly" at "http://dl.bintray.com/clockfly/maven" + "clockfly" at "http://dl.bintray.com/clockfly/maven", + "local maven" at "file://"+Path.userHome.absolutePath+"/.m2/repository" ), addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0-M5" cross CrossVersion.full) ) ++ @@ -131,7 +133,7 @@ object Build extends sbt.Build { new File(packagePath).renameTo(new File(target)) } ) - ).aggregate(kafka_hdfs_pipeline, kafka_hbase_pipeline) + ).aggregate(kafka_hdfs_pipeline, kafka_hbase_pipeline, tap_pipeline) lazy val kafka_hdfs_pipeline = Project( id = "gearpump-kafka-hdfs-pipeline", @@ -266,4 +268,77 @@ object Build extends sbt.Build { ) ) + lazy val tap_pipeline = Project( + id = "gearpump-tap-pipeline", + base = file("tap-pipeline"), + settings = commonSettings ++ myAssemblySettings ++ + Seq( + mergeStrategy in assembly := { + case PathList("META-INF", "maven","org.slf4j","slf4j-api", ps) if ps.startsWith("pom") => MergeStrategy.discard + case x => + val oldStrategy = (mergeStrategy in assembly).value + oldStrategy(x) + }, + libraryDependencies ++= Seq( + "io.spray" %% "spray-can" % sprayVersion, + "com.lihaoyi" %% "upickle" % upickleVersion, + "com.github.intel-hadoop" %% "gearpump-core" % gearpumpVersion % "provided" + exclude("org.fusesource.leveldbjni", "leveldbjni-all") + exclude("org.apache.htrace", "htrace-core"), + "com.github.intel-hadoop" %% "gearpump-core" % gearpumpVersion % "test" classifier "tests", + "com.github.intel-hadoop" %% "gearpump-streaming" % gearpumpVersion % "provided" + exclude("org.fusesource.leveldbjni", "leveldbjni-all") + exclude("org.apache.htrace", "htrace-core"), + "com.github.intel-hadoop" %% "gearpump-streaming" % gearpumpVersion % "test" classifier "tests", + "com.github.intel-hadoop" %% "gearpump-external-kafka" % gearpumpVersion + exclude("org.fusesource.leveldbjni", "leveldbjni-all") + exclude("org.apache.htrace", "htrace-core"), + "org.trustedanalytics.gearpump" % "config-tools" % gearpumpTapVersion + exclude("org.fusesource.leveldbjni", "leveldbjni-all") + exclude("org.apache.htrace", "htrace-core") + exclude("commons-beanutils", "commons-beanutils-core") + exclude("commons-beanutils", "commons-beanutils"), + "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.2", + "com.github.intel-hadoop" %% "gearpump-external-hbase" % gearpumpVersion + exclude("org.fusesource.leveldbjni", "leveldbjni-all") + exclude("org.apache.htrace", "htrace-core"), + "com.github.intel-hadoop" %% "gearpump-external-hbase" % gearpumpVersion % "test" classifier "tests", + "com.julianpeeters" % "avro-scala-macro-annotations_2.11" % "0.9.0", + "org.apache.hadoop" % "hadoop-hdfs" % clouderaVersion + exclude("org.fusesource.leveldbjni", "leveldbjni-all") + exclude("org.mortbay.jetty", "jetty-util") + exclude("org.mortbay.jetty", "jetty") + exclude("org.apache.htrace", "htrace-core") + exclude("tomcat", "jasper-runtime"), + "org.apache.hadoop" % "hadoop-yarn-api" % clouderaVersion + exclude("org.fusesource.leveldbjni", "leveldbjni-all") + exclude("com.google.guava", "guava") + exclude("com.google.protobuf", "protobuf-java") + exclude("commons-lang", "commons-lang") + exclude("org.apache.htrace", "htrace-core") + exclude("commons-logging", "commons-logging") + exclude("org.apache.hadoop", "hadoop-annotations"), + "org.apache.hadoop" % "hadoop-yarn-client" % clouderaVersion + exclude("org.fusesource.leveldbjni", "leveldbjni-all") + exclude("com.google.guava", "guava") + exclude("com.sun.jersey", "jersey-client") + exclude("commons-cli", "commons-cli") + exclude("commons-lang", "commons-lang") + exclude("commons-logging", "commons-logging") + exclude("org.apache.htrace", "htrace-core") + exclude("log4j", "log4j") + exclude("org.apache.hadoop", "hadoop-annotations") + exclude("org.mortbay.jetty", "jetty-util") + exclude("org.apache.hadoop", "hadoop-yarn-api") + exclude("org.apache.hadoop", "hadoop-yarn-common"), + "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test", + "org.scalatest" %% "scalatest" % scalaTestVersion % "test", + "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test", + "org.mockito" % "mockito-core" % mockitoVersion % "test", + "junit" % "junit" % junitVersion % "test" + ) ++ hadoopDependency, + mainClass in (Compile, packageBin) := Some("io.gearpump.examples.tap_pipeline.PipeLine"), + target in assembly := baseDirectory.value.getParentFile / "target" / scalaVersionMajor + ) + ) } diff --git a/tap-pipeline/src/main/scala/io/gearpump/examples/tap_pipeline/PipeLine.scala b/tap-pipeline/src/main/scala/io/gearpump/examples/tap_pipeline/PipeLine.scala new file mode 100644 index 0000000..74610db --- /dev/null +++ b/tap-pipeline/src/main/scala/io/gearpump/examples/tap_pipeline/PipeLine.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.gearpump.examples.tap_pipeline + +import akka.actor.ActorSystem +import com.typesafe.config.{ConfigFactory, ConfigRenderOptions} +import io.gearpump.cluster.UserConfig +import io.gearpump.cluster.client.ClientContext +import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} +import io.gearpump.external.hbase.HBaseSink +import io.gearpump.streaming.StreamApplication +import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory} +import io.gearpump.streaming.sink.DataSinkProcessor +import io.gearpump.streaming.source.DataSourceProcessor +import io.gearpump.tap.TapJsonConfig +import io.gearpump.util.Graph._ +import io.gearpump.util.{AkkaApp, Graph, LogUtil} +import org.slf4j.Logger + +object PipeLine extends AkkaApp with ArgumentsParser { + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + "hbase"-> CLIOption[String]("", required = false, defaultValue = Some("hbase")), + "kafka"-> CLIOption[String]("", required = false, defaultValue = Some("kafka")), + "table"-> CLIOption[String]("", required = false, defaultValue = Some("gp_tap_table")), + "topic"-> CLIOption[String]("", required = false, defaultValue = Some("gp_tap_topic")) + ) + + def application(config: ParseResult, system: ActorSystem): StreamApplication = { + implicit val actorSystem = system + + val conf = ConfigFactory.load + val services = conf.root.withOnlyKey("VCAP_SERVICES").render(ConfigRenderOptions.defaults().setJson(true)) + val tjc = new TapJsonConfig(services) + val hbaseconfig = tjc.getHBase(config.getString("hbase")) + //val kafkaconfig = tjc.getKafka(config.getString("hbase")) + val kafkaconfig = Map( + "zookeepers" -> "10.10.10.46:9092,10.10.10.164:9092,10.10.10.236:9092", + "brokers" -> "10.10.10.46:2181,10.10.10.236:2181,10.10.10.164:2181/kafka" + ) + val topic = config.getString("topic") + val table = config.getString("table") + val zookeepers = kafkaconfig.get("zookeepers").get + val brokers = kafkaconfig.get("brokers").get + val source = DataSourceProcessor(new KafkaSource(topic, zookeepers,new KafkaStorageFactory(zookeepers, brokers)), 1) + val sink = DataSinkProcessor(new HBaseSink(table, hbaseconfig), 1) + val app = StreamApplication("TAPPipeline", Graph( + source ~> sink + ), UserConfig.empty) + app + } + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + val context = ClientContext(akkaConf) + val appId = context.submit(application(config, context.system)) + context.close() + } + +}