
This project performs sentiment analysis on reviews from an Amazon dataset, using Python, Scala, Spark ML and the TextBlob package. First, start the Spark and Hadoop daemons and the PostgreSQL service:

sudo su
$SPARK_HOME/sbin/start-all.sh
$HADOOP_HOME/sbin/start-all.sh
service postgresql start

First we download the data and put it on HDFS; then we convert it from JSON to Parquet format.

wget "http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Kindle_Store_5.json.gz"
gunzip reviews_Kindle_Store_5.json.gz
hdfs dfs -put reviews_Kindle_Store_5.json /data/reviews_Kindle_Store_5.json
pyspark

>>> from pyspark.sql import SparkSession
>>> df = spark.read.json("hdfs:///data/reviews_Kindle_Store_5.json")
>>> df.write.parquet("hdfs:///data/kindle_store")
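
Optionally, you can confirm the Parquet data was written and is readable before moving on (a quick sanity check in the same pyspark shell):

>>> spark.read.parquet("hdfs:///data/kindle_store").count()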

Next, start a Jupyter notebook and write a Python script that reads the Parquet file, cleans the data, and uses Spark ML to do sentiment analysis on the reviews, training on 90% of the data and evaluating the system on the remaining 10%.

/root/start-notebook.sh
#!python
# spark and sqlContext are assumed to be provided by the PySpark-enabled notebook
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, DoubleType

# read parquet data from hadoop
kindle_store = spark.read.parquet("hdfs:///data/kindle_store")
reviews = kindle_store[["reviewText","overall"]]
reviews.show(20)
sqlContext.registerDataFrameAsTable(reviews, "table2")
reviews1 = sqlContext.sql("SELECT reviewText, overall from table2 LIMIT 50000")
#positive->1
#negative->0
def transform(star):
    if star >=3.0:
        return 1.0
    else:
        return 0.0
transformer = udf(transform)
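# note: udf() without an explicit returnType produces a string column,
# which is why "label" is cast to DoubleType further down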

df = reviews1.withColumn("label", transformer(reviews1['overall']))
sqlContext.registerDataFrameAsTable(df, "table1")
df2 = sqlContext.sql("SELECT reviewText, label from table1 WHERE reviewText != ''")
(training, test) = df2.randomSplit([0.9, 0.1])
regexTokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)
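# a sketch of that alternative (gaps=False makes the pattern match the tokens
# themselves rather than the gaps between them); not used below:
# regexTokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words", pattern="\\w+", gaps=False)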
countTokens = udf(lambda words: len(words), IntegerType())

regexTokenized = regexTokenizer.transform(training)
regexTokenized.select("reviewText", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
regexTokenized.show(20)

htf = HashingTF(inputCol="words", outputCol="features")
tf = htf.transform(regexTokenized)
train = tf[["label","features"]]

types = [f.dataType for f in train.schema.fields]
types

train2 = train.withColumn("label",train["label"].cast(DoubleType()))
types = [f.dataType for f in train2.schema.fields]
types

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
pipeline = Pipeline(stages=[lr])
model = pipeline.fit(train2)
test.show()
regexTokenized_test = regexTokenizer.transform(test)
regexTokenized_test.select("reviewText", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
tf_test = htf.transform(regexTokenized_test)
testSet = tf_test[["label","features"]]
testSet = testSet.withColumn("label",testSet["label"].cast(DoubleType()))
prediction = model.transform(testSet)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(prediction)
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.0912548

We then wrote a script in Scala that does the same job as the Python script above: sentiment analysis using the Spark ML libraries.

spark-shell
#!scala

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{RegexTokenizer, HashingTF}
import org.apache.spark.ml.classification.{LogisticRegression, NaiveBayes}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
// col, when and udf used below come from the spark sql functions package
import org.apache.spark.sql.functions._

//Generate SQLContext using the following command. Here, sc means SparkContext object.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

//load parquet file from hadoop file system
val parqfile = sqlContext.read.parquet("hdfs:///data/kindle_store")
//using sql on dataframe 
parqfile.registerTempTable("table1")
val reviews = sqlContext.sql("SELECT reviewText, overall FROM table1  WHERE reviewText != '' LIMIT 50000")
val reviewsdf = reviews.withColumn("label", when(col("overall") >= 3.0, 1.0).otherwise(0.0))

val regexTokenizer = new RegexTokenizer().setInputCol("reviewText").setOutputCol("words").setPattern("\\W")
val countTokens = udf { (words: Seq[String]) => words.length }
val regexTokenized = regexTokenizer.transform(reviewsdf)

regexTokenized.select("reviewText", "words").withColumn("tokens", countTokens(col("words"))).show(false)

val result = regexTokenized.select("label","words")
// split data to training and test 
val splits = result.randomSplit(Array(0.9, 0.1))
val (training, test) = (splits(0), splits(1))

val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features").setNumFeatures(1000)
val tf = hashingTF.transform(training)
val train_set = tf.select("label","features")
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
val pipeline = new Pipeline().setStages(Array(lr))
val model = pipeline.fit(train_set)

val tf_test = hashingTF.transform(test)
val test_set = tf_test.select("label","features")

val predictions = model.transform(test_set)

val evaluator = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))
//Test Error = 0.09660000000000002

//using a Naive Bayes classifier
val nb  = new NaiveBayes()
val sec_model = nb.fit(train_set)

val sec_predict = sec_model.transform(test_set)

val evaluator2 = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("accuracy")
val accuracy = evaluator2.evaluate(sec_predict)
println("Test Error = " + (1.0 - accuracy))
//Test Error = 0.14200000000000002

Naive Bayes Classifier: Test Error = 0.14200000000000002

Logistic Regression Classifier: Test Error = 0.09660000000000002

Below we use the TextBlob library with PySpark. TextBlob accepts a text or sentence and outputs a polarity score; we treat negative polarity as a negative opinion, zero polarity as a neutral opinion, and positive polarity as a positive opinion (encoded as 1 -> positive, 0 -> neutral, 2 -> negative).
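
TextBlob's sentiment property returns a (polarity, subjectivity) pair, with polarity ranging from -1.0 to 1.0; only the polarity (index 0) is used here. A minimal illustration, once textblob is installed as shown below (the example sentence is ours, not from the dataset):

#!python
from textblob import TextBlob

polarity = TextBlob("This book was a great read").sentiment[0]
# polarity > 0 -> positive (1.0), polarity == 0 -> neutral (0.0), polarity < 0 -> negative (2.0)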

We will install the textblob library, but first we activate the virtual environment:

source mysparkenv/bin/activate
pip install textblob
pip install simplejson
vim sent_blob.py
#!python
from textblob import TextBlob
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
#set spark context, and create application
conf = SparkConf().setAppName("Sent Analysis Textblob")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
df = sqlCtx.read.load("hdfs:///data/kindle_store")
reviews = df[["reviewText","overall"]]
reviews.show(20)
sqlCtx.registerDataFrameAsTable(reviews, "table2")
reviews1 = sqlCtx.sql("SELECT reviewText, overall from table2")
#positive->1
#neutral->0
#negative->2
def transform(star):
    if star > 3.0:
        return 1.0
    elif star == 3.0:
        return 0.0
    else:
        return 2.0
transformer = udf(transform)
df1 = reviews1.withColumn("label", transformer(reviews1['overall']))
sqlCtx.registerDataFrameAsTable(df1, "table1")
df2 = sqlCtx.sql("SELECT reviewText, label from table1 WHERE reviewText != ''")
df2.show()
def apply_blob(sentence):
    temp = TextBlob(sentence).sentiment[0]
    if temp == 0.0:
        return 0.0
    elif temp >= 0.0:
        return 1.0
    else:
        return 2.0
predictions = udf(apply_blob)
blob_df = df2.withColumn("predicted", predictions(df2['reviewText']))
blob_df.show()

true_labels = [i.label for i in blob_df.select("label").collect()]
predicted_labels = [i.predicted for i in blob_df.select("predicted").collect()]
correct = 0
wrong = 0
for i in range(len(true_labels)):
        if true_labels[i] == predicted_labels[i]:
                correct +=1
        else:
                wrong +=1
print('Correct predictions: ', correct)
print('Wrong predictions: ', wrong)
print('Accuracy: ', correct/(correct+wrong))

Correct predictions: 893518
Wrong predictions: 89079
Accuracy: 0.90934330
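
As a side note, the same accuracy could be computed without collecting every row to the driver, by comparing the two columns directly in Spark; a minimal sketch using the blob_df from the script above:

#!python
total = blob_df.count()
correct = blob_df.filter(col("label") == col("predicted")).count()
print('Accuracy: ', correct / total)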
