Commit

Update the requirement selection mode (更新需求选择模式)
王晗 committed Mar 17, 2017
1 parent 6fe5cda commit 501f4ad
Showing 4 changed files with 69 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/tosit/project/dao/impl/TaskDAOImpl.java
@@ -49,6 +49,6 @@ public void process(ResultSet rs) throws Exception {
            }
        });

-       return null;
+       return task;
    }
}
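The one-line fix above matters because findById builds a Task inside the JDBC callback and previously threw that object away. A hedged Scala sketch of the likely method shape, for orientation only: the jdbcHelper.executeQuery helper, the SQL text, and the column layout are assumptions, since the surrounding Java is not shown in this commit.

    // Hedged sketch (assumption), not the project's code: probable shape of findById.
    def findById(taskId: Long): Task = {
      val task = new Task()
      // hypothetical JDBC helper; only the process(ResultSet) callback appears in the diff
      jdbcHelper.executeQuery(
        "SELECT * FROM task WHERE task_id = ?",
        Array(taskId),
        rs => if (rs.next()) {
          task.setTaskId(rs.getLong(1))      // assumed column order
          task.setTaskParam(rs.getString(2)) // consumed later via task.getTaskParam
        })
      task // previously `return null`, which would make the driver's new null check throw for every id
    }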
2 changes: 1 addition & 1 deletion src/main/java/com/tosit/project/javautils/ParamUtils.java
@@ -22,7 +22,7 @@ public class ParamUtils {
    public static Long getTaskIdFromArgs(String[] args, String taskType) {
        boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL);
        if (local) {
-
+           return Long.valueOf(args[0]);
        }

        return 1L;
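In local mode the task id now comes straight from the command line. A minimal usage sketch, assuming spark.local=true in my.properties (the non-local branch of the method is truncated above):

    // Usage sketch: with spark.local=true, args(0) selects the requirement to run.
    val args = Array("5")
    val taskId = ParamUtils.getTaskIdFromArgs(args, Constants.SPARK_LOCAL_SESSION_TASKID).longValue()
    // taskId == 5L, which the driver below dispatches to case 5L (top-10 hot categories)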
4 changes: 2 additions & 2 deletions src/main/resources/my.properties
@@ -3,8 +3,8 @@ spark.local.session.data.path=data/click.log
spark.local.user.data.path=data/user.txt
spark.local.product.data.path=data/product.txt
spark.local.taskid.session=3
-jdbc.url=jdbc:MySQL://localhost:3306/spark_project?characterEncoding=utf8&useSSL=true
+jdbc.url=jdbc:MySQL://localhost:3306/session?characterEncoding=utf8&useSSL=true
jdbc.user=root
jdbc.driver=com.mysql.jdbc.Driver
-jdbc.password=scubigdata
+jdbc.password=***Wanghan***
jdbc.datasource.size=5
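The driver reads these properties through ConfigurationManager (for example ConfigurationManager.getBoolean(Constants.SPARK_LOCAL) in ParamUtils above), a class that is not part of this commit. A minimal Scala sketch of one plausible shape for such a loader:

    import java.util.Properties

    // Minimal sketch (assumption): the project's real ConfigurationManager is not
    // shown in this commit; this is only one plausible equivalent over my.properties.
    object ConfigurationManagerSketch {
      private val props = new Properties()
      props.load(getClass.getClassLoader.getResourceAsStream("my.properties"))

      def getProperty(key: String): String = props.getProperty(key)
      def getBoolean(key: String): Boolean =
        java.lang.Boolean.parseBoolean(getProperty(key)) // e.g. "spark.local"
    }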
100 changes: 65 additions & 35 deletions src/main/scala/com/tosit/project/session/UserVisitAnalyzeService.scala
@@ -4,7 +4,9 @@ import java.text.SimpleDateFormat
import java.util.Date

import com.tosit.project.constants.Constants
- import com.tosit.project.javautils.StringUtils
+ import com.tosit.project.dao.factory.DAOFactory
+ import com.tosit.project.exception.TaskException
+ import com.tosit.project.javautils.{ParamUtils, StringUtils}
import com.tosit.project.scalautils.{AnalyzeHelperUnits, InitUnits, SparkUtils}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
@@ -26,43 +28,71 @@ object UserVisitAnalyzeService {
    val sQLContext = context._2
    // Load the local session access-log test data
    SparkUtils.loadLocalTestDataToTmpTable(sc, sQLContext)
+   // Create the DAO component; the DAO component is used to operate on the database
+   val taskDao = DAOFactory.getTaskDAO()
+   // Get the task ID via the task constant name, converting java.lang.Long to scala.Long
+   val taskId = ParamUtils.getTaskIdFromArgs(args, Constants.SPARK_LOCAL_SESSION_TASKID).longValue()
+   val task = if (taskId > 0) taskDao.findById(taskId) else null
+   // Throw a task exception
+   if (task == null) {
+       throw new TaskException("Can't find task by id: " + taskId);
+   }
+   // Get the task parameters
+   val taskParam = new JSONObject(task.getTaskParam)

-   // // Requirement 1: connection pool
-   // // Create the DAO component; the DAO component is used to operate on the database
-   // val taskDao = DAOFactory.getTaskDAO()
-   // // Get the task ID via the task constant name
-   // val taskId = ParamUtils.getTaskIdFromArgs(args, Constants.SPARK_LOCAL_SESSION_TASKID)
-   // val task = if (taskId > 0) taskDao.findById(taskId) else null
-   // // Throw a task exception
-   // if (task == null) {
-   //     throw new TaskException("Can't find task by id: " + taskId);
-   // }
-   // // Get the task parameters
-   // val taskParam = new JSONObject(task.getTaskParam)
+   // Test JSON
+   // val param1 = new JSONObject("{\"startDate\":[\"2017-03-06\"],\"endDate\":[\"2017-03-06\"],\"startAge\":[\"40\"],\"endAge\":[\"42\"],\"citys\":[\"city14\"],\"searchWords\":[\"小米5\"]}")

+   try {
+       // Execute the selected requirement
+       taskId match {
+           /**
+             * Requirement 2:
+             * Aggregate the data at session granularity within the given date range. The elements of the
+             * aggregated pair RDD must be <k:String, v:String>, where k = sessionid and v has the format:
+             * sessionid=value|searchword=value|clickcaterory=value|age=value|
+             * professional=value|city=value|sex=value (Spark RDD + SQL)
+             */
+           case 2L => {
+               val sql = AnalyzeHelperUnits.getSQL(taskParam)
+               val aggUserVisitAction = sQLContext.sql(sql._2).rdd
+               val aggUserInfo = sQLContext.sql(sql._1).rdd
+               val res = displaySession(aggUserInfo, aggUserVisitAction)
+               print(res.collect().toBuffer)
+           }

+           /**
+             * Requirement 3: Filter the data by one or more of the user's query conditions: age range,
+             * profession (multi-select), city (multi-select), search words (multi-select), clicked
+             * category (multi-select). Note: the session date range is mandatory. The result RDD's
+             * element format is the same as above (Spark RDD + SQL)
+             */
+           case 3L => {
+               val actionRddByDateRange = sessionAggregateByRequirement(sQLContext, taskParam).collect().toBuffer
+               println(actionRddByDateRange)
+           }

-   // Test JSON
-   val param1 = new JSONObject("{\"startDate\":[\"2017-03-06\"],\"endDate\":[\"2017-03-06\"],\"startAge\":[\"40\"],\"endAge\":[\"42\"],\"citys\":[\"city14\"],\"searchWords\":[\"小米5\"]}")
-
-   // // Requirement 2
-   // val aggUserVisitAction = sQLContext.sql("SELECT * FROM user_visit_action WHERE ( date >= \"2017-03-06\") AND ( date <= \"2017-03-06\")").rdd
-   // val aggUserInfo = sQLContext.sql("SELECT * FROM user_info").rdd
-   // val res = displaySession(aggUserInfo, aggUserVisitAction)
-   // print(res.collect().toBuffer)
-
-   // // Requirement 3
-   // val actionRddByDateRange = sessionAggregateByRequirement(sQLContext, param1).collect().toBuffer
-   // println(actionRddByDateRange)
-
-   // // Requirement 4
-   // val res = getVisitLengthAndStepLength(sc, sQLContext, param1)
-   // println(res)
-   //
-   // Requirement 5
-   val session = getSessionByRequirement(sQLContext, param1)
-   val hotProducts = getHotCategory(session).iterator
-   for (i <- hotProducts)
-       print(i)
+           /** Requirement 4:
+             * Implement a custom accumulator to compute several aggregate statistics in one pass. The
+             * statistics cover session counts by visit length (1~3s, 4~6s, 7~9s, 10~30s, 30~60s) and by
+             * step length (1~3 pages, 4~6 pages, and so on). Note: the logic is fairly complex; using
+             * several broadcast variables instead would make the program very complex and hard to
+             * extend and maintain (Spark Accumulator)
+             */
+           case 4L => {
+               val res = getVisitLengthAndStepLength(sc, sQLContext, taskParam)
+               println(res)
+           }

+           /** Requirement 5:
+             * For the sessions that pass the filters, sort categories by click, order and payment counts
+             * in descending order and take the top 10 hot categories.
+             * Priority: click, then order, then payment. Secondary sort (Spark)
+             */
+           case 5L => {
+               val session = getSessionByRequirement(sQLContext, taskParam)
+               val hotProducts = getHotCategory(session).iterator
+               for (i <- hotProducts)
+                   print(i)
+           }
+       }
+   } catch {
+       case e: Exception => println("No matching requirement number")
+   }

    sc.stop()
}
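The requirement 4 comment above calls for one custom accumulator instead of several broadcast variables, but the accumulator itself is outside this diff. Below is a hedged sketch in the pre-2.0 AccumulatorParam style (an assumption; the SQLContext usage suggests Spark 1.x), where the accumulator value is a single concatenated counter string and add() receives the name of the counter to increment:

    import org.apache.spark.AccumulatorParam

    // Hedged sketch (assumption), not the project's code: a string-valued accumulator
    // for the visit-length / step-length counters of requirement 4.
    object SessionStatAccumulator extends AccumulatorParam[String] {
      // The initial value carries every counter, e.g. "1s_3s=0|4s_6s=0|...|step_1_3=0"
      override def zero(initialValue: String): String = initialValue

      // v2 is a counter key such as "1s_3s"; increment that field inside v1
      override def addInPlace(v1: String, v2: String): String =
        if (v1 == "") v2
        else v1.split("\\|").map { kv =>
          val Array(k, v) = kv.split("=")
          if (k == v2) s"$k=${v.toInt + 1}" else kv
        }.mkString("|")
    }

    // Usage sketch:
    // val acc = sc.accumulator("1s_3s=0|4s_6s=0|7s_9s=0|10s_30s=0|30s_60s=0")(SessionStatAccumulator)
    // sessionRdd.foreach(s => acc.add("1s_3s")) // choose the bucket per session
    // println(acc.value)

Keeping all counters in one accumulator sidesteps a broadcast variable per bucket, at the cost of string parsing on every add; that trade-off is exactly what the requirement 4 note is describing.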