Skip to content

Commit

Permalink
feat(job): poll daily task
Browse files Browse the repository at this point in the history
  • Loading branch information
greenhat616 committed Aug 17, 2023
1 parent ef07e71 commit 591f6de
Show file tree
Hide file tree
Showing 7 changed files with 290 additions and 52 deletions.
7 changes: 7 additions & 0 deletions internal/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"context"
"github.com/gogf/gf/v2/errors/gerror"

"github.com/gogf/gf/v2/os/gcfg"

Expand Down Expand Up @@ -75,6 +76,12 @@ var (
// adapter := gcache.NewAdapterRedis(g.Redis())
// g.DB().GetCache().SetAdapter(adapter)

// 注册计划任务
err = service.Job().Register(ctx)
if err != nil {
return gerror.Wrap(err, "注册计划任务失败")
}

s := g.Server()
s.SetServerAgent(consts.AppName + " " + consts.Version) // 设置服务名称
s.AddSearchPath("resource/public") // 静态文件
Expand Down
3 changes: 0 additions & 3 deletions internal/logic/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,5 @@ func (s *sJob) Register(ctx context.Context) error {
if e := RegisterPollTask(ctx); e != nil {
return e
}
if e := RegisterPollDailyReport(ctx); e != nil {
return e
}
return nil
}
245 changes: 245 additions & 0 deletions internal/logic/job/poll/daily_report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
package poll

import (
"context"
"fmt"
"sync"
"time"

"github.com/hitokoto-osc/reviewer/internal/service"

"github.com/hitokoto-osc/reviewer/internal/model"

"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/hitokoto-osc/reviewer/internal/consts"
"golang.org/x/sync/errgroup"

"github.com/gogf/gf/v2/os/gtime"
"github.com/hitokoto-osc/reviewer/internal/dao"
"github.com/hitokoto-osc/reviewer/internal/model/entity"
)

func DailyReport(ctx context.Context) error {
g.Log().Debug(ctx, "开始执行每日投票报告任务...")
var (
pipelines []entity.PollPipeline
pollsActive []entity.Poll
users []entity.Users
)
eg, egCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
var e error
users, e = getReviewsAndAdminsThatShouldDoNotification(egCtx)
return gerror.Wrap(e, "获取用户列表失败")
})
eg.Go(func() error {
var e error
pipelines, e = getPollPipelinesPastDay(egCtx)
return gerror.Wrap(e, "获取投票处理记录失败")
})
eg.Go(func() error {
var e error
pollsActive, e = getActivePolls(egCtx)
return gerror.Wrap(e, "获取投票列表失败")
})
err := eg.Wait()
if err != nil {
g.Log().Error(ctx, err)
return err
}
accept, reject, needEdited := calcProcessedFieldCount(pipelines)
// 生成系统信息
systemInformation := &model.DailyReportSystemInformation{
Total: len(pollsActive),
ProcessTotal: len(pipelines),
ProcessAccept: accept,
ProcessReject: reject,
ProcessNeedEdited: needEdited,
}
g.Log().Debug(ctx, "开始为每个用户生成报告...")
if len(users) == 0 {
g.Log().Debug(ctx, "没有用户需要发送通知,因此不生成报告")
return nil
}
msgChan := make(chan *model.DailyReportNotificationMessage)
wg := sync.WaitGroup{}
wg.Add(len(users))
for i := 0; i < len(users); i++ {
user := users[i]
go func() {
defer wg.Done()
g.Log().Debugf(ctx, "开始为用户 %d(%s) 生成报告...", user.Id, user.Name)
msg, e := generateDailyReportForUser(egCtx, &user, systemInformation, pipelines, pollsActive)
if e != nil {
e = gerror.Wrapf(e, "生成用户 %d(%s)的报告失败", user.Id, user.Name)
g.Log().Error(ctx, e)
return
}
msgChan <- msg
}()
}
go func() {
wg.Wait()
close(msgChan) // 关闭通道
}()
// 收集结果
msgs := make([]model.DailyReportNotificationMessage, 0, len(users))
for msg := range msgChan {
msgs = append(msgs, *msg)
}
g.Log().Debug(ctx, "开始发送通知...")
err = service.Notification().DailyReportNotification(ctx, msgs)
if err != nil {
return gerror.Wrap(err, "发送通知失败")
}
g.Log().Debug(ctx, "每日投票报告任务执行完成")
return nil
}

func generateDailyReportForUser(ctx context.Context,
user *entity.Users,
sysInfo *model.DailyReportSystemInformation,
pipelines []entity.PollPipeline,
pollInWaiting []entity.Poll,
) (*model.DailyReportNotificationMessage, error) {
pollLogs, err := getUserPollLogsPastDay(ctx, user.Id)
if err != nil {
return nil, gerror.Wrap(err, "获取用户投票记录失败")
}
userInfo := generateUserInformation(pipelines, pollInWaiting, pollLogs, sysInfo)
msg := &model.DailyReportNotificationMessage{
CreatedAt: gtime.Now().Format("c"),
To: user.Email,
UserName: user.Name,
SystemInformation: *sysInfo,
UserInformation: *userInfo,
}
g.Log().Debugf(ctx, "用户 %d(%s)的报告生成完成。", user.Id, user.Name)
return msg, nil
}

// generateUserInformation 生成用户信息(通知的字段)
func generateUserInformation(
pipelines []entity.PollPipeline,
pollInWaiting []entity.Poll,
pollLogs []entity.PollLog,
sysInfo *model.DailyReportSystemInformation,
) *model.DailyReportUserInformation {
approve, reject, needModify := calcPolledFieldCount(pollLogs)
polled := &model.DailyReportMessageUserInformationPolled{
Total: len(pollLogs),
Accept: approve,
Reject: reject,
NeedEdited: needModify,
}
userInfo := &model.DailyReportUserInformation{
Polled: *polled,
Waiting: 0,
Accepted: 0,
Rejected: 0,
InNeedEdited: 0,
WaitForPolling: 0,
}

pollProcessedPastDayMap := make(g.MapIntInt)
for _, pipeline := range pipelines {
pollProcessedPastDayMap[pipeline.PollId] = pipeline.Operate
}
pollInWaitingMap := make(g.MapIntBool)
for _, poll := range pollInWaiting {
pollInWaitingMap[poll.Id] = true
}
for i := 0; i < len(pollLogs); i++ {
log := &pollLogs[i]
if op, ok := pollProcessedPastDayMap[log.PollId]; ok {
switch op {
case int(consts.PollStatusApproved):
userInfo.Accepted++
case int(consts.PollStatusRejected):
userInfo.Rejected++
case int(consts.PollStatusNeedModify):
userInfo.InNeedEdited++
}
}
if pollInWaitingMap[log.PollId] {
userInfo.WaitForPolling++
}
}
userInfo.WaitForPolling = sysInfo.Total - userInfo.Waiting // 修正需要用户投票的数据
return userInfo
}

func getReviewsAndAdminsThatShouldDoNotification(ctx context.Context) ([]entity.Users, error) {
var users []entity.Users
err := dao.Users.Ctx(ctx).Raw(fmt.Sprintf(
"SELECT * FROM `%s` WHERE `%s` = 1 OR `%s` = 1 IN ("+ // 筛选出管理员和审核员
"SELECT `%s` FROM `%s` WHERE `%s` = 1 AND `%s` = 1"+ // 筛选出开启通知的用户
")",
dao.Users.Table(),
dao.Users.Columns().IsReviewer,
dao.Users.Columns().IsAdmin,
dao.UserNotification.Columns().UserId,
dao.UserNotification.Table(),
dao.UserNotification.Columns().EmailNotificationGlobal,
dao.UserNotification.Columns().EmailNotificationPollDailyReport,
)).Scan(&users)
return users, err
}

func getPollPipelinesPastDay(ctx context.Context) ([]entity.PollPipeline, error) {
var pipelines []entity.PollPipeline
err := dao.PollPipeline.Ctx(ctx).
WhereBetween(dao.PollPipeline.Columns().CreatedAt,
gtime.Now().Add(-time.Hour*24), // nolint:gomnd // 24 小时
gtime.Now(),
).Scan(&pipelines)
return pipelines, err
}

func getUserPollLogsPastDay(ctx context.Context, userID uint) ([]entity.PollLog, error) {
var logs []entity.PollLog
err := dao.PollLog.Ctx(ctx).
Where(dao.PollLog.Columns().UserId, userID).
WhereBetween(dao.PollLog.Columns().CreatedAt,
gtime.Now().Add(-time.Hour*24), // nolint:gomnd // 24 小时
gtime.Now(),
).Scan(&logs)
return logs, err
}

func getActivePolls(ctx context.Context) ([]entity.Poll, error) {
var polls []entity.Poll
err := dao.Poll.Ctx(ctx).
WhereLT(dao.Poll.Columns().Status, int(consts.PollStatusClosed)).
Scan(&polls)
return polls, err
}

func calcProcessedFieldCount(pipelines []entity.PollPipeline) (approve, reject, needModify int) {
for _, pipeline := range pipelines {
switch pipeline.Operate {
case int(consts.PollStatusApproved):
approve++
case int(consts.PollStatusRejected):
reject++
case int(consts.PollStatusNeedModify):
needModify++
}
}
return
}

func calcPolledFieldCount(logs []entity.PollLog) (approve, reject, needModify int) {
for _, log := range logs {
switch log.Type {
case int(consts.PollStatusApproved):
approve += log.Point
case int(consts.PollStatusRejected):
reject += log.Point
case int(consts.PollStatusNeedModify):
needModify += log.Point
}
}
return
}
17 changes: 0 additions & 17 deletions internal/logic/job/poll_daily_report.go

This file was deleted.

26 changes: 21 additions & 5 deletions internal/logic/job/poll_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,26 @@ import (
"github.com/gogf/gf/v2/os/gcron"
)

const PollTaskCron = "@every 1m30s" // 每 90 秒执行一次
const PollTickTaskCron = "@every 1m30s" // 每 90 秒执行一次
const PollDailyTaskCron = "0 30 8 */1 * *" // 每天八点半执行

func DoPollTask(ctx context.Context) {
func DoPollTickTask(ctx context.Context) {
wg := sync.WaitGroup{}
wg.Add(2)
e := make(chan error, 2)
go func() {
defer wg.Done()
err := poll.RemoveInvalidPolls(ctx)
if err != nil {
e <- err
}
wg.Done()
}()
go func() {
defer wg.Done()
err := poll.MoveOverduePolls(ctx)
if err != nil {
e <- err
}
wg.Done()
}()
wg.Wait()
if len(e) > 0 {
Expand All @@ -42,8 +43,23 @@ func DoPollTask(ctx context.Context) {
}
}

func DoPollDailyTask(ctx context.Context) {
err := poll.ClearInactiveReviewer(ctx)
if err != nil {
g.Log().Error(ctx, err)
}
err = poll.DailyReport(ctx)
if err != nil {
g.Log().Error(ctx, err)
}
}

func RegisterPollTask(ctx context.Context) error {
g.Log().Debug(ctx, "Registering Poll Task...")
_, err := gcron.AddSingleton(ctx, PollTaskCron, DoPollTask)
_, err := gcron.AddSingleton(ctx, PollTickTaskCron, DoPollTickTask)
if err != nil {
return err
}
_, err = gcron.AddSingleton(ctx, PollDailyTaskCron, DoPollDailyTask)
return err
}
16 changes: 3 additions & 13 deletions internal/logic/notification/poll_daily_report_notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,9 @@ var PollDailyReportNotificationSettingField = dao.UserNotification.Columns().Ema
// DailyReportNotification 发送每日报告通知
// 直接交给 Event 生产消息
func (s *sNotification) DailyReportNotification(ctx context.Context, rawData []model.DailyReportNotificationMessage) error {
users, err := s.GetUsersShouldDoNotification(ctx, PollDailyReportNotificationSettingField)
if err != nil {
return err
}
// filter
data := make([]any, 0, len(users))
for i := 0; i < len(users); i++ {
v := &users[i]
for j := 0; j < len(rawData); j++ {
if rawData[j].To == v.Email {
data = append(data, rawData[j])
}
}
data := make([]any, 0, len(rawData))
for i := 0; i < len(rawData); i++ {
data = append(data, rawData[i])
}
return DoNotification(ctx, PollDailyReportNotificationExchange, PollDailyReportNotificationRoutingKey, data)
}
Loading

0 comments on commit 591f6de

Please sign in to comment.