Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
jayunit100 committed Dec 9, 2014
1 parent 66fe9eb commit b636e6e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
6 changes: 3 additions & 3 deletions src/main/scala/sparkapps/tweetstream/Processor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import twitter4j.Status
* Meanwhile, the production app will create implicits for real data
* which gets put into hbase.
*/
object Processor {
object TwitterStreamingApp {

import org.apache.spark.rdd;
val total=10;
Expand All @@ -38,7 +38,7 @@ object Processor {

def startStream(sparkConf:SparkConf,
stream:(StreamingContext => ReceiverInputDStream[Status]),
etl:(Array[Status],SparkConf)=>Boolean) = {
pluggableETLFunction:(Array[Status],SparkConf)=>Boolean) = {
val sc=new SparkContext(sparkConf);

val ssc=new StreamingContext(sc, Seconds(intervalSecs))
Expand All @@ -56,7 +56,7 @@ object Processor {
.foreachRDD(rdd => {
count+=1
if (count>2) {
etl(rdd.collect(),
pluggableETLFunction(rdd.collect(),
// TODO: is this still null?
sparkConf)
ssc.stop()
Expand Down
19 changes: 13 additions & 6 deletions src/test/scala/TestStreaming.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkConf
import sparkapps.tweetstream.MockInputDStreamCTakes
import sparkapps.tweetstream.Processor
import sparkapps.tweetstream.{TwitterStreamingApp, MockInputDStreamCTakes, Processor}

/**
*
Expand All @@ -14,10 +13,18 @@ import sparkapps.tweetstream.Processor
* 3) The ETL function is passed as an argument to the Processor.
*
*/
class TestCassandraProcessor {
class TestTwitterETL {


/**
* Add other examples (e.g. an HBase ETL) here.
*/

/**
* Here is an implementation of Cassandra based ETL.
*/
@org.junit.Test
def test(){
def testCassandraETL(){
/**
* FYI: this should fail fast if Cassandra isn't set up correctly.
* Make sure to turn off iptables if you get an "operation timed out" exception.
Expand All @@ -37,7 +44,7 @@ class TestCassandraProcessor {
}


Processor.startStream(
TwitterStreamingApp.startStream(
conf,
sparkapps.tweetstream.MockInputDStreamCTakes(1)_, // <-- how to make this curried?
{
Expand All @@ -58,7 +65,7 @@ class TestCassandraProcessor {
})
}

sparkapps.tweetstream.Processor.startCassandraStream()
startCassandraStream()
}

}

0 comments on commit b636e6e

Please sign in to comment.