generic transaction storage and unit tests. wip
jayunit100 committed Dec 10, 2014
1 parent b636e6e commit 9c4fa7f
Showing 11 changed files with 62 additions and 161 deletions.
60 changes: 0 additions & 60 deletions src/main/scala/sparkapps/ctakes/CTakesExample.scala

This file was deleted.

67 changes: 30 additions & 37 deletions src/main/scala/sparkapps/ctakes/CTakesTwitterStreamingApp.scala
@@ -2,6 +2,7 @@ package sparkapps.ctakes

import java.io.File

import com.datastax.spark.connector.cql.CassandraConnector
import com.google.gson.Gson
import com.google.gson._
import jregex.Pattern
@@ -10,6 +11,8 @@ import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import sparkapps.tweetstream._
import twitter4j.auth.{OAuthSupport, Authorization}
import scala.runtime.ScalaRunTime._
/**
* Collect at least the specified number of tweets into json text files.
@@ -154,43 +157,33 @@ object Driver {
val conf = new SparkConf()
.setAppName(this.getClass.getSimpleName+""+System.currentTimeMillis())
.setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(intervalSecs))

/**
*
*/
val tweetStream = TwitterUtilsCTakes.createStream(
ssc,
Utils.getAuth,
Seq("medical"),
StorageLevel.MEMORY_ONLY)
.map(gson.toJson(_))
.filter(!_.contains("boundingBoxCoordinates"))//SPARK-3390

var checks = 0;
tweetStream.foreachRDD(rdd => {
val outputRDD = rdd.repartition(partitionsEachInterval)
System.out.println(rdd.count());
numTweetsCollected += rdd.count()
System.out.println("\n\n\n PROGRESS ::: "+numTweetsCollected + " so far, out of " + numTweetsToCollect + " \n\n\n ");
if (numTweetsCollected > numTweetsToCollect) {
ssc.stop()
sc.stop();
System.exit(0)
val sCon = new SparkContext(conf)
val ssCon = new StreamingContext(sCon, Seconds(intervalSecs))

TwitterAppTemplate.startStream(
conf,
//fix this line, then app should be testable against cassandra.
{ ssc=>sparkapps.tweetstream.TwitterInputDStreamCTakes(
ssc,
Utils.getAuth,
null,
1)_ },
{
(transactions,sparkConf) =>
//assumes session.
CassandraConnector(sparkConf).withSessionDo {
session => {
val x=1
Thread.sleep(1)
transactions.foreach({
xN =>
val xNtxt=xN.toString+" "+xN.getText;
session.executeAsync(s"INSERT INTO streaming_test.key_value (key, value) VALUES ('$xNtxt' , $x)")}
)
true;
}
}
}
})

/**
* This is where we invoke CTakes. For your CTakes implementation, you would change the logic here
* to do something like store results to a file, or do a more sophisticated series of tasks.
*/
val stream = tweetStream.map(
x =>
System.out.println(" " + CtakesTermAnalyzer.analyze(x)));

stream.print();
ssc.start()
ssc.awaitTermination()
)
}
}
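The rewritten Driver now delegates everything to TwitterAppTemplate.startStream: a stream factory builds the ReceiverInputDStream, and a pluggable processor persists each batch of transactions. The template's full signature is not part of this diff; the following is a minimal sketch of a plausible shape, inferred from the two call sites (the batching interval and the Iterable parameter type are assumptions):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import twitter4j.Status

// Hypothetical shape only; the real startStream lives in TwitterAppTemplate,
// whose body this commit does not show in full.
object TwitterAppTemplateSketch {
  def startStream(
      conf: SparkConf,
      streamFactory: StreamingContext => ReceiverInputDStream[Status],
      processor: (Iterable[Status], SparkConf) => Boolean): Unit = {
    val ssc = new StreamingContext(conf, Seconds(1)) // interval is an assumption
    val tweets = streamFactory(ssc)
    // Hand each micro-batch to the pluggable transaction processor.
    tweets.foreachRDD(rdd => processor(rdd.collect(), conf))
    ssc.start()
    ssc.awaitTermination()
  }
}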
@@ -52,4 +52,8 @@ object CtakesTermAnalyzer {
//return the iterator.
JCasUtil.select(jcas,classOf[BaseToken]).iterator()
}

def main(args: Array[String]): Unit = {
System.out.println(analyze("The patient might have diabetes. His blood sugar and glucose levels are off the charts. Symptoms include nausea and light-headedness."))
}
}
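As the hunk above shows, analyze ends by returning JCasUtil.select(jcas, classOf[BaseToken]).iterator(). A small consumption sketch, assuming the return type is java.util.Iterator[BaseToken] as that tail suggests:

import scala.collection.JavaConverters._

// Assumes analyze(text) returns java.util.Iterator[BaseToken], as the visible
// tail of the method suggests.
val tokens = CtakesTermAnalyzer.analyze("The patient might have diabetes.")
tokens.asScala.foreach(token => System.out.println(token))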
19 changes: 0 additions & 19 deletions src/main/scala/sparkapps/ctakes/MockTwitterWorker.scala

This file was deleted.

@@ -1,4 +1,5 @@
package sparkapps.ctakes
package sparkapps.tweetstream

import scala.collection.immutable._;
/**
* A simple utility for parsing arguments.
@@ -29,7 +29,7 @@ import org.apache.spark.streaming.receiver.Receiver
* tweet processor, which should take a ReceiverInputDStream[Status] as input,
* and process tweets.
*/
case class MockInputDStreamCTakes(sec:Long)(@transient ssc_ : StreamingContext)
case class MockInputDStream(sec:Long)(@transient ssc_ : StreamingContext)
extends ReceiverInputDStream[Status](ssc_) {

override def slideDuration(): Duration = {
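Because MockInputDStream keeps the StreamingContext in a second parameter list, partially applying the first list turns the case class into a stream factory; this is also the answer to the "how to make this curried?" question in the test file below. A sketch:

// Supplying only the first parameter list (plus `_`) eta-expands to a
// StreamingContext => MockInputDStream, which the app template can complete
// with the context it creates internally.
val factory: StreamingContext => MockInputDStream = MockInputDStream(1) _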
@@ -3,7 +3,7 @@ package sparkapps.tweetstream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkContext, SparkConf}
import sparkapps.ctakes.{CtakesTermAnalyzer, Utils, TwitterUtilsCTakes}
import sparkapps.ctakes.CtakesTermAnalyzer
import com.google.gson.Gson
import com.google.gson._
import jregex.Pattern
@@ -29,7 +29,7 @@ import twitter4j.Status
* Meanwhile, the production app will create implicits for real data
* which get put into HBase.
*/
object TwitterStreamingApp {
object TwitterAppTemplate {

import org.apache.spark.rdd;
val total=10;
@@ -48,7 +48,7 @@ object TwitterStreamingApp {


//Without this, nothing will execute: Streaming contexts require an attached consumer to run.
TwitterUtilsCTakes.createStream(
TwitterUtils.createStream(
ssc,
Utils.getAuth,
Seq("medical"),
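The comment about attached consumers reflects a real Spark Streaming rule: a DStream with no output operation is never materialized, so the context would start and consume nothing. A minimal sketch of forcing execution, assuming tweets is the DStream returned by TwitterUtils.createStream above:

// An output operation forces the whole lineage to run on every batch.
tweets.foreachRDD(rdd => System.out.println("batch size: " + rdd.count()))
ssc.start()
ssc.awaitTermination()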
@@ -1,25 +1,13 @@
package sparkapps.ctakes

import java.nio.ByteBuffer
import java.util.Date
import java.util.concurrent.{Callable, FutureTask}

import org.apache.ctakes.core.fsm.token.BaseToken
import org.apache.uima.analysis_engine.AnalysisEngineDescription
import org.apache.uima.jcas.JCas
import org.uimafit.factory.JCasFactory
import org.uimafit.pipeline.SimplePipeline
import org.uimafit.util.JCasUtil
import twitter4j._
import twitter4j.auth.Authorization
import twitter4j.conf.ConfigurationBuilder
import twitter4j.auth.OAuthAuthorization
package sparkapps.tweetstream

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.Logging
import org.apache.spark.streaming.receiver.Receiver
import twitter4j._
import twitter4j.auth.{Authorization, OAuthAuthorization}
import twitter4j.conf.ConfigurationBuilder



@@ -32,11 +20,10 @@ import org.apache.spark.streaming.receiver.Receiver
* If no Authorization object is provided, initializes OAuth authorization using the system
* properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret.
*/
class TwitterInputDStreamCTakes(
@transient ssc_ : StreamingContext,
case class TwitterInputDStreamCTakes(
@transient ssc_ : StreamingContext,
twitterAuth: Option[Authorization],
filters: Seq[String],
storageLevel: StorageLevel,
slideSeconds: Int) extends ReceiverInputDStream[Status](ssc_) {

override def slideDuration(): Duration = {
@@ -51,7 +38,7 @@ class TwitterInputDStreamCTakes(
private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())

override def getReceiver(): Receiver[Status] = {
new TwitterReceiver(authorization, filters, storageLevel)
new TwitterReceiver(authorization, filters, StorageLevel.MEMORY_AND_DISK)
}
}

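When twitterAuth is None, createOAuthAuthorization() falls back to the twitter4j system properties named in the class comment. A sketch of supplying them before building the stream (the values are placeholders):

System.setProperty("twitter4j.oauth.consumerKey", "<consumer key>")
System.setProperty("twitter4j.oauth.consumerSecret", "<consumer secret>")
System.setProperty("twitter4j.oauth.accessToken", "<access token>")
System.setProperty("twitter4j.oauth.accessTokenSecret", "<access token secret>")
// With these set, TwitterInputDStreamCTakes(ssc, None, filters, 1) authenticates
// without an explicit Authorization object.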
@@ -1,17 +1,16 @@
package sparkapps.ctakes
package sparkapps.tweetstream

import org.apache.spark.streaming.twitter.TwitterInputDStream
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import twitter4j.Status
import twitter4j.auth.Authorization

/**
* Modified and borrowed from the Databricks Spark tutorial.
*/
object TwitterUtilsCTakes {
object TwitterUtils {
/**
* Create a input stream that returns tweets received from Twitter.
* @param ssc StreamingContext object
@@ -1,10 +1,7 @@
package sparkapps.ctakes
package sparkapps.tweetstream

import java.util.Date

import org.apache.commons.cli.{Options, ParseException, PosixParser}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.feature.HashingTF
import twitter4j._
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder
@@ -19,10 +16,9 @@ object Utils {
* verifies that each input in the list passes the tester function.
* prints an error and exits if not.
*/
def checkpoint(
tester : Any => Boolean,
error : Any => Unit,
inputs: List[String]): Unit = {
def checkpoint(tester : Any => Boolean,
error : Any => Unit,
inputs: List[String]): Unit = {
System.out.println("~~~~~~ Checkpoint ~~~~~")
def test(failures:Int, tests : List[String]):Boolean= {
tests match {
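A hedged usage sketch of Utils.checkpoint, validating credentials before the stream starts (consumerKey and consumerSecret are hypothetical values):

Utils.checkpoint(
  tester = s => s.toString.nonEmpty,                           // pass when non-empty
  error  = s => System.err.println("missing argument: " + s),  // report failures
  inputs = List(consumerKey, consumerSecret))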
6 changes: 3 additions & 3 deletions src/test/scala/TestStreaming.scala
@@ -1,6 +1,6 @@
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkConf
import sparkapps.tweetstream.{TwitterStreamingApp, MockInputDStreamCTakes, Processor}
import sparkapps.tweetstream.{TwitterAppTemplate, MockInputDStream, Processor}

/**
*
@@ -44,9 +44,9 @@ class TestTwitterETL {
}


TwitterStreamingApp.startStream(
TwitterAppTemplate.startStream(
conf,
sparkapps.tweetstream.MockInputDStreamCTakes(1)_, // <-- how to make this curried?
sparkapps.tweetstream.MockInputDStream(1)_, // <-- how to make this curried?
{
(transactions,sparkConf) =>
//assumes session.
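The processor's INSERT presumes the streaming_test.key_value table already exists; a sketch of creating it through the same CassandraConnector (the schema is inferred from the INSERT statement, so the column types are assumptions):

CassandraConnector(conf).withSessionDo { session =>
  session.execute("CREATE KEYSPACE IF NOT EXISTS streaming_test " +
    "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute("CREATE TABLE IF NOT EXISTS streaming_test.key_value " +
    "(key text PRIMARY KEY, value int)")
}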
