diff --git a/.travis.yml b/.travis.yml
index 11360975..dfe77865 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,11 +1,9 @@
 language: scala
 scala:
-- 2.10.4
-- 2.11.5
+- 2.10.6
+- 2.11.11
 jdk:
-- oraclejdk7
-- openjdk6
-- openjdk7
+- openjdk8
 script:
 - sbt test
 deploy:
@@ -13,7 +11,7 @@ deploy:
   provider: script
   script: ./.travis/deploy.sh $TRAVIS_TAG
   on:
-    condition: '"${TRAVIS_SCALA_VERSION}" == "2.11.5" && "${TRAVIS_JDK_VERSION}" == "oraclejdk7"'
+    condition: '"${TRAVIS_SCALA_VERSION}" == "2.11.11" && "${TRAVIS_JDK_VERSION}" == "openjdk8"'
     tags: true
 env:
   global:
diff --git a/CHANGELOG b/CHANGELOG
index ef1b1174..b3becdc4 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,3 +1,18 @@
+Version 0.4.0 (2017-10-18)
+--------------------------
+Add Scala 2.12 support (#51)
+Add a note in Scaladoc about queue capacity (#16)
+Add stress-testing module (#60)
+Allow providing an ExecutionContext for async emitters (#57)
+Bump Scala 2.11 to 2.11.11 (#54)
+Bump SBT to 0.13.16 (#53)
+Bump sbt-bintray to 0.5.1 (#59)
+Drop Scala 2.9 support (#50)
+Deprecate trackUnstructEvent (#43)
+Remove blocking calls (#55)
+Replace Akka with scalaj-http (#49)
+Update READMEs' markdown in accordance with CommonMark (#52)
+
 Version 0.3.0 (2016-05-14)
 --------------------------
 Added support for true timestamp (#28)
diff --git a/README.md b/README.md
index 587f1242..13e91fbd 100644
--- a/README.md
+++ b/README.md
@@ -1,16 +1,16 @@
 # Snowplow Scala Tracker
 
 [![Build Status][travis-image]][travis]
-[![Release] [release-image]][releases]
+[![Release][release-image]][releases]
 [![License][license-image]][license]
 
 ## Overview
 
-Add analytics to your Scala, Akka and Play apps and servers using the **[Snowplow] [snowplow]** event tracker for **[Scala][scala]**.
+Add analytics to your Scala, Akka and Play apps and servers using the **[Snowplow][snowplow]** event tracker for **[Scala][scala]**.
 
 ## Quickstart
 
-Assuming git, **[Vagrant] [vagrant-install]** and **[VirtualBox] [virtualbox-install]** installed:
+Assuming git, **[Vagrant][vagrant-install]** and **[VirtualBox][virtualbox-install]** installed:
 
 ```bash
 host$ git clone https://github.com/snowplow/snowplow-scala-tracker.git
@@ -22,16 +22,16 @@ guest$ sbt test
 
 ## Find out more
 
-| Technical Docs | Setup Guide | Roadmap | Contributing |
-|---------------------------------|---------------------------|-------------------------|-----------------------------------|
-| ![i1] [techdocs-image] | ![i2] [setup-image] | ![i3] [roadmap-image] | ![i4] [contributing-image] |
-| **[Technical Docs] [techdocs]** | **[Setup Guide] [setup]** | **[Roadmap] [roadmap]** | **[Contributing] [contributing]** |
+| Technical Docs | Setup Guide | Roadmap | Contributing |
+|--------------------------------|--------------------------|------------------------|----------------------------------|
+| ![i1][techdocs-image] | ![i2][setup-image] | ![i3][roadmap-image] | ![i4][contributing-image] |
+| **[Technical Docs][techdocs]** | **[Setup Guide][setup]** | **[Roadmap][roadmap]** | **[Contributing][contributing]** |
 
 ## Copyright and license
 
-The Snowplow Scala Tracker is copyright 2015-2016 Snowplow Analytics Ltd.
+The Snowplow Scala Tracker is copyright 2015-2017 Snowplow Analytics Ltd.
 
-Licensed under the **[Apache License, Version 2.0] [license]** (the "License");
+Licensed under the **[Apache License, Version 2.0][license]** (the "License");
 you may not use this software except in compliance with the License.
Unless required by applicable law or agreed to in writing, software
@@ -46,7 +46,7 @@ limitations under the License.
 [license-image]: http://img.shields.io/badge/license-Apache--2-blue.svg?style=flat
 [license]: http://www.apache.org/licenses/LICENSE-2.0
 
-[release-image]: http://img.shields.io/badge/release-0.3.0-blue.svg?style=flat
+[release-image]: http://img.shields.io/badge/release-0.4.0-blue.svg?style=flat
 [releases]: https://github.com/snowplow/snowplow-scala-tracker/releases
 
 [snowplow]: http://snowplowanalytics.com
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 00000000..6f99de32
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2013-2017 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+lazy val root = project.in(file("."))
+  .settings(Seq[Setting[_]](
+    organization := "com.snowplowanalytics",
+    version := "0.4.0",
+    description := "Scala tracker for Snowplow",
+    name := "snowplow-scala-tracker",
+    scalaVersion := "2.11.11",
+    crossScalaVersions := Seq("2.10.6", "2.11.11", "2.12.3"),
+    scalacOptions := Seq("-deprecation", "-encoding", "utf8"),
+    javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
+  ))
+  .settings(BuildSettings.buildSettings)
+  .settings(Seq(
+    shellPrompt := { _ => name.value + " > " }
+  ))
+  .settings(
+    libraryDependencies := Seq(
+      Dependencies.Libraries.scalajHttp,
+      Dependencies.Libraries.json4sJackson,
+      Dependencies.Libraries.mockito,
+      Dependencies.Libraries.specs2,
+      Dependencies.Libraries.scalaCheck)
+  )
diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala
index c1ccb6ba..7e5c32ce 100644
--- a/project/BuildSettings.scala
+++ b/project/BuildSettings.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013-2015 Snowplow Analytics Ltd. All rights reserved.
+ * Copyright (c) 2013-2017 Snowplow Analytics Ltd. All rights reserved.
 *
 * This program is licensed to you under the Apache License Version 2.0,
 * and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -12,36 +12,28 @@
  */
 import bintray.BintrayPlugin._
 import bintray.BintrayKeys._
+
 import sbt._
 import Keys._
 
 object BuildSettings {
 
-  // Basic settings for our app
-  lazy val basicSettings = Seq[Setting[_]](
-    organization := "com.snowplowanalytics",
-    version := "0.3.0",
-    description := "Scala tracker for Snowplow",
-    scalaVersion := "2.10.6",
-    crossScalaVersions := Seq("2.10.6", "2.11.5"),
-    scalacOptions := Seq("-deprecation", "-encoding", "utf8"),
-    resolvers ++= Dependencies.resolutionRepos
+  // Makes our SBT settings available at runtime
+  lazy val scalifySettings = Seq(
+    sourceGenerators in Compile += Def.task {
+      val file = (sourceManaged in Compile).value / "settings.scala"
+      IO.write(file, """package com.snowplowanalytics.snowplow.scalatracker.generated
+                       |object ProjectSettings {
+                       |  val version = "%s"
+                       |  val name = "%s"
+                       |  val organization = "%s"
+                       |  val scalaVersion = "%s"
+                       |}
+                       |""".stripMargin.format(version.value, name.value, organization.value, scalaVersion.value))
+      Seq(file)
+    }.taskValue
   )
 
-  // Makes our SBT app settings available from within the ETL
-  lazy val scalifySettings = Seq(sourceGenerators in Compile <+= (sourceManaged in Compile, version, name, organization, scalaVersion) map { (d, v, n, o, sv) =>
-    val file = d / "settings.scala"
-    IO.write(file, """package com.snowplowanalytics.snowplow.scalatracker.generated
-                     |object ProjectSettings {
-                     |  val version = "%s"
-                     |  val name = "%s"
-                     |  val organization = "%s"
-                     |  val scalaVersion = "%s"
-                     |}
-                     |""".stripMargin.format(v, n, o, sv))
-    Seq(file)
-  })
-
   // Bintray publishing settings
   lazy val publishSettings = bintraySettings ++ Seq[Setting[_]](
     licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0.html")),
@@ -51,7 +43,7 @@ object BuildSettings {
 
   // Maven Central publishing settings
   lazy val mavenCentralExtras = Seq[Setting[_]](
-    pomIncludeRepository := { x => false },
+    pomIncludeRepository := { _ => false },
     homepage := Some(url("http://snowplowanalytics.com")),
     scmInfo := Some(ScmInfo(url("https://github.com/snowplow/snowplow-scala-tracker"), "scm:git@github.com:snowplow/snowplow-scala-tracker.git")),
     pomExtra := (
@@ -65,5 +57,5 @@
     )
   )
 
-  lazy val buildSettings = basicSettings ++ scalifySettings ++ publishSettings ++ mavenCentralExtras
+  lazy val buildSettings = scalifySettings ++ publishSettings ++ mavenCentralExtras
 }
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 929e7451..8da73da5 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013-2015 Snowplow Analytics Ltd. All rights reserved.
+ * Copyright (c) 2015-2017 Snowplow Analytics Ltd. All rights reserved.
 *
 * This program is licensed to you under the Apache License Version 2.0,
 * and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -11,83 +11,32 @@
 * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/ import sbt._ -import Keys._ object Dependencies { - val resolutionRepos = Seq( - // For Twitter's LRU cache - "Twitter Maven Repo" at "http://maven.twttr.com/", - "Sonatype" at "https://oss.sonatype.org/content/repositories/releases" - ) - object V { - // Java - val commonsLang = "3.1" - val commonsCodec = "1.5" - val jodaTime = "2.3" - val jodaMoney = "0.9" - val jodaConvert = "1.2" - val jackson = "1.9.7" - // Scala - val json4s = "3.2.11" - val spray = "1.3.3" - val akka = "2.3.14" - object collUtil { - val _29 = "5.3.10" - val _210 = "6.3.4" - val _211 = "6.23.0" - } + val scalajHttp = "2.3.0" + val json4s = "3.2.11" // Java (test only) - val mockito = "1.9.5" + val mockito = "1.9.5" // Scala (test only) - object specs2 { - val _29 = "1.12.4.1" - val _210 = "2.3.13" - val _211 = "2.3.13" - } + val specs2 = "3.9.5" + val scalaCheck = "1.13.4" } object Libraries { - // Java - val commonsCodec = "commons-codec" % "commons-codec" % V.commonsCodec - val commonsLang = "org.apache.commons" % "commons-lang3" % V.commonsLang - val jodaTime = "joda-time" % "joda-time" % V.jodaTime - val jodaMoney = "org.joda" % "joda-money" % V.jodaMoney - val jodaConvert = "org.joda" % "joda-convert" % V.jodaConvert - val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % V.jackson - // Scala - val sprayClient = "io.spray" %% "spray-client" % V.spray - val akka = "com.typesafe.akka" %% "akka-actor" % V.akka - val json4sJackson = "org.json4s" %% "json4s-jackson" % V.json4s - object collUtil { - val _29 = "com.twitter" % "util-collection" % V.collUtil._29 - val _210 = "com.twitter" %% "util-collection" % V.collUtil._210 - val _211 = "com.twitter" %% "util-collection" % V.collUtil._211 - } + val scalajHttp = "org.scalaj" %% "scalaj-http" % V.scalajHttp + val json4sJackson = "org.json4s" %% "json4s-jackson" % V.json4s // Java (test only) - val mockito = "org.mockito" % "mockito-all" % V.mockito % "test" + val mockito = "org.mockito" % "mockito-all" % V.mockito % "test" // Scala (test only) - object specs2 { - val _29 = "org.specs2" %% "specs2" % V.specs2._29 % "test" - val _210 = "org.specs2" %% "specs2" % V.specs2._210 % "test" - val _211 = "org.specs2" %% "specs2" % V.specs2._211 % "test" - } - val sprayTest = "io.spray" %% "spray-testkit" % V.spray % "test" + val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % "test" + val scalaCheck = "org.scalacheck" %% "scalacheck" % V.scalaCheck % "test" } - - def onVersion[A](all: Seq[A] = Seq(), on29: => Seq[A] = Seq(), on210: => Seq[A] = Seq(), on211: => Seq[A] = Seq()) = - scalaVersion(v => all ++ (if (v.contains("2.9.")) { - on29 - } else if (v.contains("2.10.")) { - on210 - } else { - on211 - })) } diff --git a/project/SnowplowTrackerBuild.scala b/project/SnowplowTrackerBuild.scala deleted file mode 100644 index 6758f3f3..00000000 --- a/project/SnowplowTrackerBuild.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2013-2015 Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Apache License Version 2.0, - * and you may not use this file except in compliance with the Apache License Version 2.0. - * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the Apache License Version 2.0 is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. - */ -import sbt._ -import Keys._ - -object SnowplowTrackerBuild extends Build { - - import Dependencies._ - import BuildSettings._ - - // Configure prompt to show current project - override lazy val settings = super.settings :+ { - shellPrompt := { s => Project.extract(s).currentProject.id + " > " } - } - - // Define our project, with basic project information and library dependencies - lazy val project = Project("snowplow-scala-tracker", file(".")) - .settings(buildSettings: _*) - .settings( - libraryDependencies <++= Dependencies.onVersion( - all = Seq( - Libraries.commonsLang, - Libraries.commonsCodec, - Libraries.jodaTime, - Libraries.jodaConvert, - Libraries.jodaMoney, - Libraries.jackson, - Libraries.json4sJackson, - Libraries.sprayClient, - Libraries.akka, - Libraries.mockito, - Libraries.sprayTest), - on29 = Seq(Libraries.collUtil._29, Libraries.specs2._29), - on210 = Seq(Libraries.collUtil._210, Libraries.specs2._210), - on211 = Seq(Libraries.collUtil._211, Libraries.specs2._211) - ) - ) -} diff --git a/project/build.properties b/project/build.properties index 43b8278c..b7dd3cb2 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.13.11 +sbt.version=1.0.2 diff --git a/project/plugins.sbt b/project/plugins.sbt index 40c4ee2d..0433cb5e 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1 +1 @@ -addSbtPlugin("me.lessis" % "bintray-sbt" % "0.3.0") \ No newline at end of file +addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.1") diff --git a/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/Ec2Metadata.scala b/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/Ec2Metadata.scala index cd8f60b6..68fec36f 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/Ec2Metadata.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/Ec2Metadata.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 Snowplow Analytics Ltd. All rights reserved. + * Copyright (c) 2015-2017 Snowplow Analytics Ltd. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. 
@@ -12,36 +12,23 @@ */ package com.snowplowanalytics.snowplow.scalatracker -// Scala import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext.Implicits.global import scala.util.control.NonFatal +import scala.util.{ Success, Failure } -// Akka -import akka.util.Timeout +import scalaj.http._ -// Spray -import spray.http._ -import spray.client.pipelining._ - -// json4s import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ -// This library -import emitters.RequestUtils - /** * Trait with parsing EC2 meta data logic * http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html */ object Ec2Metadata { - import RequestUtils.system - import system.dispatcher - val shortTimeout = 10.seconds - implicit val timeout = Timeout(shortTimeout) - val pipeline: HttpRequest => Future[HttpResponse] = sendReceive val instanceIdentitySchema = "iglu:com.amazon.aws.ec2/instance_identity_document/jsonschema/1-0-0" val instanceIdentityUri = "http://169.254.169.254/latest/dynamic/instance-identity/document/" @@ -58,9 +45,10 @@ object Ec2Metadata { /** * Set callback on successful instance identity GET request */ - def initializeContextRequest: Unit = { - getInstanceContextFuture.onSuccess { - case json: SelfDescribingJson => contextSlot = Some(json) + def initializeContextRequest(): Unit = { + getInstanceContextFuture.onComplete { + case Success(json: SelfDescribingJson) => contextSlot = Some(json) + case Failure(error) => System.err.println(s"Unable to retrieve EC2 context. ${error.getMessage}") } } @@ -72,7 +60,7 @@ object Ec2Metadata { */ def getInstanceContextBlocking: Option[SelfDescribingJson] = try { - Some(Await.result(getInstanceContextFuture, 3 seconds)) + Some(Await.result(getInstanceContextFuture, 3.seconds)) } catch { case NonFatal(_) => None @@ -93,16 +81,15 @@ object Ec2Metadata { * @return future JSON object with identity data */ def getInstanceIdentity: Future[JObject] = { - val instanceIdentityDocument = pipeline(Get(instanceIdentityUri)) - instanceIdentityDocument.map(_.entity.asString).map { (resp: String) => + val instanceIdentityDocument = getContent(instanceIdentityUri) + instanceIdentityDocument.map { (resp: String) => parseOpt(resp) match { - case Some(jsonObject: JObject) => { + case Some(jsonObject: JObject) => val prepared = prepareEc2Context(jsonObject) - if (prepared.values.keySet.size == 0) { throw new Exception("Document contains no known keys") } + if (prepared.values.keySet.isEmpty) { throw new RuntimeException("Document contains no known keys") } else { prepared } - } case _ => - throw new Exception("Document can not be parsed") + throw new RuntimeException("Document can not be parsed") } } } @@ -144,13 +131,13 @@ object Ec2Metadata { } /** - * Get URL content (for leaf-link) + * Get string body of URL * * @param url leaf URL (without slash at the end) * @return future value */ private def getContent(url: String): Future[String] = - pipeline(Get(url)).map(_.entity.asString) + Future.apply(Http(url).asString.body) /** * Get content of node-link diff --git a/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/Payload.scala b/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/Payload.scala index 526a3163..1696205f 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/Payload.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/Payload.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 Snowplow Analytics Ltd. 
All rights reserved. + * Copyright (c) 2015-2017 Snowplow Analytics Ltd. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. @@ -12,10 +12,9 @@ */ package com.snowplowanalytics.snowplow.scalatracker -import org.apache.commons.codec.binary.Base64 +import java.util.Base64 import org.json4s._ -import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import scala.collection.mutable.{Map => MMap} @@ -54,11 +53,7 @@ class Payload { } } - /** - * Add a map of key-value pairs one by one - * - * @param dict - */ + /** Add a map of key-value pairs one by one */ def addDict(dict: Map[String, String]): Unit = { dict foreach { case (k, v) => add(k, v) @@ -68,7 +63,7 @@ class Payload { /** * Stringify a JSON and add it * - * @param json + * @param json JSON object to encode * @param encodeBase64 Whether to base 64 encode the JSON * @param typeWhenEncoded Key to use if encodeBase64 is true * @param typeWhenNotEncoded Key to use if encodeBase64 is false @@ -82,7 +77,7 @@ class Payload { val jsonString = compact(render(json)) if (encodeBase64) { - add(typeWhenEncoded, new String(Base64.encodeBase64(jsonString.getBytes(Encoding)), Encoding)) + add(typeWhenEncoded, new String(Base64.getEncoder.encode(jsonString.getBytes(Encoding)), Encoding)) } else { add(typeWhenNotEncoded, jsonString) } diff --git a/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/SelfDescribingJson.scala b/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/SelfDescribingJson.scala index 030c4178..de868cb3 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/SelfDescribingJson.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/SelfDescribingJson.scala @@ -72,5 +72,5 @@ object SelfDescribingJson { * @param data JSON instance */ case class SelfDescribingJson(schema: String, data: JValue) { - def toJObject(): JObject = ("schema" -> schema) ~ ("data" -> data) + def toJObject: JObject = ("schema" -> schema) ~ ("data" -> data) } diff --git a/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/Tracker.scala b/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/Tracker.scala index ff335b7e..301bf70a 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/Tracker.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/Tracker.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 Snowplow Analytics Ltd. All rights reserved. + * Copyright (c) 2015-2017 Snowplow Analytics Ltd. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. 
@@ -63,7 +63,7 @@ class Tracker(emitters: Seq[TEmitter], namespace: String, appId: String, encodeB * @param payload constructed event map */ private def send(payload: Payload): Unit = { - val event = payload.get + val event = payload.get() emitters foreach { e => e.input(event) } @@ -109,7 +109,7 @@ class Tracker(emitters: Seq[TEmitter], namespace: String, appId: String, encodeB * @return payload with contexts */ private def addContext(payload: Payload, contexts: Seq[SelfDescribingJson]): Payload = { - if (!contexts.isEmpty) { + if (contexts.nonEmpty) { val contextsEnvelope = SelfDescribingJson( "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1", contexts @@ -124,13 +124,29 @@ class Tracker(emitters: Seq[TEmitter], namespace: String, appId: String, encodeB /** * Track a Snowplow unstructured event + * Alias for `trackSelfDescribingEvent` * * @param unstructEvent self-describing JSON for the event * @param contexts list of additional contexts * @param timestamp optional user-provided timestamp (ms) for the event * @return The tracker instance */ + @deprecated("Use Tracker#trackSelfDescribingEvent instead", "0.4.0") def trackUnstructEvent( + unstructEvent: SelfDescribingJson, + contexts: Seq[SelfDescribingJson] = Nil, + timestamp: Option[Timestamp] = None): Tracker = + trackSelfDescribingEvent(unstructEvent, contexts, timestamp) + + /** + * Track a Snowplow self-describing event + * + * @param unstructEvent self-describing JSON for the event + * @param contexts list of additional contexts + * @param timestamp optional user-provided timestamp (ms) for the event + * @return The tracker instance + */ + def trackSelfDescribingEvent( unstructEvent: SelfDescribingJson, contexts: Seq[SelfDescribingJson] = Nil, timestamp: Option[Timestamp] = None): Tracker = { @@ -150,21 +166,6 @@ class Tracker(emitters: Seq[TEmitter], namespace: String, appId: String, encodeB this } - /** - * Track a Snowplow unstructured event - * Alias for [[trackUnstructEvent]] - * - * @param unstructEvent self-describing JSON for the event - * @param contexts list of additional contexts - * @param timestamp optional user-provided timestamp (ms) for the event - * @return The tracker instance - */ - def trackSelfDescribingEvent( - unstructEvent: SelfDescribingJson, - contexts: Seq[SelfDescribingJson] = Nil, - timestamp: Option[Timestamp] = None): Tracker = - trackUnstructEvent(unstructEvent, contexts, timestamp) - /** * Track a Snowplow structured event * diff --git a/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/emitters/AsyncBatchEmitter.scala b/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/emitters/AsyncBatchEmitter.scala index ff0d9f3c..102d6eb3 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/emitters/AsyncBatchEmitter.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/emitters/AsyncBatchEmitter.scala @@ -10,14 +10,15 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
 */
-package com.snowplowanalytics.snowplow.scalatracker.emitters
+package com.snowplowanalytics.snowplow.scalatracker
+package emitters
 
-// Java
 import java.util.concurrent.LinkedBlockingQueue
 
-// Scala
 import scala.collection.mutable.ListBuffer
+import scala.concurrent.ExecutionContext
+
+import RequestUtils.{CollectorRequest, PostCollectorRequest, CollectorParams}
 
 object AsyncBatchEmitter {
   // Avoid starting thread in constructor
@@ -28,10 +29,11 @@
   * @param port collector port
   * @param bufferSize quantity of events in batch request
   * @param https should this use the https scheme
+  * @param ec thread pool used to send HTTP requests to the collector
   * @return emitter
   */
-  def createAndStart(host: String, port: Int = 80, bufferSize: Int = 50, https: Boolean = false): AsyncBatchEmitter = {
-    val emitter = new AsyncBatchEmitter(host, port, bufferSize, https = https)
+  def createAndStart(host: String, port: Int = 80, bufferSize: Int = 50, https: Boolean = false)(implicit ec: ExecutionContext): AsyncBatchEmitter = {
+    val emitter = new AsyncBatchEmitter(ec, host, port, bufferSize, https = https)
     emitter.startWorker()
     emitter
   }
@@ -40,27 +42,28 @@
 /**
 * Asynchronous batch emitter
 * Store events in buffer and send them with POST request when buffer exceeds `bufferSize`
+ * Backed by `java.util.concurrent.LinkedBlockingQueue`, which has a
+ * capacity of `Int.MaxValue` and will block the thread when the buffer reaches capacity
 *
 * @param host collector host
 * @param port collector port
 * @param bufferSize quantity of events in a batch request
 * @param https should this use the https scheme
 */
-class AsyncBatchEmitter private(host: String, port: Int, bufferSize: Int, https: Boolean = false) extends TEmitter {
+class AsyncBatchEmitter private(ec: ExecutionContext, host: String, port: Int, bufferSize: Int, https: Boolean = false) extends TEmitter {
 
-  val queue = new LinkedBlockingQueue[Seq[Map[String, String]]]()
-
-  // 2 seconds timeout after 1st failed request
-  val initialBackoffPeriod = 2000
+  private val queue = new LinkedBlockingQueue[CollectorRequest]()
 
   private var buffer = ListBuffer[Map[String, String]]()
 
+  private val collector = CollectorParams(host, port, https)
+
   // Start consumer thread synchronously trying to send events to collector
   val worker = new Thread {
-    override def run {
+    override def run() {
       while (true) {
         val batch = queue.take()
-        RequestUtils.retryPostUntilSuccessful(host, batch, port, initialBackoffPeriod, https = https)
+        RequestUtils.send(queue, ec, collector, batch)
       }
     }
  }
@@ -74,10 +77,13 @@
   * @param event Fully assembled event
   */
  def input(event: Map[String, String]): Unit = {
-    buffer.append(event)
-    if (buffer.size >= bufferSize) {
-      queue.put(buffer)
-      buffer = ListBuffer[Map[String, String]]()
+    // Multiple threads can input via the same tracker and overwrite the buffer
+    buffer.synchronized {
+      buffer.append(event)
+      if (buffer.size >= bufferSize) {
+        queue.put(PostCollectorRequest(1, buffer.toList))
+        buffer = ListBuffer[Map[String, String]]()
+      }
     }
  }
 
diff --git a/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/emitters/AsyncEmitter.scala b/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/emitters/AsyncEmitter.scala
index 88513cb1..0eda79de 100644
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015 Snowplow Analytics Ltd. All rights reserved.
+ * Copyright (c) 2015-2017 Snowplow Analytics Ltd. All rights reserved.
 *
 * This program is licensed to you under the Apache License Version 2.0,
 * and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -10,24 +10,29 @@
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
 */
-package com.snowplowanalytics.snowplow.scalatracker.emitters
+package com.snowplowanalytics.snowplow.scalatracker
+package emitters
 
-// Java
 import java.util.concurrent.LinkedBlockingQueue
 
+import scala.concurrent.ExecutionContext
+
+import RequestUtils.{CollectorParams, CollectorRequest, GetCollectorRequest}
 
 object AsyncEmitter {
  // Avoid starting thread in constructor
 
  /**
  * Start async emitter with single event payload
+  * Backed by `java.util.concurrent.LinkedBlockingQueue`, which has a
+  * capacity of `Int.MaxValue` and will block the thread when the buffer reaches capacity
  *
  * @param host collector host
  * @param port collector port
  * @param https should this use the https scheme
  * @return emitter
  */
-  def createAndStart(host: String, port: Int = 80, https: Boolean = false): AsyncEmitter = {
-    val emitter = new AsyncEmitter(host, port, https)
+  def createAndStart(host: String, port: Int = 80, https: Boolean = false)(implicit ec: ExecutionContext): AsyncEmitter = {
+    val emitter = new AsyncEmitter(ec, host, port, https)
     emitter.startWorker()
     emitter
  }
@@ -40,19 +45,17 @@
 * @param port collector port
 * @param https should this use the https scheme
 */
-class AsyncEmitter private(host: String, port: Int, https: Boolean = false) extends TEmitter {
+class AsyncEmitter private(ec: ExecutionContext, host: String, port: Int, https: Boolean) extends TEmitter {
 
-  val queue = new LinkedBlockingQueue[Map[String, String]]()
+  val queue = new LinkedBlockingQueue[CollectorRequest]()
 
-  // 2 seconds timeout after 1st failed request
-  val initialBackoffPeriod = 2000
+  private val collector = CollectorParams(host, port, https)
 
-  // TODO: consider move retryGet/PostUntilSuccessful with adding of stm to Emitter (it's not requests logic)
  val worker = new Thread {
-    override def run {
+    override def run() {
      while (true) {
        val event = queue.take()
-        RequestUtils.retryGetUntilSuccessful(host, event, port, initialBackoffPeriod, https = https)
+        RequestUtils.send(queue, ec, collector, event)
      }
    }
  }
@@ -66,7 +69,7 @@
  * @param event Fully assembled event
  */
  def input(event: Map[String, String]): Unit = {
-    queue.put(event)
+    queue.put(GetCollectorRequest(1, event))
  }
 
  private def startWorker(): Unit = {
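Both async emitters now take an implicit `ExecutionContext`, so the caller controls the thread pool used for HTTP requests. A minimal usage sketch of the new factories, assuming the global pool; the collector host, namespace, and app id below are placeholders:

```scala
import scala.concurrent.ExecutionContext.Implicits.global

import com.snowplowanalytics.snowplow.scalatracker.Tracker
import com.snowplowanalytics.snowplow.scalatracker.emitters.{AsyncBatchEmitter, AsyncEmitter}

object EmitterSketch extends App {
  // Both factories require an implicit ExecutionContext as of 0.4.0 (#57);
  // "collector.acme.com" is a placeholder host
  val getEmitter   = AsyncEmitter.createAndStart("collector.acme.com", port = 80)
  val batchEmitter = AsyncBatchEmitter.createAndStart("collector.acme.com", port = 80, bufferSize = 50)

  // Every event is fanned out to all emitters passed to the Tracker
  val tracker = new Tracker(List(getEmitter, batchEmitter), "example-namespace", "example-app")
  tracker.trackPageView("http://example.com", None, None, Nil, None)
}
```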
diff --git a/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/emitters/RequestUtils.scala b/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/emitters/RequestUtils.scala
index 13f67cc6..d014cc39 100644
--- a/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/emitters/RequestUtils.scala
+++ b/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/emitters/RequestUtils.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015 Snowplow Analytics Ltd. All rights reserved.
+ * Copyright (c) 2015-2017 Snowplow Analytics Ltd. All rights reserved.
 *
 * This program is licensed to you under the Apache License Version 2.0,
 * and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -13,40 +13,53 @@
 package com.snowplowanalytics.snowplow.scalatracker
 package emitters
 
-// Scala
-import scala.util.{ Failure, Success }
-import scala.util.control.NonFatal
-import scala.concurrent.{
-  Future,
-  Await
-}
-import scala.concurrent.duration._
-
-// Akka
-import akka.io.IO
-import akka.actor.ActorSystem
-import akka.pattern.ask
-import akka.util.Timeout
-
-// Spray
-import spray.http._
-import spray.httpx.marshalling.Marshaller
-import spray.client.pipelining._
-import spray.can.Http
-import spray.util.{ Utils => _, _ }
-
-// json4s
+import java.util.concurrent.BlockingQueue
+import java.util.{Timer, TimerTask}
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Random, Success}
+
+import scalaj.http._
+
 import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
-// Config
-import com.typesafe.config.ConfigFactory
-
 /**
- * Object to hold methods for sending HTTP requests
+ * Module responsible for communication with the collector
 */
 object RequestUtils {
+
+  /** Payload (either GET or POST) ready to be sent to the collector */
+  sealed trait CollectorRequest extends Product with Serializable {
+    /** Number of the current send attempt */
+    def attempt: Int
+
+    /** Increment the attempt number. Must be used whenever the payload fails */
+    def updateAttempt: CollectorRequest = this match {
+      case g: GetCollectorRequest => g.copy(attempt = attempt + 1)
+      case p: PostCollectorRequest => p.copy(attempt = attempt + 1)
+    }
+
+    /**
+     * Return the same payload, but with an updated stm
+     * **Must** be used right before the payload goes to the collector
+     */
+    def updateStm: CollectorRequest = this match {
+      case GetCollectorRequest(_, map) =>
+        val stm = System.currentTimeMillis().toString
+        GetCollectorRequest(attempt, map.updated("stm", stm))
+      case PostCollectorRequest(_, list) =>
+        val stm = System.currentTimeMillis().toString
+        PostCollectorRequest(attempt, list.map(_.updated("stm", stm)))
+    }
+  }
+
+  case class GetCollectorRequest(attempt: Int, payload: Map[String, String]) extends CollectorRequest
+  case class PostCollectorRequest(attempt: Int, payload: List[Map[String, String]]) extends CollectorRequest
+
+  case class CollectorParams(host: String, port: Int, https: Boolean)
+
   // JSON object with Iglu URI to Schema for payload
   private val payloadBatchStub: JObject =
     ("schema", "iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4")
@@ -59,171 +72,76 @@
   private def postPayload(payload: Seq[Map[String, String]]): String =
     compact(payloadBatchStub ~ ("data", payload))
 
-  /**
-   * Marshall batch of events to string payload with application/json
-   */
-  implicit val eventsMarshaller = Marshaller.of[Seq[Map[String,String]]](ContentTypes.`application/json`) {
-    (value, ct, ctx) => ctx.marshalTo(HttpEntity(ct, postPayload(value)))
-  }
-
-  implicit val system = ActorSystem(
-    generated.ProjectSettings.name,
-    ConfigFactory.parseString("akka.daemonic=on"))
-  import system.dispatcher // Context for Futures
-  val longTimeout = 5.minutes
-  implicit val timeout = Timeout(longTimeout)
-  val pipeline: HttpRequest => Future[HttpResponse] = sendReceive
-
-  // Close all connections when the application exits
-  Runtime.getRuntime().addShutdownHook(new Thread() {
-    override def run() {
-      shutdown()
-    }
-  })
-
-  /**
-   * Construct GET request with single event payload
-   *
-   * @param host URL host (not header)
-   * @param payload map of event keys
-   * @param port URL port (not header)
-   * @param https should this request use the https scheme
-   * @return HTTP request with event
-   */
-  private[emitters] def constructGetRequest(host: String, payload: Map[String, String], port: Int, https: Boolean = false): HttpRequest = {
-    val uri = Uri()
-      .withScheme(Uri.httpScheme(https))
-      .withPath(Uri.Path("/i"))
-      .withAuthority(Uri.Authority(Uri.Host(host), port))
-      .withQuery(payload)
-    Get(uri)
-  }
-
 /**
 * Construct POST request with batch event payload
 *
-  * @param host URL host (not header)
-  * @param payload list of events
-  * @param port URL port (not header)
-  * @param https should this request use the https scheme
+  * @param collector endpoint preferences
+  * @param request events enveloped with either Get or Post request
 * @return HTTP request with event
 */
-  private[emitters] def constructPostRequest(host: String, payload: Seq[Map[String, String]], port: Int, https: Boolean = false): HttpRequest = {
-    val uri = Uri()
-      .withScheme(Uri.httpScheme(https))
-      .withPath(Uri.Path("/com.snowplowanalytics.snowplow/tp2"))
-      .withAuthority(Uri.Authority(Uri.Host(host), port))
-    Post(uri, payload)
+  private[emitters] def constructRequest(collector: CollectorParams, request: CollectorRequest): HttpRequest = {
+    val scheme = if (collector.https) "https://" else "http://"
+    request match {
+      case PostCollectorRequest(_, payload) =>
+        Http(s"$scheme${collector.host}:${collector.port}/com.snowplowanalytics.snowplow/tp2")
+          .postData(postPayload(payload))
+          .header("content-type", "application/json")
+      case GetCollectorRequest(_, payload) =>
+        Http(s"$scheme${collector.host}:${collector.port}/i").params(payload)
+    }
   }
 
   /**
-   * Attempt a GET request once
-   *
-   * @param host collector host
-   * @param payload event map
-   * @param port collector port
-   * @param https should this request use the https scheme
-   * @return Whether the request succeeded
+   * Attempt an HTTP request. Return the request to the queue
+   * if it was unsuccessful
+   * @param ec thread pool used to send HTTP requests to the collector
+   * @param originQueue reference to the queue where the event can be re-added
+   *                    in case of unsuccessful delivery
+   * @param collector endpoint preferences
+   * @param payload either GET or POST payload
   */
-  def attemptGet(host: String, payload: Map[String, String], port: Int = 80, https: Boolean = false): Boolean = {
-    val payloadWithStm = payload ++ Map("stm" -> System.currentTimeMillis().toString)
-    val req = constructGetRequest(host, payloadWithStm, port, https)
-    val future = pipeline(req)
-    val result = Await.ready(future, longTimeout).value.get
-    result match {
-      case Success(s) => s.status.isSuccess // 404 match Success(_) too
-      case Failure(_) => false
-    }
+  def send(originQueue: BlockingQueue[CollectorRequest], ec: ExecutionContext, collector: CollectorParams, payload: CollectorRequest): Unit = {
+    sendAsync(ec, collector, payload).onComplete {
+      case Success(s) if s.code >= 200 && s.code < 300 => ()
+      case _ => backToQueue(originQueue, payload.updateAttempt)
+    }(ec)
  }
 
  /**
-   * Attempt a GET request until success or 10th try
-   * Double backoff period after each failed try
-   *
-   * @param host collector host
-   * @param payload event map
-   * @param port collector port
-   * @param backoffPeriod How long to wait after first failed request
-   * @param attempt accumulated value of tries
-   * @param https should this request use the https scheme
-   */
-  def retryGetUntilSuccessful(
-    host: String,
-    payload: Map[String, String],
-    port: Int = 80,
-    backoffPeriod: Long,
-    attempt: Int = 1,
-    https: Boolean = false) {
-
-    val getSuccessful = try {
-      attemptGet(host, payload, port, https)
-    } catch {
-      case NonFatal(f) => false
-    }
+   * Attempt an HTTP request
+   * @param ec thread pool used to send HTTP requests to the collector
+   * @param collector endpoint preferences
+   * @param payload either GET or POST payload
+   */
+  def sendAsync(ec: ExecutionContext, collector: CollectorParams, payload: CollectorRequest): Future[HttpResponse[_]] =
+    Future(constructRequest(collector, payload.updateStm).asBytes)(ec)
 
-    if (!getSuccessful && attempt < 10) {
-      Thread.sleep(backoffPeriod)
-      retryGetUntilSuccessful(host, payload, port, backoffPeriod * 2, attempt + 1, https)
-    }
-  }
+  /** Timer thread, responsible for adding failed payloads to the queue after a delay */
+  private val timer = new Timer("snowplow-event-retry-timer", true)
 
-  /**
-   * Attempt a POST request once
-   *
-   * @param host collector host
-   * @param payload event map
-   * @param port collector port
-   * @param https should this request use the https scheme
-   * @return Whether the request succeeded
-   */
-  def attemptPost(host: String, payload: Seq[Map[String, String]], port: Int = 80, https: Boolean = false): Boolean = {
-    val stm = System.currentTimeMillis().toString
-    val payloadWithStm = payload.map(_ ++ Map("stm" -> stm))
-    val req = constructPostRequest(host, payloadWithStm, port, https)
-    val future = pipeline(req)
-    val result = Await.ready(future, longTimeout).value.get
-    result match {
-      case Success(s) => s.status.isSuccess // 404 match Success(_) too
-      case Failure(_) => false
-    }
-  }
+  /** RNG to generate back-off periods */
+  private val rng = new Random()
 
  /**
-   * Attempt a POST request until success or 10th try
-   * Double backoff period after each failed try
-   *
-   * @param host collector host
-   * @param payload event map
-   * @param port collector port
-   * @param backoffPeriod How long to wait after first failed request
-   * @param attempt accumulated value of tries
-   * @param https should this request use the https scheme
-   */
-  def retryPostUntilSuccessful(
-    host: String,
-    payload: Seq[Map[String, String]],
-    port: Int = 80,
-    backoffPeriod: Long,
-    attempt: Int = 1,
-    https: Boolean = false) {
-
-    val getSuccessful = try {
-      attemptPost(host, payload, port, https)
-    } catch {
-      case NonFatal(f) => false
-    }
-
-    if (!getSuccessful && attempt < 10) {
-      Thread.sleep(backoffPeriod)
-      retryPostUntilSuccessful(host, payload, port, backoffPeriod * 2, attempt + 1, https)
+   * Schedule re-adding of a failed event to the queue after some delay.
+   * The delay is calculated based on the number of attempts undertaken
+   */
+  def backToQueue(queue: BlockingQueue[CollectorRequest], event: CollectorRequest): Unit = {
+    if (event.attempt > 10) System.err.println("Snowplow Scala Tracker gave up trying to send a payload to collector after 10 attempts")
+    else {
+      val task = new TimerTask {
+        override def run(): Unit = queue.put(event)
+      }
+      val delay = getDelay(event.attempt)
+      timer.schedule(task, delay)
    }
  }
 
-  /**
-   * Close the actor system and all connections
-   */
-  def shutdown() {
-    IO(Http).ask(Http.CloseAll)(1.second).await
-    system.shutdown
+  /** Get a delay with a non-linearly increasing back-off period */
+  private def getDelay(attempt: Int): Int = {
+    val rangeMin = attempt.toDouble
+    val rangeMax = attempt.toDouble * 3
+    ((rangeMin + (rangeMax - rangeMin) * rng.nextDouble()) * 1000).toInt
  }
 }
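The new retry flow: `send` fires the request via `sendAsync`, and any non-2xx outcome goes to `backToQueue`, which re-enqueues the payload after a randomized delay drawn uniformly from [n, 3n] seconds for attempt n. A minimal sketch of calling `sendAsync` directly for one-off delivery, assuming the global execution context; the host is a placeholder, and `"e" -> "pv"` is the tracker-protocol code for a page view:

```scala
import scala.concurrent.ExecutionContext.Implicits.global

import com.snowplowanalytics.snowplow.scalatracker.emitters.RequestUtils
import com.snowplowanalytics.snowplow.scalatracker.emitters.RequestUtils.{CollectorParams, GetCollectorRequest}

object SendAsyncSketch extends App {
  val collector = CollectorParams("collector.acme.com", 80, https = false) // placeholder endpoint
  val payload = GetCollectorRequest(1, Map("e" -> "pv"))                   // first attempt, minimal payload

  // One GET request on the global pool; unlike `send`, no retry is scheduled here
  RequestUtils.sendAsync(global, collector, payload)
    .foreach(response => println(s"Collector responded with ${response.code}"))

  Thread.sleep(5000) // keep the JVM alive long enough for the future to complete
}
```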
diff --git a/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/emitters/SyncEmitter.scala b/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/emitters/SyncEmitter.scala
index ef3e932c..9684a56c 100644
--- a/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/emitters/SyncEmitter.scala
+++ b/src/main/scala/com.snowplowanalytics.snowplow/scalatracker/emitters/SyncEmitter.scala
@@ -10,18 +10,37 @@
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
 */
-package com.snowplowanalytics.snowplow.scalatracker.emitters
+package com.snowplowanalytics.snowplow.scalatracker
+package emitters
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext.global
+import scala.concurrent.duration._
+import scala.util.Failure
+
+import RequestUtils.{ GetCollectorRequest, CollectorParams }
 
 /**
- * Blocking emitter
- *
- * @param host
- * @param port
- * @param https
+ * Blocking emitter.
+ * This emitter blocks the whole thread (from the global execution context)
+ * for a specified amount of time. Use at your own risk
+ * @param host collector host
+ * @param port collector port
+ * @param https whether to use HTTPS
+ * @param blockingDuration amount of time to wait (block) for a response
 */
-class SyncEmitter(host: String, port: Int = 80, https: Boolean = false) extends TEmitter {
+class SyncEmitter(host: String, port: Int = 80, https: Boolean = false, blockingDuration: Duration = 5.seconds) extends TEmitter {
+
+  private val collector = CollectorParams(host, port, https)
 
  def input(event: Map[String, String]): Unit = {
-    RequestUtils.attemptGet(host, event, port, https)
+    val response = RequestUtils.sendAsync(global, collector, GetCollectorRequest(1, event))
+    Await.ready(response, blockingDuration).value match {
+      case None =>
+        System.err.println(s"Snowplow SyncEmitter failed to get response in $blockingDuration")
+      case Some(Failure(f)) =>
+        System.err.println(s"Snowplow SyncEmitter failed to send event: ${f.getMessage}")
+      case _ => ()
+    }
  }
 }
diff --git a/src/test/scala/com.snowplowanalytics.snowplow.scalatracker/SelfDescribingJsonSpec.scala b/src/test/scala/com.snowplowanalytics.snowplow.scalatracker/SelfDescribingJsonSpec.scala
index 869166d7..4bcc3175 100644
--- a/src/test/scala/com.snowplowanalytics.snowplow.scalatracker/SelfDescribingJsonSpec.scala
+++ b/src/test/scala/com.snowplowanalytics.snowplow.scalatracker/SelfDescribingJsonSpec.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2015 Snowplow Analytics Ltd. All rights reserved.
+ * Copyright (c) 2015-2017 Snowplow Analytics Ltd. All rights reserved.
 *
 * This program is licensed to you under the Apache License Version 2.0,
 * and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -13,9 +13,7 @@
 package com.snowplowanalytics.snowplow.scalatracker
 
 // json4s
-import org.json4s._
 import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods._
 
 // Specs2
 import org.specs2.mutable.Specification
diff --git a/src/test/scala/com.snowplowanalytics.snowplow.scalatracker/StressTest.scala b/src/test/scala/com.snowplowanalytics.snowplow.scalatracker/StressTest.scala
new file mode 100644
index 00000000..9760af12
--- /dev/null
+++ b/src/test/scala/com.snowplowanalytics.snowplow.scalatracker/StressTest.scala
@@ -0,0 +1,206 @@
+/*
+ * Copyright (c) 2015-2017 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */ +package com.snowplowanalytics.snowplow.scalatracker + +import java.io.FileWriter + +import org.json4s.{JArray, JObject, JString, JValue} +import org.json4s.jackson.JsonMethods.{compact, parseOpt} +import org.json4s.JsonAST.JDouble + +import org.scalacheck.Gen + +import Tracker.{DeviceCreatedTimestamp, Timestamp, TrueTimestamp} + +import com.snowplowanalytics.snowplow.scalatracker.emitters.AsyncBatchEmitter + +/** + * Ad-hoc load testing + */ +object StressTest { + + /** ADT for all possible event types Tracker can accept */ + sealed trait EventArguments + case class PageView(url: String, title: Option[String], referrer: Option[String], contexts: Option[List[SelfDescribingJson]], timestamp: Option[Timestamp]) extends EventArguments + + // Parser typeclass. Useless so far + trait Read[A] { def reads(line: String): A } + + object Read { + def apply[A: Read] = implicitly[Read[A]] + } + + implicit val sdJsonsRead = new Read[List[SelfDescribingJson]] { + def parseJson(json: JValue): SelfDescribingJson = { + json match { + case JObject(fields) => + val map = fields.toMap + map.get("schema") match { + case Some(JString(schema)) => SelfDescribingJson(schema, map("data")) + } + } + } + + def reads(line: String): List[SelfDescribingJson] = + parseOpt(line) match { + case Some(JArray(list)) => list.map(parseJson) + case None => Nil + } + } + + implicit val tstmpRead = new Read[Option[Timestamp]] { + def reads(line: String): Option[Timestamp] = { + line.split(":").toList match { + case List("ttm", tstamp) => Some(TrueTimestamp(tstamp.toLong)) + case List("dtm", tstamp) => Some(DeviceCreatedTimestamp(tstamp.toLong)) + case _ => None + } + } + } + + implicit val eventRead = new Read[EventArguments] { + def reads(line: String): EventArguments = { + val cols = line.split("\t", -1).lift + (cols(0), cols(1)) match { + case (Some("pv"), Some(url)) => + val ctx: Option[List[SelfDescribingJson]] = cols(4).map(sdJsonsRead.reads) + val timestamp = cols(5).flatMap(tstmpRead.reads) + PageView(url, cols(2), cols(3), ctx, timestamp) + } + } + } + + // Generate valid pseudo-URL + val urlGen = for { + protocol <- Gen.oneOf(List("http://", "https://")) + port <- Gen.oneOf(List("", ":80", ":8080", ":443", ":10505")) + + lengthDomain <- Gen.choose(1, 3) + topDomain <- Gen.oneOf(List("com", "ru", "co.uk", "org", "mobi", "by")) + domainList <- Gen.containerOfN[List, String](lengthDomain, Gen.alphaLowerStr) + + lengthUrl <- Gen.choose(0, 5) + urlList <- Gen.containerOfN[List, String](lengthUrl, Gen.alphaNumStr) + url = new java.net.URL(protocol + domainList.mkString(".") + s".$topDomain" + port + "/" + urlList.mkString("/")) + } yield url + + // Generate geolocation context + val geoLocationGen = for { + latitude <- Gen.choose[Double](-90, 90) + longitude <- Gen.choose[Double](-180, 180) + data = JObject("latitude" -> JDouble(latitude), "longitude" -> JDouble(longitude)) + sd = SelfDescribingJson("iglu:com.snowplowanalytics.snowplow/geolocation_context/jsonschema/1-1-0", data) + } yield sd + + // Generate timestamp + val timestampGen = for { + tstType <- Gen.option(Gen.oneOf(List(TrueTimestamp.apply _, DeviceCreatedTimestamp.apply _))) + tstamp <- Gen.choose[Long](1508316432L - (2 * 365 * 86400), 1508316432L) + result <- tstType.map { x => x(tstamp) } + } yield result + + // Generate whole pageview event + val pageViewGen = for { + url <- urlGen.map(_.toString) + title <- Gen.option(Gen.alphaNumStr) + referrer <- Gen.option(urlGen.map(_.toString)) + ctx <- Gen.option(geoLocationGen.map(x => List(x))) + tstamp <- 
timestampGen
+  } yield PageView(url, title, referrer, ctx, tstamp)
+
+  def writeContext(sd: List[SelfDescribingJson]): String =
+    compact(JArray(sd.map(s => s.toJObject)))
+
+  def writeTimestamp(tstamp: Timestamp): String = tstamp match {
+    case TrueTimestamp(tst) => s"ttm:$tst"
+    case DeviceCreatedTimestamp(tst) => s"dtm:$tst"
+  }
+
+  def writeEvent(event: PageView) =
+    s"pv\t${event.url}\t${event.title.getOrElse("")}\t${event.referrer.getOrElse("")}\t${event.contexts.map(writeContext).getOrElse("")}\t${event.timestamp.map(writeTimestamp).getOrElse("")}"
+
+  def write(path: String, cardinality: Int): Unit = {
+    var i = 0
+    val fw = new FileWriter(path)
+    while (i < cardinality) {
+      pageViewGen.sample.map(writeEvent) match {
+        case Some(line) => fw.write(line + "\n")
+        case None => ()
+      }
+      i = i + 1
+    }
+  }
+
+  /**
+   * Thread imitating an application's worker thread that has access to the tracker
+   * The constructor blocks until all events are loaded into memory
+   */
+  class TrackerThread(path: String, tracker: Tracker) {
+    // This can take some time
+    val events = scala.io.Source.fromFile(path).getLines().map(Read[EventArguments].reads).toList
+
+    println(s"TrackerThread for $path initialized")
+
+    def getWorker: Thread = {
+      val worker = new Thread {
+        private var i = 0
+        override def run() {
+          events.foreach {
+            case PageView(url, title, referrer, contexts, timestamp) =>
+              tracker.trackPageView(url, title, referrer, contexts.getOrElse(Nil), timestamp)
+              i = i + 1
+              if (i % 1000 == 0) {
+                println(s"One more 1000 from $path")
+              }
+          }
+          println(s"TrackerThread for $path sent $i events")
+          i = 0
+        }
+      }
+      worker.setDaemon(true)
+      worker
+    }
+  }
+
+  /**
+   * Main method. Starts the specified number of separate threads, each reading
+   * its own file and sending events via a shared tracker.
+   * All threads should be prepared (parse events and store them in memory) during
+   * construction. When the function returns, the threads are ready to be started by foreach(_.run())
+   * ```
+   * println(System.currentTimeMillis)
+   * res0.foreach(_.run())
+   * res0.foreach(_.join())
+   * println(System.currentTimeMillis)
+   * ```
+   *
+   * @param collector single collector for all threads
+   * @param dir directory with temporary event TSVs
+   * @param cardinality number of events in each TSV
+   * @param threads number of parallel threads
+   * @return list of threads
+   */
+  def testAsyncBatch(collector: String, port: Int, dir: String, cardinality: Int, threads: Int = 1) = {
+    import scala.concurrent.ExecutionContext.Implicits.global
+
+    val files = List.fill(threads)(dir).zipWithIndex.map { case (path, i) => s"$path/events-$i.tsv" }
+    files.foreach { file => write(file, cardinality) }
+    println(s"Writing to files completed. ${files.mkString(", ")}")
+
+    val emitter = AsyncBatchEmitter.createAndStart(collector, port, bufferSize = 10)
+    val tracker = new Tracker(List(emitter), "test-tracker-ns", "test-app")
+
+    files.map(file => new TrackerThread(file, tracker).getWorker)
+  }
+}
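Pulling the release together, a minimal end-to-end sketch of the 0.4.0 API; the collector host and the Iglu schema URI below are placeholders:

```scala
import scala.concurrent.ExecutionContext.Implicits.global

import org.json4s.JsonDSL._

import com.snowplowanalytics.snowplow.scalatracker.{SelfDescribingJson, Tracker}
import com.snowplowanalytics.snowplow.scalatracker.emitters.AsyncEmitter

object QuickStart extends App {
  val emitter = AsyncEmitter.createAndStart("collector.acme.com")  // placeholder host
  val tracker = new Tracker(List(emitter), "example-namespace", "example-app")

  val event = SelfDescribingJson(
    "iglu:com.acme/button_click/jsonschema/1-0-0",                 // placeholder schema
    ("buttonId" -> "checkout"))

  // trackUnstructEvent still compiles but emits a deprecation warning (#43);
  // trackSelfDescribingEvent is the 0.4.0 replacement with identical behaviour
  tracker.trackSelfDescribingEvent(event)
}
```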