From 9c4fa7f77523872666a3fe9915e95fb2aa5ce5c6 Mon Sep 17 00:00:00 2001 From: jayunit100 Date: Wed, 10 Dec 2014 17:16:19 -0500 Subject: [PATCH] generic transaction storage and unit tests. wip --- .../sparkapps/ctakes/CTakesExample.scala | 60 ----------------- .../ctakes/CTakesTwitterStreamingApp.scala | 67 +++++++++---------- ...nalyzer.scala => CtakesTermAnalyzer.scala} | 4 ++ .../sparkapps/ctakes/MockTwitterWorker.scala | 19 ------ .../ArgParser.scala} | 3 +- ...eamCTakes.scala => MockInputDStream.scala} | 2 +- ...ocessor.scala => TwitterAppTemplate.scala} | 6 +- .../TwitterInputDStreamCTakes.scala | 31 +++------ .../TwitterUtils.scala} | 13 ++-- .../{ctakes => tweetstream}/Utils.scala | 12 ++-- src/test/scala/TestStreaming.scala | 6 +- 11 files changed, 62 insertions(+), 161 deletions(-) delete mode 100644 src/main/scala/sparkapps/ctakes/CTakesExample.scala rename src/main/scala/sparkapps/ctakes/{TermAnalyzer.scala => CtakesTermAnalyzer.scala} (91%) delete mode 100644 src/main/scala/sparkapps/ctakes/MockTwitterWorker.scala rename src/main/scala/sparkapps/{ctakes/Parser.scala => tweetstream/ArgParser.scala} (98%) rename src/main/scala/sparkapps/tweetstream/{MockInputDStreamCTakes.scala => MockInputDStream.scala} (97%) rename src/main/scala/sparkapps/tweetstream/{Processor.scala => TwitterAppTemplate.scala} (93%) rename src/main/scala/sparkapps/{ctakes => tweetstream}/TwitterInputDStreamCTakes.scala (86%) rename src/main/scala/sparkapps/{ctakes/TwitterUtilsCTakes.scala => tweetstream/TwitterUtils.scala} (95%) rename src/main/scala/sparkapps/{ctakes => tweetstream}/Utils.scala (90%) diff --git a/src/main/scala/sparkapps/ctakes/CTakesExample.scala b/src/main/scala/sparkapps/ctakes/CTakesExample.scala deleted file mode 100644 index 3e5810a..0000000 --- a/src/main/scala/sparkapps/ctakes/CTakesExample.scala +++ /dev/null @@ -1,60 +0,0 @@ -package sparkapps.ctakes - -import java.text.BreakIterator - -import opennlp.tools.postag.POSTagger -import opennlp.tools.sentdetect.{SentenceDetectorME, SentenceModel, SentenceDetector} -import org.apache.ctakes.assertion.medfacts.cleartk.PolarityCleartkAnalysisEngine -import org.apache.ctakes.clinicalpipeline.ClinicalPipelineFactory.{RemoveEnclosedLookupWindows, CopyNPChunksToLookupWindowAnnotations} -import org.apache.ctakes.constituency.parser.ae.ConstituencyParser -import org.apache.ctakes.contexttokenizer.ae.ContextDependentTokenizerAnnotator -import org.apache.ctakes.core.ae.{TokenizerAnnotatorPTB, SimpleSegmentAnnotator} -import org.apache.ctakes.dependency.parser.ae.{ClearNLPSemanticRoleLabelerAE, ClearNLPDependencyParserAE} -import org.apache.ctakes.dependency.parser.ae.ClearNLPDependencyParserAE._ -import org.apache.ctakes.dictionary.lookup.ae.UmlsDictionaryLookupAnnotator -import org.apache.ctakes.typesystem.`type`.syntax.BaseToken -import org.apache.ctakes.typesystem.`type`.textsem.IdentifiedAnnotation -import org.apache.uima.analysis_engine.{AnalysisEngine, AnalysisEngineDescription} -import org.apache.uima.jcas.JCas -import org.cleartk.chunker.Chunker -import org.uimafit.factory.{AnalysisEngineFactory, AggregateBuilder, JCasFactory} -import org.uimafit.pipeline.SimplePipeline -import org.uimafit.util.JCasUtil - -import scala.collection.JavaConverters._ - - -object CTakesExample { - - def getDefaultPipeline():AnalysisEngine = { - val builder = new AggregateBuilder - builder.add(SimpleSegmentAnnotator.createAnnotatorDescription()); - builder.add(org.apache.ctakes.core.ae.SentenceDetector.createAnnotatorDescription()); - builder.add(TokenizerAnnotatorPTB.createAnnotatorDescription()); - builder.add(ContextDependentTokenizerAnnotator.createAnnotatorDescription()); - builder.add(org.apache.ctakes.postagger.POSTagger.createAnnotatorDescription()); - builder.add(org.apache.ctakes.chunker.ae.Chunker.createAnnotatorDescription()); - builder.add(AnalysisEngineFactory.createPrimitiveDescription(classOf[CopyNPChunksToLookupWindowAnnotations])); - builder.add(AnalysisEngineFactory.createPrimitiveDescription(classOf[RemoveEnclosedLookupWindows])); - //builder.add(UmlsDictionaryLookupAnnotator.createAnnotatorDescription()); builder.add(PolarityCleartkAnalysisEngine.createAnnotatorDescription()); return builder.createAggregateDescription(); } - builder.createAggregate() - } - - def main(args: Array[String]) { - val aed:AnalysisEngine= getDefaultPipeline(); - val jcas:JCas = JCasFactory.createJCas(); - jcas.setDocumentText("The patient is suffering from extreme pain due to shark bite. Recommend continuing use of aspirin, oxycodone, and coumadin. atient denies smoking and chest pain. Patient has no cancer. There is no sign of multiple sclerosis. Continue exercise for obesity and hypertension. "); - - SimplePipeline.runPipeline(jcas, aed); - - //Print out the tokens and Parts of Speech - - val iter = JCasUtil.select(jcas,classOf[BaseToken]).iterator() - System.out.println(iter.hasNext); - while(iter.hasNext) - { - val entity = iter.next(); - System.out.println(entity.getCoveredText + " " + entity.getPartOfSpeech); - } - } -} diff --git a/src/main/scala/sparkapps/ctakes/CTakesTwitterStreamingApp.scala b/src/main/scala/sparkapps/ctakes/CTakesTwitterStreamingApp.scala index c932aa8..90e9c0b 100755 --- a/src/main/scala/sparkapps/ctakes/CTakesTwitterStreamingApp.scala +++ b/src/main/scala/sparkapps/ctakes/CTakesTwitterStreamingApp.scala @@ -2,6 +2,7 @@ package sparkapps.ctakes import java.io.File +import com.datastax.spark.connector.cql.CassandraConnector import com.google.gson.Gson import com.google.gson._ import jregex.Pattern @@ -10,6 +11,8 @@ import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.twitter.TwitterUtils import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} +import sparkapps.tweetstream._ +import twitter4j.auth.{OAuthSupport, Authorization} import scala.runtime.ScalaRunTime._ /** * Collect at least the specified number of tweets into json text files. @@ -154,43 +157,33 @@ object Driver { val conf = new SparkConf() .setAppName(this.getClass.getSimpleName+""+System.currentTimeMillis()) .setMaster("local[2]") - val sc = new SparkContext(conf) - val ssc = new StreamingContext(sc, Seconds(intervalSecs)) - - /** - * - */ - val tweetStream = TwitterUtilsCTakes.createStream( - ssc, - Utils.getAuth, - Seq("medical"), - StorageLevel.MEMORY_ONLY) - .map(gson.toJson(_)) - .filter(!_.contains("boundingBoxCoordinates"))//SPARK-3390 - - var checks = 0; - tweetStream.foreachRDD(rdd => { - val outputRDD = rdd.repartition(partitionsEachInterval) - System.out.println(rdd.count()); - numTweetsCollected += rdd.count() - System.out.println("\n\n\n PROGRESS ::: "+numTweetsCollected + " so far, out of " + numTweetsToCollect + " \n\n\n "); - if (numTweetsCollected > numTweetsToCollect) { - ssc.stop() - sc.stop(); - System.exit(0) + val sCon = new SparkContext(conf) + val ssCon = new StreamingContext(sCon, Seconds(intervalSecs)) + + TwitterAppTemplate.startStream( + conf, + //fix this line, then app should be testable against cassandra. + { ssc=>sparkapps.tweetstream.TwitterInputDStreamCTakes( + ssc, + Utils.getAuth, + null, + 1)_ }, + { + (transactions,sparkConf) => + //assumes session. + CassandraConnector(sparkConf).withSessionDo { + session => { + val x=1 + Thread.sleep(1) + transactions.foreach({ + xN => + val xNtxt=xN.toString+" "+xN.getText; + session.executeAsync(s"INSERT INTO streaming_test.key_value (key, value) VALUES ('$xNtxt' , $x)")} + ) + true; + } + } } - }) - - /** - * This is where we invoke CTakes. For your CTAkes implementation, you would change the logic here - * to do something like store results to a file, or do a more sophisticated series of tasks. - */ - val stream = tweetStream.map( - x => - System.out.println(" " + CtakesTermAnalyzer.analyze(x))); - - stream.print(); - ssc.start() - ssc.awaitTermination() + ) } } diff --git a/src/main/scala/sparkapps/ctakes/TermAnalyzer.scala b/src/main/scala/sparkapps/ctakes/CtakesTermAnalyzer.scala similarity index 91% rename from src/main/scala/sparkapps/ctakes/TermAnalyzer.scala rename to src/main/scala/sparkapps/ctakes/CtakesTermAnalyzer.scala index e75d085..3a5badb 100644 --- a/src/main/scala/sparkapps/ctakes/TermAnalyzer.scala +++ b/src/main/scala/sparkapps/ctakes/CtakesTermAnalyzer.scala @@ -52,4 +52,8 @@ object CtakesTermAnalyzer { //return the iterator. JCasUtil.select(jcas,classOf[BaseToken]).iterator() } + + def main(args: Array[String]): Unit = { + System.out.println(analyze("The patient might have diabetes. His blood sugare and glucose levels are off the charts. Symptoms include naseau and light headedness.")) + } } diff --git a/src/main/scala/sparkapps/ctakes/MockTwitterWorker.scala b/src/main/scala/sparkapps/ctakes/MockTwitterWorker.scala deleted file mode 100644 index b6e0551..0000000 --- a/src/main/scala/sparkapps/ctakes/MockTwitterWorker.scala +++ /dev/null @@ -1,19 +0,0 @@ -package sparkapps.ctakes - -/** - * Created by jayunit100 on 11/16/14. - */ -class MockTwitterWorker(callback:(twitter4j.Status)=>Unit) { - - def run() = { - //keep running , kill it in onStop(), so that we mimick a real world stream. - (1 to 100000).toStream.takeWhile(x => x < 5).foreach( - i => { - System.out.println("storing mock status " + i) - Thread.sleep(100); - //callback(newStatus(i)) - //store(newStatus(i)) - }) - } - -} diff --git a/src/main/scala/sparkapps/ctakes/Parser.scala b/src/main/scala/sparkapps/tweetstream/ArgParser.scala similarity index 98% rename from src/main/scala/sparkapps/ctakes/Parser.scala rename to src/main/scala/sparkapps/tweetstream/ArgParser.scala index c616fd6..a1fcef6 100644 --- a/src/main/scala/sparkapps/ctakes/Parser.scala +++ b/src/main/scala/sparkapps/tweetstream/ArgParser.scala @@ -1,4 +1,5 @@ -package sparkapps.ctakes +package sparkapps.tweetstream + import scala.collection.immutable._; /** * A simple utility for parsing arguments. diff --git a/src/main/scala/sparkapps/tweetstream/MockInputDStreamCTakes.scala b/src/main/scala/sparkapps/tweetstream/MockInputDStream.scala similarity index 97% rename from src/main/scala/sparkapps/tweetstream/MockInputDStreamCTakes.scala rename to src/main/scala/sparkapps/tweetstream/MockInputDStream.scala index e400308..33b690f 100644 --- a/src/main/scala/sparkapps/tweetstream/MockInputDStreamCTakes.scala +++ b/src/main/scala/sparkapps/tweetstream/MockInputDStream.scala @@ -29,7 +29,7 @@ import org.apache.spark.streaming.receiver.Receiver * tweet processor, which should take a RececiverInputDSTream[Status] as input, * and process tweets. */ -case class MockInputDStreamCTakes(sec:Long)(@transient ssc_ : StreamingContext) +case class MockInputDStream(sec:Long)(@transient ssc_ : StreamingContext) extends ReceiverInputDStream[Status](ssc_) { override def slideDuration(): Duration = { diff --git a/src/main/scala/sparkapps/tweetstream/Processor.scala b/src/main/scala/sparkapps/tweetstream/TwitterAppTemplate.scala similarity index 93% rename from src/main/scala/sparkapps/tweetstream/Processor.scala rename to src/main/scala/sparkapps/tweetstream/TwitterAppTemplate.scala index 0b6d8e1..41ee573 100644 --- a/src/main/scala/sparkapps/tweetstream/Processor.scala +++ b/src/main/scala/sparkapps/tweetstream/TwitterAppTemplate.scala @@ -3,7 +3,7 @@ package sparkapps.tweetstream import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkContext, SparkConf} -import sparkapps.ctakes.{CtakesTermAnalyzer, Utils, TwitterUtilsCTakes} +import sparkapps.ctakes.CtakesTermAnalyzer import com.google.gson.Gson import com.google.gson._ import jregex.Pattern @@ -29,7 +29,7 @@ import twitter4j.Status * Meanwhile, the production app will create implicits for real data * which gets put into hbase. */ -object TwitterStreamingApp { +object TwitterAppTemplate { import org.apache.spark.rdd; val total=10; @@ -48,7 +48,7 @@ object TwitterStreamingApp { //Without this, nothing will execute: Streaming context's require an attached consumer to run. - TwitterUtilsCTakes.createStream( + TwitterUtils.createStream( ssc, Utils.getAuth, Seq("medical"), diff --git a/src/main/scala/sparkapps/ctakes/TwitterInputDStreamCTakes.scala b/src/main/scala/sparkapps/tweetstream/TwitterInputDStreamCTakes.scala similarity index 86% rename from src/main/scala/sparkapps/ctakes/TwitterInputDStreamCTakes.scala rename to src/main/scala/sparkapps/tweetstream/TwitterInputDStreamCTakes.scala index 8624409..946a365 100644 --- a/src/main/scala/sparkapps/ctakes/TwitterInputDStreamCTakes.scala +++ b/src/main/scala/sparkapps/tweetstream/TwitterInputDStreamCTakes.scala @@ -1,25 +1,13 @@ -package sparkapps.ctakes - -import java.nio.ByteBuffer -import java.util.Date -import java.util.concurrent.{Callable, FutureTask} - -import org.apache.ctakes.core.fsm.token.BaseToken -import org.apache.uima.analysis_engine.AnalysisEngineDescription -import org.apache.uima.jcas.JCas -import org.uimafit.factory.JCasFactory -import org.uimafit.pipeline.SimplePipeline -import org.uimafit.util.JCasUtil -import twitter4j._ -import twitter4j.auth.Authorization -import twitter4j.conf.ConfigurationBuilder -import twitter4j.auth.OAuthAuthorization +package sparkapps.tweetstream +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ -import org.apache.spark.storage.StorageLevel -import org.apache.spark.Logging import org.apache.spark.streaming.receiver.Receiver +import twitter4j._ +import twitter4j.auth.{Authorization, OAuthAuthorization} +import twitter4j.conf.ConfigurationBuilder @@ -32,11 +20,10 @@ import org.apache.spark.streaming.receiver.Receiver * If no Authorization object is provided, initializes OAuth authorization using the system * properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret. */ -class TwitterInputDStreamCTakes( - @transient ssc_ : StreamingContext, +case class TwitterInputDStreamCTakes( + @transient ssc_ : StreamingContext, twitterAuth: Option[Authorization], filters: Seq[String], - storageLevel: StorageLevel, slideSeconds: Int) extends ReceiverInputDStream[Status](ssc_) { override def slideDuration(): Duration = { @@ -51,7 +38,7 @@ class TwitterInputDStreamCTakes( private val authorization = twitterAuth.getOrElse(createOAuthAuthorization()) override def getReceiver(): Receiver[Status] = { - new TwitterReceiver(authorization, filters, storageLevel) + new TwitterReceiver(authorization, filters, StorageLevel.MEMORY_AND_DISK) } } diff --git a/src/main/scala/sparkapps/ctakes/TwitterUtilsCTakes.scala b/src/main/scala/sparkapps/tweetstream/TwitterUtils.scala similarity index 95% rename from src/main/scala/sparkapps/ctakes/TwitterUtilsCTakes.scala rename to src/main/scala/sparkapps/tweetstream/TwitterUtils.scala index 80d1171..60041f1 100644 --- a/src/main/scala/sparkapps/ctakes/TwitterUtilsCTakes.scala +++ b/src/main/scala/sparkapps/tweetstream/TwitterUtils.scala @@ -1,17 +1,16 @@ -package sparkapps.ctakes +package sparkapps.tweetstream -import org.apache.spark.streaming.twitter.TwitterInputDStream -import twitter4j.Status -import twitter4j.auth.Authorization import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaDStream, JavaStreamingContext} -import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} +import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import twitter4j.Status +import twitter4j.auth.Authorization /** * Moddified and borrowed from databricks spark tutorial. */ -object TwitterUtilsCTakes { +object TwitterUtils { /** * Create a input stream that returns tweets received from Twitter. * @param ssc StreamingContext object diff --git a/src/main/scala/sparkapps/ctakes/Utils.scala b/src/main/scala/sparkapps/tweetstream/Utils.scala similarity index 90% rename from src/main/scala/sparkapps/ctakes/Utils.scala rename to src/main/scala/sparkapps/tweetstream/Utils.scala index 1db066a..be1d875 100644 --- a/src/main/scala/sparkapps/ctakes/Utils.scala +++ b/src/main/scala/sparkapps/tweetstream/Utils.scala @@ -1,10 +1,7 @@ -package sparkapps.ctakes +package sparkapps.tweetstream import java.util.Date -import org.apache.commons.cli.{Options, ParseException, PosixParser} -import org.apache.spark.mllib.linalg.Vector -import org.apache.spark.mllib.feature.HashingTF import twitter4j._ import twitter4j.auth.OAuthAuthorization import twitter4j.conf.ConfigurationBuilder @@ -19,10 +16,9 @@ object Utils { * verifies that each input in the string passes the tester function. * prints error and exists if not. */ - def checkpoint( - tester : Any => Boolean, - error : Any => Unit, - inputs: List[String]): Unit = { + def checkpoint(tester : Any => Boolean, + error : Any => Unit, + inputs: List[String]): Unit = { System.out.println("~~~~~~ Checkpoint ~~~~~") def test(failures:Int, tests : List[String]):Boolean= { tests match { diff --git a/src/test/scala/TestStreaming.scala b/src/test/scala/TestStreaming.scala index 02a71e7..b167605 100644 --- a/src/test/scala/TestStreaming.scala +++ b/src/test/scala/TestStreaming.scala @@ -1,6 +1,6 @@ import com.datastax.spark.connector.cql.CassandraConnector import org.apache.spark.SparkConf -import sparkapps.tweetstream.{TwitterStreamingApp, MockInputDStreamCTakes, Processor} +import sparkapps.tweetstream.{TwitterAppTemplate, TwitterStreamingApp, MockInputDStream, Processor} /** * @@ -44,9 +44,9 @@ class TestTwitterETL { } - TwitterStreamingApp.startStream( + TwitterAppTemplate.startStream( conf, - sparkapps.tweetstream.MockInputDStreamCTakes(1)_, // <-- how to make this curried? + sparkapps.tweetstream.MockInputDStream(1)_, // <-- how to make this curried? { (transactions,sparkConf) => //assumes session.