-
Notifications
You must be signed in to change notification settings - Fork 0
/
FinalModel.py
83 lines (60 loc) · 2.68 KB
/
FinalModel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# %% Imports
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
import itertools
import numpy as np
#%%
def format(df):
    """Project the ratings DataFrame down to the three columns ALS needs."""
    wanted = ["userId", "trackId", "count"]
    return df.select(*wanted)
#%% Main
def main(spark, sc):
    """Grid-search ALS hyperparameters and report MAP over top-500 recs.

    Trains one implicit-feedback ALS model per point in the parameter grid,
    generates 500 recommendations per validation user, and prints the mean
    average precision for each configuration.

    Parameters
    ----------
    spark : SparkSession
        Active Spark session used to read the parquet inputs.
    sc : SparkContext
        Spark context, used only to silence logging.
    """
    # Spark configs: quiet logs; disable broadcast joins because the
    # interaction tables are too large to broadcast.
    sc.setLogLevel("OFF")
    spark.conf.set("spark.blacklist.enabled", "False")
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

    # Read in train / validation / test interaction files.
    file_path = ['hdfs:/user/zm2114/cf_train_new.parquet',
                 'hdfs:/user/zm2114/cf_validation.parquet',
                 'hdfs:/user/zm2114/cf_test.parquet']
    train = format(spark.read.parquet(file_path[0]))
    val = format(spark.read.parquet(file_path[1]))
    # NOTE(review): test is loaded but never evaluated below — confirm intent.
    test = format(spark.read.parquet(file_path[2]))
    # -------------------------------------------------------------------------------------
    # Hyperparameter grid (cartesian product): alpha, regParam, maxIter, rank.
    params = list(itertools.product([100],    # alpha
                                    [10],     # regParam
                                    [20],     # maxIter
                                    [125]))   # rank
    precision = []
    # Ground truth: each validation user's full list of interacted tracks.
    val1 = val.groupBy("userId").agg(F.collect_list("trackId").alias("trackId_preds"))
    for alpha, reg_param, max_iter, rank in params:
        als = ALS(rank=rank, maxIter=max_iter, regParam=reg_param,
                  userCol="userId", itemCol="trackId", ratingCol="count",
                  alpha=alpha, implicitPrefs=True)
        model = als.fit(train)  # train on the training split
        # Recommend 500 tracks for every user that appears in validation.
        users = val.select(als.getUserCol()).distinct()
        userSubsetRecs = model.recommendForUserSubset(users, 500)
        userSubsetRecs = userSubsetRecs.select("userId", "recommendations.trackId")
        # Pair (predicted track list, ground-truth track list) per user.
        k = userSubsetRecs.join(val1, "userId")
        k = k.rdd.map(lambda row: (row[1], row[2]))
        metrics = RankingMetrics(k)
        precision.append(metrics.meanAveragePrecision)
    # Report MAP for every grid point alongside its parameters.
    for combo, score in zip(params, precision):
        print(combo)
        print(f"alpha= {combo[0]}")
        print(f"regParam= {combo[1]}")
        print(f"maxIter= {combo[2]}")
        print(f"rank= {combo[3]}")
        print(f"MAP= {score}")
        print('-----------------------------------------------------')
#%% Func call
if __name__ == "__main__":
    # Entry point: construct the Spark session, then run the grid search.
    builder = SparkSession.builder.appName('FinalModel')
    spark = builder.getOrCreate()
    sc = spark.sparkContext
    main(spark, sc)