From b3a1cf6e0e4a9056887d8c5f33c32a173de7d6dd Mon Sep 17 00:00:00 2001 From: jayunit100 Date: Wed, 10 Dec 2014 19:10:27 -0500 Subject: [PATCH] generic transaction storage and unit tests. final cleaned up, needs testing --- .../ctakes/CTakesTwitterStreamingApp.scala | 20 +++++++++++++------ .../TwitterInputDStreamCTakes.scala | 2 +- .../sparkapps/tweetstream/TwitterUtils.scala | 1 + 3 files changed, 16 insertions(+), 7 deletions(-) rename src/main/scala/sparkapps/{tweetstream => ctakes}/TwitterInputDStreamCTakes.scala (99%) diff --git a/src/main/scala/sparkapps/ctakes/CTakesTwitterStreamingApp.scala b/src/main/scala/sparkapps/ctakes/CTakesTwitterStreamingApp.scala index 90e9c0b..58370a5 100755 --- a/src/main/scala/sparkapps/ctakes/CTakesTwitterStreamingApp.scala +++ b/src/main/scala/sparkapps/ctakes/CTakesTwitterStreamingApp.scala @@ -11,6 +11,7 @@ import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.twitter.TwitterUtils import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} +import sparkapps.ctakes.TwitterInputDStreamCTakes import sparkapps.tweetstream._ import twitter4j.auth.{OAuthSupport, Authorization} import scala.runtime.ScalaRunTime._ @@ -160,14 +161,21 @@ object Driver { val sCon = new SparkContext(conf) val ssCon = new StreamingContext(sCon, Seconds(intervalSecs)) + /** + * Here is the logic of the entire application. + * We use the generic streaming utility to do the + * spark streaming glue. + */ TwitterAppTemplate.startStream( conf, - //fix this line, then app should be testable against cassandra. - { ssc=>sparkapps.tweetstream.TwitterInputDStreamCTakes( - ssc, - Utils.getAuth, - null, - 1)_ }, + /** + * The function which creates the DStream, given a context. + */ + {ssc=> TwitterInputDStreamCTakes(ssc, Utils.getAuth, null, 1)_ }, + /** + * The function which processes outputs from the DStream, + * given an RDD and a Spark configuration. 
+ */ { (transactions,sparkConf) => //assumes session. diff --git a/src/main/scala/sparkapps/tweetstream/TwitterInputDStreamCTakes.scala b/src/main/scala/sparkapps/ctakes/TwitterInputDStreamCTakes.scala similarity index 99% rename from src/main/scala/sparkapps/tweetstream/TwitterInputDStreamCTakes.scala rename to src/main/scala/sparkapps/ctakes/TwitterInputDStreamCTakes.scala index 946a365..b862155 100644 --- a/src/main/scala/sparkapps/tweetstream/TwitterInputDStreamCTakes.scala +++ b/src/main/scala/sparkapps/ctakes/TwitterInputDStreamCTakes.scala @@ -1,4 +1,4 @@ -package sparkapps.tweetstream +package sparkapps.ctakes import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel diff --git a/src/main/scala/sparkapps/tweetstream/TwitterUtils.scala b/src/main/scala/sparkapps/tweetstream/TwitterUtils.scala index 60041f1..e335bd5 100644 --- a/src/main/scala/sparkapps/tweetstream/TwitterUtils.scala +++ b/src/main/scala/sparkapps/tweetstream/TwitterUtils.scala @@ -4,6 +4,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream +import sparkapps.ctakes.TwitterInputDStreamCTakes import twitter4j.Status import twitter4j.auth.Authorization