Skip to content

Commit

Permalink
Merge branch 'release/0.2.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanderdean committed Oct 14, 2015
2 parents e3baa65 + 45dc36b commit 2aa8ce3
Show file tree
Hide file tree
Showing 14 changed files with 588 additions and 107 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
Version 0.2.0 (2015-10-14)
--------------------------
Added a custom context based on EC2 instance metadata (#2)
Added POST request support (#11)
Added support for page view events (#12)
Added support for structured events (#13)
Added dvce_sent_tstamp (#15)
Timestamp units changed to milliseconds (#17)
Dependencies versions bumped (#21)
Removed infinite loop of failed requests (#24)

Version 0.1.0 (2015-05-28)
--------------------------
Initial release
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ limitations under the License.
[license-image]: http://img.shields.io/badge/license-Apache--2-blue.svg?style=flat
[license]: http://www.apache.org/licenses/LICENSE-2.0

[release-image]: http://img.shields.io/badge/release-0.1.0-blue.svg?style=flat
[release-image]: http://img.shields.io/badge/release-0.2.0-blue.svg?style=flat
[releases]: https://github.com/snowplow/snowplow-scala-tracker/releases

[snowplow]: http://snowplowanalytics.com
Expand Down
6 changes: 3 additions & 3 deletions project/BuildSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ object BuildSettings {
// Basic settings for our app
lazy val basicSettings = Seq[Setting[_]](
organization := "com.snowplowanalytics",
version := "0.1.0",
version := "0.2.0",
description := "Scala tracker for Snowplow",
scalaVersion := "2.10.4",
crossScalaVersions := Seq("2.10.4", "2.11.5"),
scalaVersion := "2.10.6",
crossScalaVersions := Seq("2.10.6", "2.11.5"),
scalacOptions := Seq("-deprecation", "-encoding", "utf8"),
resolvers ++= Dependencies.resolutionRepos
)
Expand Down
7 changes: 4 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ object Dependencies {
// Scala
val scalaUtil = "0.1.0"
val json4s = "3.2.11"
val sprayClient = "1.3.2"
val akka = "2.3.5"
val spray = "1.3.3"
val akka = "2.3.14"
object collUtil {
val _29 = "5.3.10"
val _210 = "6.3.4"
Expand Down Expand Up @@ -65,7 +65,7 @@ object Dependencies {

// Scala
val scalaUtil = "com.snowplowanalytics" % "scala-util" % V.scalaUtil
val sprayClient = "io.spray" %% "spray-client" % V.sprayClient
val sprayClient = "io.spray" %% "spray-client" % V.spray
val akka = "com.typesafe.akka" %% "akka-actor" % V.akka
val json4sJackson = "org.json4s" %% "json4s-jackson" % V.json4s
object collUtil {
Expand All @@ -83,6 +83,7 @@ object Dependencies {
val _210 = "org.specs2" %% "specs2" % V.specs2._210 % "test"
val _211 = "org.specs2" %% "specs2" % V.specs2._211 % "test"
}
val sprayTest = "io.spray" %% "spray-testkit" % V.spray % "test"
}

def onVersion[A](all: Seq[A] = Seq(), on29: => Seq[A] = Seq(), on210: => Seq[A] = Seq(), on211: => Seq[A] = Seq()) =
Expand Down
3 changes: 2 additions & 1 deletion project/SnowplowTrackerBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ object SnowplowTrackerBuild extends Build {
Libraries.json4sJackson,
Libraries.sprayClient,
Libraries.akka,
Libraries.mockito),
Libraries.mockito,
Libraries.sprayTest),
on29 = Seq(Libraries.collUtil._29, Libraries.specs2._29),
on210 = Seq(Libraries.collUtil._210, Libraries.specs2._210),
on211 = Seq(Libraries.collUtil._211, Libraries.specs2._211)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright (c) 2015 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.scalatracker

// Scala
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Akka
import akka.util.Timeout

// Spray
import spray.http._
import spray.client.pipelining._

// json4s
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// This library
import emitters.RequestUtils

/**
* Trait with parsing EC2 meta data logic
* http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
*/
object Ec2Metadata {
import RequestUtils.system
import system.dispatcher
val shortTimeout = 10.seconds
implicit val timeout = Timeout(shortTimeout)
val pipeline: HttpRequest => Future[HttpResponse] = sendReceive

val instanceIdentitySchema = "iglu:com.amazon.aws.ec2/instance_identity_document"
val instanceIdentityUri = "http://169.254.169.254/latest/dynamic/instance-identity/document/"

private var contextSlot: Option[SelfDescribingJson] = None

/**
* Get context stored in mutable variable
*
* @return some context or None in case of any error or not completed request
*/
def context: Option[SelfDescribingJson] = contextSlot

/**
* Set callback on successful instance identity GET request
*/
def initializeContextRequest: Unit = {
getInstanceContextFuture.onSuccess {
case json: SelfDescribingJson => contextSlot = Some(json)
}
}

/**
* Tries to make blocking request to EC2 instance identity document
* On EC2 request takes ~6ms, while on non-EC2 box it blocks thread for 3 second
*
* @return some context or None in case of any error including 3 sec timeout
*/
def getInstanceContextBlocking: Option[SelfDescribingJson] =
try {
Some(Await.result(getInstanceContextFuture, 3 seconds))
}
catch {
case NonFatal(_) => None
}

/**
* Tries to GET self-describing JSON with instance identity
* or timeout after 10 seconds
*
* @return future JSON with identity data
*/
def getInstanceContextFuture: Future[SelfDescribingJson] =
getInstanceIdentity.map(SelfDescribingJson(instanceIdentitySchema, _))

/**
* Tries to GET instance identity document for EC2 instance
*
* @return future JSON object with identity data
*/
def getInstanceIdentity: Future[JObject] = {
val instanceIdentityDocument = pipeline(Get(instanceIdentityUri))
instanceIdentityDocument.map(_.entity.asString).map { (resp: String) =>
parseOpt(resp) match {
case Some(jsonObject: JObject) => {
val prepared = prepareEc2Context(jsonObject)
if (prepared.values.keySet.size == 0) { throw new Exception("Document contains no known keys") }
else { prepared }
}
case _ =>
throw new Exception("Document can not be parsed")
}
}
}

/**
* Recursively parse AWS EC2 instance metadata to get whole metadata
*
* @param url full url to the endpoint (usually http://169.254.169.254/latest/meta-data/)
* @return future JSON object with metadata
*/
def getMetadata(url: String): Future[JObject] = {
val key = url.split("/").last
if (!url.endsWith("/")) { // Leaf
getContent(url).map { value => key -> JString(value) }
} else { // Node
val sublinks = getContents(url)
val subnodes: Future[List[JObject]] = sublinks.flatMap { links =>
Future.sequence { links.map { link => getMetadata(url + link) } }
}
val mergedObject = subnodes.map { _.fold(JObject(Nil))(_.merge(_)) }
mergedObject.map(key -> _)
}
}

// URL regex to for `transformUrl`
private val publicKey = ".*/latest/meta-data/public-keys/(\\d+)\\=[A-Za-z0-9-_]+$".r

/**
* Handle URL which should be handled in different ways
* e.g. we can't GET public-keys/0-key-name, we should change it to public-keys/0
* to get data
*
* @param url current URL
* @return modified URL if we're trying to get on of special cases
*/
def transformUrl(url: String): String = url match {
case publicKey(i) => (url.split("/").dropRight(1) :+ i).mkString("/") + "/"
case _ => url
}

/**
* Get URL content (for leaf-link)
*
* @param url leaf URL (without slash at the end)
* @return future value
*/
private def getContent(url: String): Future[String] =
pipeline(Get(url)).map(_.entity.asString)

/**
* Get content of node-link
*
* @param url node url (with slash at the end)
* @return future list of sublinks
*/
private def getContents(url: String): Future[List[String]] =
getContent(url).map(_.split('\n').toList)

// all keys of current instance identity schema
private val instanceIdentityKeys = Set(
"architecture", "accountId", "availabilityZone", "billingProducts",
"devpayProductCodes", "imageId", "instanceId", "instanceType", "kernelId",
"pendingTime", "privateIp", "ramdiskId", "region", "version")

/**
* Make sure EC2 context contains only keys known
* at iglu:com.amazon.aws.ec2/instance_identity_document
*
* @param context JSON object with EC2 context
* @return true if object is context
*/
private def prepareEc2Context(context: JObject): JObject =
context.filterField {
case (key, _) => instanceIdentityKeys.contains(key)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,34 @@ class Payload {
/**
* Add a key-value pair
*
* @param name
* @param value
* @param name parameter name
* @param value parameter value
*/
def add(name: String, value: String) {
def add(name: String, value: String): Unit = {
if (!name.isEmpty && name != null && !value.isEmpty && value != null) {
nvPairs += (name -> value)
}
}

/**
* Overloaded add function for Option. Don't modify payload for None
*
* @param name parameter name
* @param value optional parameter value
*/
def add(name: String, value: Option[String]): Unit = {
value match {
case Some(v) => add(name, v)
case None =>
}
}

/**
* Add a map of key-value pairs one by one
*
* @param dict
*/
def addDict(dict: Map[String, String]) {
def addDict(dict: Map[String, String]): Unit = {
dict foreach {
case (k, v) => add(k, v)
}
Expand All @@ -64,7 +77,7 @@ class Payload {
json: JObject,
encodeBase64: Boolean,
typeWhenEncoded: String,
typeWhenNotEncoded: String) {
typeWhenNotEncoded: String): Unit = {

val jsonString = compact(render(json))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,20 @@ package com.snowplowanalytics.snowplow.scalatracker

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

object SelfDescribingJson {

/**
* Create a SelfDescribingJson from the individual components of a schema
*
* @param protocol
* @param vendor
* @param name
* @param format
* @param model
* @param revision
* @param addition
* @param data
* @param protocol schema path protocol
* @param vendor schema vendor
* @param name schema name
* @param format schema format (eg. jsonschema)
* @param model model number of SchemaVer
* @param revision revision number of SchemaVer
* @param addition addition number of SchemaVer
* @param data JSON instance
*/
def apply(
protocol: String,
Expand All @@ -48,7 +47,7 @@ object SelfDescribingJson {
/**
* Convenience method to create an outer unstruct_event self-describing JSON
*
* @param schema
* @param schema the schema string
* @param data Unstructured event self-describing JSOn
*/
def apply(schema: String, data: SelfDescribingJson): SelfDescribingJson = {
Expand All @@ -58,7 +57,7 @@ object SelfDescribingJson {
/**
* Convenience method to turn a sequence of contexts into a self-describing JSON
*
* @param schema
* @param schema the schema string
* @param data Sequence of self-describing JSONs representing custom contexts
*/
def apply(schema: String, data: Seq[SelfDescribingJson]): SelfDescribingJson = {
Expand All @@ -69,8 +68,8 @@ object SelfDescribingJson {
/**
* JSON representing an unstructured event or a custom context
*
* @param schema
* @param data
* @param schema the schema string
* @param data JSON instance
*/
case class SelfDescribingJson(schema: String, data: JValue) {
def toJObject(): JObject = ("schema" -> schema) ~ ("data" -> data)
Expand Down
Loading

0 comments on commit 2aa8ce3

Please sign in to comment.