#Instructions for the workshop
#connect to owens
ssh username@owens.osc.edu
#copy files
cp -r ~soottikkal/workshop/Oct17-Bigdata ./

#check files
cd Oct17-Bigdata
ls
#request 1 interactive node
qsub -I -l nodes=1:ppn=28 -l walltime=04:00:00 -A PZS0687
#launch spark
module load spark/2.0.0
pyspark --executor-memory 4G --driver-memory 4G
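Once the pyspark shell starts, a SparkContext (sc) and, in Spark 2.x, a SparkSession (spark) are already created for you; the examples below use both. An optional quick check that the shell is ready:

#sc and spark are predefined by the pyspark shell; this is only a sanity check
sc.version
sc.defaultParallelism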
#Example 1: Unstructured data
#create an RDD
#make sure that the path is correct
data = sc.textFile("Oct17-Bigdata/spark/README.md")

#count number of lines
data.count()
99

#see the content of the RDD
data.take(3)
[u'# Apache Spark', u'', u'Spark is a fast and general cluster computing system for Big Data. It provides']

data.collect()

#check data type
type(data)
<class 'pyspark.rdd.RDD'>

#transformation of RDD
linesWithSpark = data.filter(lambda line: "Spark" in line)

#action on RDD
linesWithSpark.count()
19

#combining transformation and actions
data.filter(lambda line: "Spark" in line).count()
19
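filter() is a lazy transformation, so each action above recomputes it from the source file. If an RDD will be reused across several actions, caching it first avoids that recomputation; a minimal optional sketch using the same linesWithSpark RDD:

#persist the filtered RDD in memory so later actions reuse it
linesWithSpark.cache()
linesWithSpark.count()   #first action computes and caches the RDD
linesWithSpark.first()   #later actions read from the cache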
#wordcount
wordCounts = data.flatMap(lambda line: line.split()).map(lambda word: (word,1)).reduceByKey(lambda a, b: a+b)
wordCounts.collect()
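collect() brings every (word, count) pair back to the driver, which is fine for a small README but not for large data. To look at just the most frequent words, the driver only needs the top few pairs; a short optional sketch on the same wordCounts RDD:

#top 10 words by count; takeOrdered sorts by the key function (negative count = descending)
wordCounts.takeOrdered(10, key=lambda pair: -pair[1])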
#Example 2: Structured data
#About the data: http://kdd.ics.uci.edu/databases/kddcup99/kddcup99
#load data and run basic operations
#make sure that the path is correct
#ignore warnings
data=spark.read.csv("Oct17-Bigdata/spark/data.csv", header='TRUE')

data.count()
494021

data.take(1)
[Row(dst_bytes=u'5450', duration=u'0', flag=u'SF', protocal_type=u'tcp', service=u'http', src_bytes=u'181')]

data.take(3)
[Row(dst_bytes=u'5450', duration=u'0', flag=u'SF', protocal_type=u'tcp', service=u'http', src_bytes=u'181'), Row(dst_bytes=u'486', duration=u'0', flag=u'SF', protocal_type=u'tcp', service=u'http', src_bytes=u'239'), Row(dst_bytes=u'1337', duration=u'0', flag=u'SF', protocal_type=u'tcp', service=u'http', src_bytes=u'235')]

data.printSchema()
root
 |-- dst_bytes: long (nullable = true)
 |-- duration: long (nullable = true)
 |-- flag: string (nullable = true)
 |-- protocal_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- src_bytes: long (nullable = true)

data.show(5)
+---------+--------+----+-------------+-------+---------+
|dst_bytes|duration|flag|protocal_type|service|src_bytes|
+---------+--------+----+-------------+-------+---------+
|     5450|       0|  SF|          tcp|   http|      181|
|      486|       0|  SF|          tcp|   http|      239|
|     1337|       0|  SF|          tcp|   http|      235|
|     1337|       0|  SF|          tcp|   http|      219|
|     2032|       0|  SF|          tcp|   http|      217|
+---------+--------+----+-------------+-------+---------+
only showing top 5 rows

data.select("dst_bytes","flag").show(5)
+---------+----+
|dst_bytes|flag|
+---------+----+
|     5450|  SF|
|      486|  SF|
|     1337|  SF|
|     1337|  SF|
|     2032|  SF|
+---------+----+
only showing top 5 rows

data.filter(data.flag!="SF").show(5)
+---------+--------+----+-------------+-------+---------+
|dst_bytes|duration|flag|protocal_type|service|src_bytes|
+---------+--------+----+-------------+-------+---------+
|    29200|       0|  S1|          tcp|   http|      228|
|     9156|       0|  S1|          tcp|   http|      212|
|        0|       0| REJ|          tcp|  other|        0|
|        0|       0| REJ|          tcp|  other|        0|
|        0|       0| REJ|          tcp|  other|        0|
+---------+--------+----+-------------+-------+---------+
only showing top 5 rows

data.select("protocal_type", "duration", "dst_bytes").groupBy("protocal_type").count().show()
+-------------+------+
|protocal_type| count|
+-------------+------+
|          tcp|190065|
|          udp| 20354|
|         icmp|283602|
+-------------+------+

data.select("protocal_type", "duration", "dst_bytes").filter(data.duration>1000).filter(data.dst_bytes==0).groupBy("protocal_type").count().show()
+-------------+-----+
|protocal_type|count|
+-------------+-----+
|          tcp|  139|
+-------------+-----+
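Reading a CSV with header='TRUE' alone can leave numeric columns typed as strings (note the quoted values in the Row output above), so comparisons such as data.duration>1000 may not behave numerically. Two optional variations are to let Spark infer column types at read time or to cast a column explicitly; a sketch, with data2 as an illustrative name:

#option 1: infer column types while reading the CSV
data2 = spark.read.csv("Oct17-Bigdata/spark/data.csv", header='TRUE', inferSchema='TRUE')
data2.printSchema()

#option 2: cast one column to an integer type on the existing DataFrame
from pyspark.sql.functions import col
data.withColumn("duration", col("duration").cast("int")).printSchema()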
#SparkSQL queries
#Register data as a table named "interactions"
data.registerTempTable("interactions")

#Select tcp network interactions with more than 1 second duration and no transfer from destination
tcp = sqlContext.sql("SELECT duration, dst_bytes FROM interactions WHERE protocal_type='tcp' AND duration>1000 AND dst_bytes=0")
tcp.show(5)
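Any SQL that runs on an ordinary table can be run against the registered temporary table, so the earlier groupBy() count can also be written as a query. A short optional sketch (in Spark 2.x the same call is also available as spark.sql()):

#count interactions per protocol using SQL over the temporary table
sqlContext.sql("SELECT protocal_type, COUNT(*) AS n FROM interactions GROUP BY protocal_type").show()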
#exit from the pyspark shell
exit()

#exit from the compute node
exit
#submitting batch jobs
#check files
ls
cd Oct17-Bigdata
cd spark
ls
cat stati.py
cat sql.py
cat stati.pbs
cat sql.pbs
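The authoritative contents of sql.py and stati.py are what the cat commands above print; the following is only a rough, hypothetical sketch of what a standalone PySpark SQL script of this kind typically looks like when run in batch mode (the app name and path are illustrative assumptions, not the workshop's actual code):

#hypothetical standalone script: a batch job must create its own SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sql-example").getOrCreate()

#read the same CSV used in the interactive session (path is an assumption)
data = spark.read.csv("Oct17-Bigdata/spark/data.csv", header='TRUE')
data.createOrReplaceTempView("interactions")

#run a query; show() output goes to the job's log file
spark.sql("SELECT duration, dst_bytes FROM interactions WHERE protocal_type='tcp' AND duration>1000 AND dst_bytes=0").show(5)

spark.stop()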
#submit first job
qsub sql.pbs
#submit second job
qsub stati.pbs
#checking job status
qstat
#make sure the backtick character (above the Tab key) is entered correctly
qstat | grep `whoami`
ls
#check out log files after jobs are completed
more stati.log
more sql.log
#submitting hadoop jobs
cd ../
cd hadoop
qsub sub-wordcount.pbs
qsub sub-grep.pbs
#check job status
qstat | grep `whoami`
#check out results after jobs are completed
cd wordcout-out
more part-r-00000
cd ../
cd grep-out
more part-r-00000
exit