From 501f4adceb41e4be04e1506abd5669a4b58a9035 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=99=97?= Date: Sat, 18 Mar 2017 00:20:50 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E9=9C=80=E6=B1=82=E9=80=89?= =?UTF-8?q?=E6=8B=A9=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../tosit/project/dao/impl/TaskDAOImpl.java | 2 +- .../tosit/project/javautils/ParamUtils.java | 2 +- src/main/resources/my.properties | 4 +- .../session/UserVisitAnalyzeService.scala | 100 ++++++++++++------ 4 files changed, 69 insertions(+), 39 deletions(-) diff --git a/src/main/java/com/tosit/project/dao/impl/TaskDAOImpl.java b/src/main/java/com/tosit/project/dao/impl/TaskDAOImpl.java index eb6c56c..a9d6480 100644 --- a/src/main/java/com/tosit/project/dao/impl/TaskDAOImpl.java +++ b/src/main/java/com/tosit/project/dao/impl/TaskDAOImpl.java @@ -49,6 +49,6 @@ public void process(ResultSet rs) throws Exception { } }); - return null; + return task; } } \ No newline at end of file diff --git a/src/main/java/com/tosit/project/javautils/ParamUtils.java b/src/main/java/com/tosit/project/javautils/ParamUtils.java index ea36497..871020a 100644 --- a/src/main/java/com/tosit/project/javautils/ParamUtils.java +++ b/src/main/java/com/tosit/project/javautils/ParamUtils.java @@ -22,7 +22,7 @@ public class ParamUtils { public static Long getTaskIdFromArgs(String[] args, String taskType) { boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL); if (local) { - + return Long.valueOf(args[0]); } return 1L; diff --git a/src/main/resources/my.properties b/src/main/resources/my.properties index 1353ac3..59c9dd1 100644 --- a/src/main/resources/my.properties +++ b/src/main/resources/my.properties @@ -3,8 +3,8 @@ spark.local.session.data.path=data/click.log spark.local.user.data.path=data/user.txt spark.local.product.data.path=data/product.txt spark.local.taskid.session=3 -jdbc.url=jdbc:MySQL://localhost:3306/spark_project?characterEncoding=utf8&useSSL=true +jdbc.url=jdbc:MySQL://localhost:3306/session?characterEncoding=utf8&useSSL=true jdbc.user=root jdbc.driver=com.mysql.jdbc.Driver -jdbc.password=scubigdata +jdbc.password=***Wanghan*** jdbc.datasource.size=5 diff --git a/src/main/scala/com/tosit/project/session/UserVisitAnalyzeService.scala b/src/main/scala/com/tosit/project/session/UserVisitAnalyzeService.scala index 662b96b..bdad841 100644 --- a/src/main/scala/com/tosit/project/session/UserVisitAnalyzeService.scala +++ b/src/main/scala/com/tosit/project/session/UserVisitAnalyzeService.scala @@ -4,7 +4,9 @@ import java.text.SimpleDateFormat import java.util.Date import com.tosit.project.constants.Constants -import com.tosit.project.javautils.StringUtils +import com.tosit.project.dao.factory.DAOFactory +import com.tosit.project.exception.TaskException +import com.tosit.project.javautils.{ParamUtils, StringUtils} import com.tosit.project.scalautils.{AnalyzeHelperUnits, InitUnits, SparkUtils} import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD @@ -26,43 +28,71 @@ object UserVisitAnalyzeService { val sQLContext = context._2 // 加载本地session访问日志测试数据 SparkUtils.loadLocalTestDataToTmpTable(sc, sQLContext) + // 创建DAO组件,DAO组件是用来操作数据库的 + val taskDao = DAOFactory.getTaskDAO() + // 通过任务常量名来获取任务ID,并将java.lang.Long转成scala.Long + val taskId = ParamUtils.getTaskIdFromArgs(args, Constants.SPARK_LOCAL_SESSION_TASKID).longValue() + val task = if (taskId > 0) taskDao.findById(taskId) else null + // 抛出task异常 + if (task == null) { + throw new TaskException("Can't find task by id: " + taskId); + } + // 获取任务参数 + val taskParam = new JSONObject(task.getTaskParam) - // // 需求1,连接池 - // // 创建DAO组件,DAO组件是用来操作数据库的 - // val taskDao = DAOFactory.getTaskDAO() - // // 通过任务常量名来获取任务ID - // val taskId = ParamUtils.getTaskIdFromArgs(args, Constants.SPARK_LOCAL_SESSION_TASKID) - // val task = if (taskId > 0) taskDao.findById(taskId) else null - // // 抛出task异常 - // if (task == null) { - // throw new TaskException("Can't find task by id: " + taskId); - // } - // // 获取任务参数 - // val taskParam = new JSONObject(task.getTaskParam) + // 测试json + // val param1 = new JSONObject("{\"startDate\":[\"2017-03-06\"],\"endDate\":[\"2017-03-06\"],\"startAge\":[\"40\"],\"endAge\":[\"42\"],\"citys\":[\"city14\"],\"searchWords\":[\"小米5\"]}") + + try { + // 执行需求 + taskId match { + /** + * 需求2 + * 在指定日期范围内,按照session粒度进行数据聚合。要求聚合后的pair RDD的元素是, + * 其中k=sessionid v的格式如下:sessionid=value|searchword=value|clickcaterory=value|age=value| + * professional=value|city=value|sex=value(Spark RDD + Sql) + */ + case 2L => { + val sql = AnalyzeHelperUnits.getSQL(taskParam) + val aggUserVisitAction = sQLContext.sql(sql._2).rdd + val aggUserInfo = sQLContext.sql(sql._1).rdd + val res = displaySession(aggUserInfo, aggUserVisitAction) + print(res.collect().toBuffer) + } + /** + * 需求3,根据用户的查询条件,一个 或者多个:年龄范围,职业(多选),城市(多选),搜索词(多选),点击品类(多选)进行数据过滤, + * 注意:session时间范围是必选的。返回的结果RDD元素格式同上(Spark RDD + Sql) + */ + case 3L => { + val actionRddByDateRange = sessionAggregateByRequirement(sQLContext, taskParam).collect().toBuffer + println(actionRddByDateRange) + } - // 测试json - val param1 = new JSONObject("{\"startDate\":[\"2017-03-06\"],\"endDate\":[\"2017-03-06\"],\"startAge\":[\"40\"],\"endAge\":[\"42\"],\"citys\":[\"city14\"],\"searchWords\":[\"小米5\"]}") - - // // 需求二 - // val aggUserVisitAction = sQLContext.sql("SELECT * FROM user_visit_action WHERE ( date >= \"2017-03-06\") AND ( date <= \"2017-03-06\")").rdd - // val aggUserInfo = sQLContext.sql("SELECT * FROM user_info").rdd - // val res = displaySession(aggUserInfo, aggUserVisitAction) - // print(res.collect().toBuffer) - - // // 需求三 - // val actionRddByDateRange = sessionAggregateByRequirement(sQLContext, param1).collect().toBuffer - // println(actionRddByDateRange) - - // // 需求四 - // val res = getVisitLengthAndStepLength(sc, sQLContext, param1) - // println(res) - // - // 需求五 - val session = getSessionByRequirement(sQLContext, param1) - val hotProducts = getHotCategory(session).iterator - for (i <- hotProducts) - print(i) + /** 需求4 + * 实现自定义累加器完成多个聚合统计业务的计算,统计业务包括访问时长:1~3秒,4~6秒,7~9秒,10~30秒,30~60秒的session访问量统计, + * 访问步长:1~3个页面,4~6个页面等步长的访问统计 注意:业务较为复杂,需要使用多个广播变量时,就会使得程序变得非常复杂, + * 不便于扩展维护(Spark Accumulator) + */ + case 4L => { + val res = getVisitLengthAndStepLength(sc, sQLContext, taskParam) + println(res) + } + + /** 需求5 + * 对通过筛选条件的session,按照各个品类的点击、下单和支付次数,降序排列,获取前10个热门品类。 + * 优先级:点击,下单,支付。二次排序(Spark) + */ + case 5L => { + val session = getSessionByRequirement(sQLContext, taskParam) + val hotProducts = getHotCategory(session).iterator + for (i <- hotProducts) + print(i) + } + } + } catch { + case e: Exception => println("没有匹配的需求编号") + } sc.stop() }