Skip to content

Commit

Permalink
generic transaction storage and unit tests. final cleaned up, needs t…
Browse files Browse the repository at this point in the history
…esting
  • Loading branch information
jayunit100 committed Dec 11, 2014
1 parent 9c4fa7f commit b3a1cf6
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 7 deletions.
20 changes: 14 additions & 6 deletions src/main/scala/sparkapps/ctakes/CTakesTwitterStreamingApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import sparkapps.ctakes.TwitterInputDStreamCTakes
import sparkapps.tweetstream._
import twitter4j.auth.{OAuthSupport, Authorization}
import scala.runtime.ScalaRunTime._
Expand Down Expand Up @@ -160,14 +161,21 @@ object Driver {
val sCon = new SparkContext(conf)
val ssCon = new StreamingContext(sCon, Seconds(intervalSecs))

/**
* Here is the logic of the entire application.
* We use the generic streaming utility to do the
* spark streaming glue.
*/
TwitterAppTemplate.startStream(
conf,
//fix this line, then app should be testable against cassandra.
{ ssc=>sparkapps.tweetstream.TwitterInputDStreamCTakes(
ssc,
Utils.getAuth,
null,
1)_ },
/**
* The function which creates the DStream, given a context.
*/
{ssc=> TwitterInputDStreamCTakes(ssc, Utils.getAuth, null, 1)_ },
/**
* The function which Process outputs from the DStream,
* given a RDD and a sparkConfiguration.
*/
{
(transactions,sparkConf) =>
//assumes session.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sparkapps.tweetstream
package sparkapps.ctakes

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/sparkapps/tweetstream/TwitterUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import sparkapps.ctakes.TwitterInputDStreamCTakes
import twitter4j.Status
import twitter4j.auth.Authorization

Expand Down

0 comments on commit b3a1cf6

Please sign in to comment.