From 88645b93364f1b2467a26e08343c5649c3cb847d Mon Sep 17 00:00:00 2001 From: suaaa7 Date: Sun, 22 Dec 2019 14:04:31 +0900 Subject: [PATCH 1/5] Add SparkMLLrBatch --- .gitignore | 21 +++++ .jvmopts | 5 + .scalafmt.conf | 11 +++ Makefile | 18 ++++ .../scala/spark/ml/lr/SparkMLLrBatch.scala | 20 ++++ build.sbt | 92 +++++++++++++++++++ project/assembly.sbt | 1 + project/build.properties | 1 + project/plugins.sbt | 10 ++ 9 files changed, 179 insertions(+) create mode 100644 .jvmopts create mode 100644 .scalafmt.conf create mode 100644 Makefile create mode 100644 batch/src/main/scala/spark/ml/lr/SparkMLLrBatch.scala create mode 100644 build.sbt create mode 100644 project/assembly.sbt create mode 100644 project/build.properties create mode 100644 project/plugins.sbt diff --git a/.gitignore b/.gitignore index 9c07d4a..5ba381f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,23 @@ *.class *.log + +# sbt specific +.cache +.history +.lib +dist/* +target/ +lib_managed/ +src_managed/ +project/boot/ +project/plugins/project/ + +# Scala-IDE specific +.scala_dependencies +.worksheet + +# Docker cache +.cache +.ivy2 +.sbt +.bash_history diff --git a/.jvmopts b/.jvmopts new file mode 100644 index 0000000..ec0ad45 --- /dev/null +++ b/.jvmopts @@ -0,0 +1,5 @@ +-Xms1024m +-Xmx2048m +-XX:ReservedCodeCacheSize=128m +-XX:MaxMetaspaceSize=256m +-Xss2m diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000..779c7ce --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1,11 @@ +style: defaultWithAlign +maxColumn: 80 +docstrings: JavaDoc +align: most +rewrite { + rules: [AvoidInfix, PreferCurlyFors, RedundantBraces, RedundantParens, SortImports] + redundantBraces.maxLines: 10 +} +align.openParenCallSite: false +align.openParenDefnSite: false +danglingParentheses: true diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..e762138 --- /dev/null +++ b/Makefile @@ -0,0 +1,18 @@ +.PHONY: dbuild +dbuild: + docker build \ + --build-arg BASE_IMAGE_TAG="8u212-b04-jdk-stretch" \ + --build-arg SBT_VERSION="1.3.5" \ + --build-arg SCALA_VERSION="2.13.1" \ + --build-arg USER_ID=1001 \ + --build-arg GROUP_ID=1001 \ + -t hseeberger/scala-sbt \ + github.com/hseeberger/scala-sbt.git#:debian + +.PHONY: dbash +dbash: + docker run --rm -it -v `pwd`:/root hseeberger/scala-sbt bash + +.PHONY: dassembly +dassembly: + docker run --rm -it -v `pwd`:/root hseeberger/scala-sbt sbt assembly diff --git a/batch/src/main/scala/spark/ml/lr/SparkMLLrBatch.scala b/batch/src/main/scala/spark/ml/lr/SparkMLLrBatch.scala new file mode 100644 index 0000000..b1e0fe9 --- /dev/null +++ b/batch/src/main/scala/spark/ml/lr/SparkMLLrBatch.scala @@ -0,0 +1,20 @@ +package spark.ml.lr + +import com.twitter.app.App +import com.twitter.logging.Logging +import org.apache.spark.sql.SparkSession + +object SparkMLLrBatch extends App with Logging { + def main(): Unit = { + val spark = SparkSession + .builder() + .appName("SparkMLLrBatch") + .getOrCreate() + + log.info("Batch Started") + + log.info("Batch Completed") + + spark.stop() + } +} diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..ffa79c2 --- /dev/null +++ b/build.sbt @@ -0,0 +1,92 @@ +lazy val root = project + .in(file(("."))) + .settings(commonSettings) + .aggregate(core, api, batch) + +lazy val core = project + .in(file("core")) + .settings(commonSettings) + .settings(coreSettings) + +lazy val api = project + .in(file("api")) + .dependsOn(core % "compile->compile; test->test") + .settings(apiSettings) + .settings(commonSettings) + +lazy val batch = project + 
.in(file("batch")) + .dependsOn(core % "compile->compile; test->test") + .settings(batchSettings) + .settings(commonSettings) + +lazy val commonSettings = Seq( + scalaVersion := "2.11.12", + scalacOptions ++= Seq( + "-deprecation", + "-encoding", "utf-8", + "-explaintypes", + "-feature", + "-language:higherKinds", + ), + scalacOptions in (Compile, console) := (scalacOptions in (Compile, console)).value, + scalacOptions in (Test, console) := (scalacOptions in (Compile, console)).value, + + resolvers += Resolver.sonatypeRepo("releases"), + + test in assembly := {}, + + assemblyMergeStrategy in assembly := { + case PathList(ps @ _*) if ps.last.endsWith(".properties") => MergeStrategy.first + case PathList(ps @ _*) if ps.last.endsWith(".class") => MergeStrategy.first + case PathList(ps @ _*) if ps.last == "BUILD" => MergeStrategy.discard + case ".gitkeep" => MergeStrategy.first + case x => + val oldStrategy = (assemblyMergeStrategy in assembly).value + oldStrategy(x) + } +) + +lazy val coreSettings = coreLibraryDependencies ++ coreTestingLibraryDependencies + +lazy val coreLibraryDependencies = Seq( + libraryDependencies ++= Seq( + "com.github.pureconfig" %% "pureconfig" % "0.12.1", + "com.twitter" %% "util-app" % "19.12.0", + "com.twitter" %% "util-logging" % "19.12.0", + "eu.timepit" %% "refined-pureconfig" % "0.9.10", + "net.jafama" % "jafama" % "2.3.1", + ) +) + +lazy val coreTestingLibraryDependencies = Seq( + libraryDependencies ++= Seq( + "org.scalacheck" %% "scalacheck" % "1.14.2" % "test", + "org.scalatest" %% "scalatest" % "3.0.8" % "test", + "org.mockito" % "mockito-core" % "3.1.0" % "test", + ) +) + +lazy val apiSettings = apiLibraryDependencies + +lazy val apiLibraryDependencies = Seq() + +lazy val batchSettings = batchLibraryDependencies + +lazy val sparkVersion = "2.4.4" + +lazy val batchLibraryDependencies = Seq( + libraryDependencies ++= Seq( + "com.treasuredata.client" % "td-client" % "0.9.0", + "org.apache.hadoop" % "hadoop-aws" % "3.2.1" % "provided", + "org.apache.spark" %% "spark-core" % sparkVersion % "provided", + "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided", + "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", + ) +) + +addCommandAlias("fmt", ";scalafmt ;test:scalafmt") + +scalafmtVersion in ThisBuild := "1.5.1" +scalafmtTestOnCompile in ThisBuild := true +scalafmtShowDiff in (ThisBuild, scalafmt) := true diff --git a/project/assembly.sbt b/project/assembly.sbt new file mode 100644 index 0000000..15a88b0 --- /dev/null +++ b/project/assembly.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5") diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..6624da7 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.3.5 diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..72d2a7f --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1,10 @@ +logLevel := Level.Warn + +resolvers += "twitter-repo" at "https://maven.twttr.com" + +addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.12") +addSbtPlugin("com.lucidchart" % "sbt-scalafmt" % "1.16") +addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.5.0") +addSbtPlugin("com.twitter" % "scrooge-sbt-plugin" % "19.10.0") +addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.4.1") +addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.3.7") From c278aac65ac1424db066f3c5fbcc1f88f8e8aaa6 Mon Sep 17 00:00:00 2001 From: suaaa7 Date: Mon, 23 Dec 2019 01:42:41 +0900 Subject: [PATCH 2/5] 
Add Config & Update Batch --- batch/src/main/resources/application.conf | 7 ++ batch/src/main/resources/dev.conf | 7 ++ .../src/main/resources/log4j-spark.properties | 43 +++++++++++ batch/src/main/scala/spark/ml/Config.scala | 42 +++++++++++ .../scala/spark/ml/entity/Environment.scala | 19 +++++ .../scala/spark/ml/lr/SparkMLLrBatch.scala | 73 +++++++++++++++++++ batch/src/main/scala/spark/ml/package.scala | 5 ++ data/train_data_v1.csv | 30 ++++++++ 8 files changed, 226 insertions(+) create mode 100644 batch/src/main/resources/application.conf create mode 100644 batch/src/main/resources/dev.conf create mode 100644 batch/src/main/resources/log4j-spark.properties create mode 100644 batch/src/main/scala/spark/ml/Config.scala create mode 100644 batch/src/main/scala/spark/ml/entity/Environment.scala create mode 100644 batch/src/main/scala/spark/ml/package.scala create mode 100644 data/train_data_v1.csv diff --git a/batch/src/main/resources/application.conf b/batch/src/main/resources/application.conf new file mode 100644 index 0000000..4d9921f --- /dev/null +++ b/batch/src/main/resources/application.conf @@ -0,0 +1,7 @@ +models { + v1 { + modelPath: "spark/model/model_v1" + trainDataPath: "spark/data/train_data_v1.csv" + modelName: "v1" + } +} diff --git a/batch/src/main/resources/dev.conf b/batch/src/main/resources/dev.conf new file mode 100644 index 0000000..b83a0ac --- /dev/null +++ b/batch/src/main/resources/dev.conf @@ -0,0 +1,7 @@ +include "application.conf" + +environment: "dev" + +s3 { + bucketName: "emr-spark-ap-northeast-1-dev" +} diff --git a/batch/src/main/resources/log4j-spark.properties b/batch/src/main/resources/log4j-spark.properties new file mode 100644 index 0000000..71652d0 --- /dev/null +++ b/batch/src/main/resources/log4j-spark.properties @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the console +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + +# Set the default spark-shell log level to WARN. When running the spark-shell, the +# log level for this class is used to overwrite the root logger's log level, so that +# the user can have different defaults for the shell and regular Spark apps. 
+log4j.logger.org.apache.spark.repl.Main=WARN + +# Settings to quiet third party logs that are too verbose +log4j.logger.org.sparkproject.jetty=WARN +log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR +log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO +log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO + +# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs +# in SparkSQL with Hive support +log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL +log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR + +# Parquet related logging +log4j.logger.org.apache.parquet.CorruptStatistics=ERROR +log4j.logger.parquet.CorruptStatistics=ERROR diff --git a/batch/src/main/scala/spark/ml/Config.scala b/batch/src/main/scala/spark/ml/Config.scala new file mode 100644 index 0000000..957e8a0 --- /dev/null +++ b/batch/src/main/scala/spark/ml/Config.scala @@ -0,0 +1,42 @@ +package spark.ml + +import spark.ml.entity.Environment +import spark.ml.Config.{ModelGroupsConfig, S3Config} +import pureconfig._ +import pureconfig.generic.auto._ +import pureconfig.generic.ProductHint + +final case class Config( + environment: Environment, + models: ModelGroupsConfig, + s3: S3Config +) + +object Config { + implicit val environmentConvert: ConfigConvert[Environment] = + ConfigConvert.viaNonEmptyStringTry[Environment]( + s => Environment.fromString(s).asScala, + e => e.toString + ) + + def load: Config = { + implicit def hint[T]: ProductHint[T] = + ProductHint[T](ConfigFieldMapping(CamelCase, CamelCase)) + + ConfigSource.default.loadOrThrow[Config] + } + + final case class ModelGroupsConfig( + v1: LrModelConfig + ) + + final case class LrModelConfig( + modelPath: String, + trainDataPath: String, + modelName: String + ) + + final case class S3Config( + bucketName: String + ) +} diff --git a/batch/src/main/scala/spark/ml/entity/Environment.scala b/batch/src/main/scala/spark/ml/entity/Environment.scala new file mode 100644 index 0000000..23144b9 --- /dev/null +++ b/batch/src/main/scala/spark/ml/entity/Environment.scala @@ -0,0 +1,19 @@ +package spark.ml.entity + +import com.twitter.util._ + +sealed abstract class Environment(s: String) { + override val toString: String = s +} + +object Environment { + case object Dev extends Environment("dev") + case object Prod extends Environment("prod") + + def fromString(s: String): Try[Environment] = + s match { + case Dev.toString => Return(Dev) + case Prod.toString => Return(Prod) + case _ => Throw(new Exception(s"Invalid environment: $s")) + } +} diff --git a/batch/src/main/scala/spark/ml/lr/SparkMLLrBatch.scala b/batch/src/main/scala/spark/ml/lr/SparkMLLrBatch.scala index b1e0fe9..a6d4e86 100644 --- a/batch/src/main/scala/spark/ml/lr/SparkMLLrBatch.scala +++ b/batch/src/main/scala/spark/ml/lr/SparkMLLrBatch.scala @@ -2,7 +2,16 @@ package spark.ml.lr import com.twitter.app.App import com.twitter.logging.Logging +import org.apache.spark.ml.classification.LogisticRegression +import org.apache.spark.ml.feature.{ + OneHotEncoderEstimator, + StringIndexer, + VectorAssembler +} +import org.apache.spark.ml.Pipeline import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types._ +import spark.ml.config object SparkMLLrBatch extends App with Logging { def main(): Unit = { @@ -13,6 +22,70 @@ object SparkMLLrBatch extends App with Logging { log.info("Batch Started") + val trainSchema = StructType( + Array( + StructField("label", DoubleType, false), + StructField("uid", 
StringType, false), + StructField("hour", IntegerType, false), + StructField("advertiserId", IntegerType, false), + StructField("campaignId", IntegerType, false), + StructField("adId", IntegerType, false), + StructField("siteId", IntegerType, false), + StructField("c1", IntegerType, false), + StructField("c2", IntegerType, false), + StructField("n1", DoubleType, false), + StructField("n2", DoubleType, false) + ) + ) + + val catFeatures = Array( + "uid", + "hour", + "advertiserId", + "campaignId", + "adId", + "siteId", + "c1", + "c2" + ) + + val trainDF = spark.read + .format("com.databricks.spark.csv") + .option("header", "false") + .schema(trainSchema) + .load(s"s3a://${config.s3.bucketName}/${config.models.v1.trainDataPath}") + + val indexers = + catFeatures.map { name => + new StringIndexer() + .setInputCol(name) + .setOutputCol(s"${name}_indexed") + .setHandleInvalid("keep") + } + + val encoder = new OneHotEncoderEstimator() + .setInputCols(indexers.map(_.getOutputCol)) + .setOutputCols(catFeatures.map(name => s"${name}_processed")) + + val assembler = new VectorAssembler() + .setInputCols(encoder.getOutputCols) + .setOutputCol("features") + + val lr = new LogisticRegression() + .setMaxIter(50) + .setRegParam(0.001) + .setStandardization(false) + + val pipeline = new Pipeline() + .setStages(indexers ++ Array(encoder, assembler, lr)) + + val model = pipeline.fit(trainDF) + model.write + .overwrite() + .save( + s"s3a://${config.s3.bucketName}/${config.models.v1.modelPath}" + ) + log.info("Batch Completed") spark.stop() diff --git a/batch/src/main/scala/spark/ml/package.scala b/batch/src/main/scala/spark/ml/package.scala new file mode 100644 index 0000000..7c3ea51 --- /dev/null +++ b/batch/src/main/scala/spark/ml/package.scala @@ -0,0 +1,5 @@ +package spark + +package object ml { + lazy val config: Config = Config.load +} diff --git a/data/train_data_v1.csv b/data/train_data_v1.csv new file mode 100644 index 0000000..dd6946d --- /dev/null +++ b/data/train_data_v1.csv @@ -0,0 +1,30 @@ +0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,0,1,1,11,1,9,1,0.1,1.1 +1.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,0,1,2,12,2,5,1,0.7,1.9 +1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,1,2,10,20,3,3,1,0.8,1.7 +0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,3,4,30,40,4,1,1,0.2,1.5 +0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,5,5,40,50,5,7,1,0.1,1.1 +0.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,7,1,3,13,1,6,2,0.7,1.9 +1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,10,2,11,21,2,2,2,0.8,1.7 +0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,15,3,20,30,3,4,2,0.2,1.5 +0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,20,5,41,51,4,2,2,0.1,1.1 +0.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,23,4,31,41,5,8,2,0.7,1.9 +0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,0,1,1,11,1,9,1,0.1,1.1 +1.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,0,1,2,12,2,5,1,0.7,1.9 +1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,1,2,10,20,3,3,1,0.8,1.7 +0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,3,4,30,40,4,1,1,0.2,1.5 +0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,5,5,40,50,5,7,1,0.1,1.1 +0.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,7,1,3,13,1,6,2,0.7,1.9 +1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,10,2,11,21,2,2,2,0.8,1.7 +0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,15,3,20,30,3,4,2,0.2,1.5 +0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,20,5,41,51,4,2,2,0.1,1.1 +0.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,23,4,31,41,5,8,2,0.7,1.9 +0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,0,1,1,11,1,9,1,0.1,1.1 +1.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,0,1,2,12,2,5,1,0.7,1.9 +1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,1,2,10,20,3,3,1,0.8,1.7 
+0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,3,4,30,40,4,1,1,0.2,1.5 +0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,5,5,40,50,5,7,1,0.1,1.1 +0.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,7,1,3,13,1,6,2,0.7,1.9 +1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,10,2,11,21,2,2,2,0.8,1.7 +0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,15,3,20,30,3,4,2,0.2,1.5 +0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,20,5,41,51,4,2,2,0.1,1.1 +0.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,23,4,31,41,5,8,2,0.7,1.9 From f32dd8b6150c51d86b6dfaf0a1575917f407819e Mon Sep 17 00:00:00 2001 From: suaaa7 Date: Sat, 28 Dec 2019 22:04:05 +0900 Subject: [PATCH 3/5] Add transformer --- .../ml/transformer/ColRenameTransformer.scala | 39 ++++++++++++++ .../ml/transformer/ConcatTransformer.scala | 49 +++++++++++++++++ .../ml/transformer/IsNotNullTransformer.scala | 50 +++++++++++++++++ .../transformer/LogarithmicTransformer.scala | 54 +++++++++++++++++++ 4 files changed, 192 insertions(+) create mode 100644 batch/src/main/scala/spark/ml/transformer/ColRenameTransformer.scala create mode 100644 batch/src/main/scala/spark/ml/transformer/ConcatTransformer.scala create mode 100644 batch/src/main/scala/spark/ml/transformer/IsNotNullTransformer.scala create mode 100644 batch/src/main/scala/spark/ml/transformer/LogarithmicTransformer.scala diff --git a/batch/src/main/scala/spark/ml/transformer/ColRenameTransformer.scala b/batch/src/main/scala/spark/ml/transformer/ColRenameTransformer.scala new file mode 100644 index 0000000..5bbe58f --- /dev/null +++ b/batch/src/main/scala/spark/ml/transformer/ColRenameTransformer.scala @@ -0,0 +1,39 @@ +package spark.ml.transformer + +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.param.{Param, ParamMap} +import org.apache.spark.ml.util.{ + DefaultParamsReadable, + DefaultParamsWritable, + Identifiable +} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, Dataset} + +class ColRenameTransformer(override val uid: String) + extends Transformer + with DefaultParamsWritable { + + def this() = this(Identifiable.randomUID("ColRenameTransformer")) + def setInputCol(value: String): this.type = set(inputCol, value) + def setOutputCol(value: String): this.type = set(outputCol, value) + def getOutputCol: String = getOrDefault((outputCol)) + + val inputCol = new Param[String](this, "inputCol", "input column") + val outputCol = new Param[String](this, "outputCol", "output column") + + override def transform(dataset: Dataset[_]): DataFrame = { + val inCol = extractParamMap.getOrElse(inputCol, "input") + val outCol = extractParamMap.getOrElse(outputCol, "output") + + dataset.drop(outCol).withColumnRenamed(inCol, outCol) + } + + override def copy(extra: ParamMap): ColRenameTransformer = defaultCopy(extra) + override def transformSchema(schema: StructType): StructType = schema +} + +object ColRenameTransformer + extends DefaultParamsReadable[ColRenameTransformer] { + override def load(path: String): ColRenameTransformer = super.load(path) +} diff --git a/batch/src/main/scala/spark/ml/transformer/ConcatTransformer.scala b/batch/src/main/scala/spark/ml/transformer/ConcatTransformer.scala new file mode 100644 index 0000000..c5ece3e --- /dev/null +++ b/batch/src/main/scala/spark/ml/transformer/ConcatTransformer.scala @@ -0,0 +1,49 @@ +package spark.ml.transformer + +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.param.ParamMap +import org.apache.spark.ml.param.shared.{HasInputCols, HasOutputCol} +import org.apache.spark.ml.util.{ + DefaultParamsReadable, + 
DefaultParamsWritable, + Identifiable +} +import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.functions.{col, concat, lit, when} +import org.apache.spark.sql.types.{StringType, StructField, StructType} + +class ConcatTransformer(override val uid: String) + extends Transformer + with HasInputCols + with HasOutputCol + with DefaultParamsWritable { + + def this() = this(Identifiable.randomUID("ConcatTransformer")) + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + def setOutputCol(value: String): this.type = set(outputCol, value) + + override def copy(extra: ParamMap): ConcatTransformer = + defaultCopy(extra) + + override def transform(dataset: Dataset[_]): DataFrame = { + val inCols = $(inputCols) + dataset.withColumn( + $(outputCol), + concat( + when(col(inCols.head).isNotNull, col(inCols.head)) + .otherwise(lit("null")), + when(col(inCols.last).isNotNull, col(inCols.last)) + .otherwise(lit("null")) + ) + ) + } + + override def transformSchema(schema: StructType): StructType = { + require($(inputCols).length == 2, "InputCols must have 2 cols") + schema.add(StructField($(outputCol), StringType, false)) + } +} + +object ConcatTransformer extends DefaultParamsReadable[ConcatTransformer] { + override def load(path: String): ConcatTransformer = super.load(path) +} diff --git a/batch/src/main/scala/spark/ml/transformer/IsNotNullTransformer.scala b/batch/src/main/scala/spark/ml/transformer/IsNotNullTransformer.scala new file mode 100644 index 0000000..e5447b2 --- /dev/null +++ b/batch/src/main/scala/spark/ml/transformer/IsNotNullTransformer.scala @@ -0,0 +1,50 @@ +package spark.ml.transformer + +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.param.ParamMap +import org.apache.spark.ml.param.shared.{HasInputCols, HasOutputCols} +import org.apache.spark.ml.util.{ + DefaultParamsReadable, + DefaultParamsWritable, + Identifiable +} +import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.functions.{col, lit, when} +import org.apache.spark.sql.types._ + +class IsNotNullTransformer(override val uid: String) + extends Transformer + with HasInputCols + with HasOutputCols + with DefaultParamsWritable { + + def this() = this(Identifiable.randomUID("IsNotNullTransformer")) + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + override def copy(extra: ParamMap): IsNotNullTransformer = + defaultCopy(extra) + + override def transform(dataset: Dataset[_]): DataFrame = + $(inputCols).zip($(outputCols)).toSeq.foldLeft(dataset.toDF()) { + (df, cols) => + df.withColumn( + cols._2, + when(col(cols._1).isNotNull, lit(1.0)) + .otherwise(lit(0.0)) + ) + } + + override def transformSchema(schema: StructType): StructType = { + val outputFields = $(inputCols).zip($(outputCols)).map { + case (inputCol, outputCol) => + StructField(outputCol, DoubleType, schema(inputCol).nullable) + } + StructType(schema ++ outputFields) + } +} + +object IsNotNullTransformer + extends DefaultParamsReadable[IsNotNullTransformer] { + override def load(path: String): IsNotNullTransformer = super.load(path) +} diff --git a/batch/src/main/scala/spark/ml/transformer/LogarithmicTransformer.scala b/batch/src/main/scala/spark/ml/transformer/LogarithmicTransformer.scala new file mode 100644 index 0000000..e218e61 --- /dev/null +++ b/batch/src/main/scala/spark/ml/transformer/LogarithmicTransformer.scala @@ -0,0 +1,54 @@ +package spark.ml.transformer + +import 
org.apache.spark.ml.Transformer +import org.apache.spark.ml.param.ParamMap +import org.apache.spark.ml.param.shared.{HasInputCols, HasOutputCols} +import org.apache.spark.ml.util.{ + DefaultParamsReadable, + DefaultParamsWritable, + Identifiable +} +import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.functions.{col, log => sparkLog} +import org.apache.spark.sql.types._ + +class LogarithmicTransformer(override val uid: String) + extends Transformer + with HasInputCols + with HasOutputCols + with DefaultParamsWritable { + + def this() = this(Identifiable.randomUID("LogarithmicTransformer")) + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + override def copy(extra: ParamMap): LogarithmicTransformer = + defaultCopy(extra) + + override def transform(dataset: Dataset[_]): DataFrame = + $(inputCols).zip($(outputCols)).toSeq.foldLeft(dataset.toDF()) { + (df, cols) => + df.withColumn(cols._2, sparkLog(col(cols._1) + 1)) + } + + override def transformSchema(schema: StructType): StructType = { + val fields = schema($(inputCols).toSet) + fields.foreach { fieldSchema => + require( + fieldSchema.dataType.isInstanceOf[NumericType], + s"${fieldSchema.name} does not match numeric type" + ) + } + + val outputFields = $(inputCols).zip($(outputCols)).map { + case (inputCol, outputCol) => + StructField(outputCol, DoubleType, schema(inputCol).nullable) + } + StructType(schema ++ outputFields) + } +} + +object LogarithmicTransformer + extends DefaultParamsReadable[LogarithmicTransformer] { + override def load(path: String): LogarithmicTransformer = super.load(path) +} From 7d6f2ceb172ac0b723107f0a0696553a90b4a5cb Mon Sep 17 00:00:00 2001 From: suaaa7 Date: Wed, 1 Jan 2020 05:55:20 +0900 Subject: [PATCH 4/5] Use transformer in Batch --- .../main/scala/spark/ml/entity/Features.scala | 33 +++++++ .../scala/spark/ml/entity/TrainData.scala | 38 ++++++++ .../scala/spark/ml/lr/SparkMLLrBatch.scala | 92 +++++++++++-------- data/train_data_v1.csv | 60 ++++++------ 4 files changed, 156 insertions(+), 67 deletions(-) create mode 100644 batch/src/main/scala/spark/ml/entity/Features.scala create mode 100644 batch/src/main/scala/spark/ml/entity/TrainData.scala diff --git a/batch/src/main/scala/spark/ml/entity/Features.scala b/batch/src/main/scala/spark/ml/entity/Features.scala new file mode 100644 index 0000000..ae62f8e --- /dev/null +++ b/batch/src/main/scala/spark/ml/entity/Features.scala @@ -0,0 +1,33 @@ +package spark.ml.entity + +import org.apache.spark.ml.linalg.Vector + +object Features { + val catFeatures = Array( + "uid", + "hour", + "advertiserId", + "campaignId", + "adId", + "siteId", + "c1", + "c2" + ) + + val concatFeatures = Array( + Array("campaignId", "adId") -> "ca", + Array("c1", "c2") -> "cc" + ) + + val isNotNullFeatures = Array( + "c3" + ) + + val quaFeatures = Array( + "n1" + ) + + val logFeatures = Array( + "n2" + ) +} diff --git a/batch/src/main/scala/spark/ml/entity/TrainData.scala b/batch/src/main/scala/spark/ml/entity/TrainData.scala new file mode 100644 index 0000000..fa267c7 --- /dev/null +++ b/batch/src/main/scala/spark/ml/entity/TrainData.scala @@ -0,0 +1,38 @@ +package spark.ml.entity + +import org.apache.spark.sql.types._ + +final case class TrainData( + label: Double, + uid: String, + hour: Int, + advertiserId: Int, + campaignId: Int, + adId: Int, + siteId: Int, + c1: Int, + c2: Int, + n1: Double, + n2: Double, + c3: Int +) + +object TrainData { + val schema: 
StructType = + StructType( + Array( + StructField("label", DoubleType, false), + StructField("uid", StringType, false), + StructField("hour", IntegerType, false), + StructField("advertiserId", IntegerType, false), + StructField("campaignId", IntegerType, false), + StructField("adId", IntegerType, false), + StructField("siteId", IntegerType, false), + StructField("c1", IntegerType, false), + StructField("c2", IntegerType, false), + StructField("n1", DoubleType, false), + StructField("n2", DoubleType, false), + StructField("c3", IntegerType, false) + ) + ) +} diff --git a/batch/src/main/scala/spark/ml/lr/SparkMLLrBatch.scala b/batch/src/main/scala/spark/ml/lr/SparkMLLrBatch.scala index a6d4e86..9b54dfb 100644 --- a/batch/src/main/scala/spark/ml/lr/SparkMLLrBatch.scala +++ b/batch/src/main/scala/spark/ml/lr/SparkMLLrBatch.scala @@ -4,6 +4,7 @@ import com.twitter.app.App import com.twitter.logging.Logging import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.feature.{ + Imputer, OneHotEncoderEstimator, StringIndexer, VectorAssembler @@ -12,6 +13,12 @@ import org.apache.spark.ml.Pipeline import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ import spark.ml.config +import spark.ml.entity.{Features, TrainData} +import spark.ml.transformer.{ + ConcatTransformer, + IsNotNullTransformer, + LogarithmicTransformer +} object SparkMLLrBatch extends App with Logging { def main(): Unit = { @@ -22,62 +29,73 @@ object SparkMLLrBatch extends App with Logging { log.info("Batch Started") - val trainSchema = StructType( - Array( - StructField("label", DoubleType, false), - StructField("uid", StringType, false), - StructField("hour", IntegerType, false), - StructField("advertiserId", IntegerType, false), - StructField("campaignId", IntegerType, false), - StructField("adId", IntegerType, false), - StructField("siteId", IntegerType, false), - StructField("c1", IntegerType, false), - StructField("c2", IntegerType, false), - StructField("n1", DoubleType, false), - StructField("n2", DoubleType, false) - ) - ) - - val catFeatures = Array( - "uid", - "hour", - "advertiserId", - "campaignId", - "adId", - "siteId", - "c1", - "c2" - ) - val trainDF = spark.read .format("com.databricks.spark.csv") .option("header", "false") - .schema(trainSchema) + .schema(TrainData.schema) .load(s"s3a://${config.s3.bucketName}/${config.models.v1.trainDataPath}") + val concaters = Features.concatFeatures.map { feature => + new ConcatTransformer() + .setInputCols(feature._1) + .setOutputCol(feature._2) + } + val indexers = - catFeatures.map { name => - new StringIndexer() - .setInputCol(name) - .setOutputCol(s"${name}_indexed") - .setHandleInvalid("keep") + (Features.catFeatures ++ (Features.concatFeatures.map(_._2))).map { + name => + new StringIndexer() + .setInputCol(name) + .setOutputCol(s"${name}_indexed") + .setHandleInvalid("keep") } val encoder = new OneHotEncoderEstimator() .setInputCols(indexers.map(_.getOutputCol)) - .setOutputCols(catFeatures.map(name => s"${name}_processed")) + .setOutputCols( + (Features.catFeatures ++ (Features.concatFeatures.map(_._2))) + .map(name => s"${name}_processed") + ) + + val isNotNuller = new IsNotNullTransformer() + .setInputCols(Features.isNotNullFeatures) + .setOutputCols( + Features.isNotNullFeatures.map(name => "${name}_processed") + ) + + val logger = new LogarithmicTransformer() + .setInputCols(Features.logFeatures) + .setOutputCols(Features.logFeatures.map(name => s"${name}_log")) + + val imputer = new Imputer() + 
.setInputCols(Features.quaFeatures ++ logger.getOutputCols) + .setOutputCols( + (Features.quaFeatures ++ Features.logFeatures) + .map(name => s"${name}_processed") + ) val assembler = new VectorAssembler() - .setInputCols(encoder.getOutputCols) + .setInputCols( + encoder.getOutputCols ++ isNotNuller.getOutputCols ++ imputer.getOutputCols + ) .setOutputCol("features") val lr = new LogisticRegression() - .setMaxIter(50) + .setMaxIter(100) .setRegParam(0.001) .setStandardization(false) + val stages = concaters ++ indexers ++ Array( + encoder, + isNotNuller, + logger, + imputer, + assembler, + lr + ) + val pipeline = new Pipeline() - .setStages(indexers ++ Array(encoder, assembler, lr)) + .setStages(stages) val model = pipeline.fit(trainDF) model.write diff --git a/data/train_data_v1.csv b/data/train_data_v1.csv index dd6946d..3500c6a 100644 --- a/data/train_data_v1.csv +++ b/data/train_data_v1.csv @@ -1,30 +1,30 @@ -0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,0,1,1,11,1,9,1,0.1,1.1 -1.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,0,1,2,12,2,5,1,0.7,1.9 -1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,1,2,10,20,3,3,1,0.8,1.7 -0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,3,4,30,40,4,1,1,0.2,1.5 -0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,5,5,40,50,5,7,1,0.1,1.1 -0.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,7,1,3,13,1,6,2,0.7,1.9 -1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,10,2,11,21,2,2,2,0.8,1.7 -0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,15,3,20,30,3,4,2,0.2,1.5 -0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,20,5,41,51,4,2,2,0.1,1.1 -0.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,23,4,31,41,5,8,2,0.7,1.9 -0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,0,1,1,11,1,9,1,0.1,1.1 -1.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,0,1,2,12,2,5,1,0.7,1.9 -1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,1,2,10,20,3,3,1,0.8,1.7 -0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,3,4,30,40,4,1,1,0.2,1.5 -0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,5,5,40,50,5,7,1,0.1,1.1 -0.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,7,1,3,13,1,6,2,0.7,1.9 -1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,10,2,11,21,2,2,2,0.8,1.7 -0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,15,3,20,30,3,4,2,0.2,1.5 -0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,20,5,41,51,4,2,2,0.1,1.1 -0.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,23,4,31,41,5,8,2,0.7,1.9 -0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,0,1,1,11,1,9,1,0.1,1.1 -1.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,0,1,2,12,2,5,1,0.7,1.9 -1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,1,2,10,20,3,3,1,0.8,1.7 -0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,3,4,30,40,4,1,1,0.2,1.5 -0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,5,5,40,50,5,7,1,0.1,1.1 -0.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,7,1,3,13,1,6,2,0.7,1.9 -1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,10,2,11,21,2,2,2,0.8,1.7 -0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,15,3,20,30,3,4,2,0.2,1.5 -0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,20,5,41,51,4,2,2,0.1,1.1 -0.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,23,4,31,41,5,8,2,0.7,1.9 +0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,0,1,1,11,1,9,1,0.1,1.1,1111 +1.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,0,1,2,12,2,5,1,0.7,1.9, +1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,1,2,10,20,3,3,1,0.8,1.7,324 +0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,3,4,30,40,4,1,1,0.2,1.5,443 +0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,0,1,1,11,1,9,1,0.1,1.1,1111 +1.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,0,1,2,12,2,5,1,0.7,1.9, +1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,1,2,10,20,3,3,1,0.8,1.7,324 +0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,3,4,30,40,4,1,1,0.2,1.5,443 +0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,0,1,1,11,1,9,1,0.1,1.1,1111 
+1.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,0,1,2,12,2,5,1,0.7,1.9, +1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,1,2,10,20,3,3,1,0.8,1.7,324 +0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,3,4,30,40,4,1,1,0.2,1.5,443 +0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,0,1,1,11,1,9,1,0.1,1.1,1111 +1.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,0,1,2,12,2,5,1,0.7,1.9, +1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,1,2,10,20,3,3,1,0.8,1.7,324 +0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,3,4,30,40,4,1,1,0.2,1.5,443 +0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,0,1,1,11,1,9,1,0.1,1.1,1111 +1.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,0,1,2,12,2,5,1,0.7,1.9, +1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,1,2,10,20,3,3,1,0.8,1.7,324 +0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,3,4,30,40,4,1,1,0.2,1.5,443 +0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,0,1,1,11,1,9,1,0.1,1.1,1111 +1.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,0,1,2,12,2,5,1,0.7,1.9, +1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,1,2,10,20,3,3,1,0.8,1.7,324 +0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,3,4,30,40,4,1,1,0.2,1.5,443 +0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,0,1,1,11,1,9,1,0.1,1.1,1111 +1.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,0,1,2,12,2,5,1,0.7,1.9, +1.0,cccccccc-5555-6666-7777-jjjjjjjjjjjj,1,2,10,20,3,3,1,0.8,1.7,324 +0.0,dddddddd-8888-9999-0000-kkkkkkkkkkkk,3,4,30,40,4,1,1,0.2,1.5,443 +0.0,aaaaaaaa-0000-1111-2222-hhhhhhhhhhhh,0,1,1,11,1,9,1,0.1,1.1,1111 +1.0,bbbbbbbb-3333-4444-5555-iiiiiiiiiiii,0,1,2,12,2,5,1,0.7,1.9, From 08e396c3aa8f7e76657aa8c769b2ed0c999d9f2c Mon Sep 17 00:00:00 2001 From: suaaa7 Date: Wed, 1 Jan 2020 05:57:29 +0900 Subject: [PATCH 5/5] Update Makefile for emr --- Makefile | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/Makefile b/Makefile index e762138..340ef8b 100644 --- a/Makefile +++ b/Makefile @@ -16,3 +16,25 @@ dbash: .PHONY: dassembly dassembly: docker run --rm -it -v `pwd`:/root hseeberger/scala-sbt sbt assembly + +.PHONY: upload-jar +upload-jar: + aws s3 cp \ + ./batch/target/scala-2.11/batch-assembly-*.jar \ + s3://$${BUCKET_NAME}/spark/ + +.PHONY: upload-csv +upload-csv: + aws s3 cp \ + ./data/train_data_v1.csv \ + s3://$${BUCKET_NAME}/spark/data/ + +.PHONY: add-steps +add-steps: + aws emr add-steps --cluster-id $${CLUSTER_ID} --steps \ + Type=CUSTOM_JAR,Name=SparkMLLr,ActionOnFailure=CONTINUE,Jar=command-runner.jar,Args=[spark-submit,--class,spark.ml.lr.SparkMLLrBatch,--deploy-mode,cluster,--master,yarn,--conf,'spark.executor.extraJavaOptions=-Dconfig.resource=dev.conf',--conf,'spark.driver.extraJavaOptions=-Dconfig.resource=dev.conf',s3://$${BUCKET_NAME}/spark/batch-assembly-0.1.0-SNAPSHOT.jar] + +.PHONY: checkenv +checkenv: + @echo $${BUCKET_NAME} + @echo $${CLUSTER_ID}
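
The five patches above only cover training the pipeline and persisting the PipelineModel to S3. For reference, below is a minimal, hypothetical sketch of how that saved model could be loaded back and used for scoring. It reuses the config package object and TrainData.schema introduced in patches 2 and 4; the object name SparkMLLrScorer, the plain main(args) entry point, and reading the training CSV as the scoring input are assumptions for illustration, not part of the patch series.

package spark.ml.lr

import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.SparkSession
import spark.ml.config
import spark.ml.entity.TrainData

// Hypothetical scoring job -- a sketch, not part of the patches above.
object SparkMLLrScorer {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("SparkMLLrScorer")
      .getOrCreate()

    // Load the PipelineModel that SparkMLLrBatch wrote in patch 4.
    val model = PipelineModel.load(
      s"s3a://${config.s3.bucketName}/${config.models.v1.modelPath}"
    )

    // Read rows laid out like the training CSV; the label column is present
    // here only because TrainData.schema is reused as-is.
    val df = spark.read
      .option("header", "false")
      .schema(TrainData.schema)
      .csv(s"s3a://${config.s3.bucketName}/${config.models.v1.trainDataPath}")

    // "probability" and "prediction" are LogisticRegression's default output columns.
    model
      .transform(df)
      .select("uid", "probability", "prediction")
      .show(10, truncate = false)

    spark.stop()
  }
}

Such a job could be submitted to the same EMR cluster with the add-steps target from patch 5 by swapping the --class argument to spark.ml.lr.SparkMLLrScorer.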