Skip to content

Commit

Permalink
When new event type is supported, it is pushed to existing users on t…
Browse files Browse the repository at this point in the history
…he init run
  • Loading branch information
violog committed Feb 12, 2024
1 parent 5a9855c commit bd4778f
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 44 deletions.
3 changes: 3 additions & 0 deletions internal/data/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type EventsQ interface {
// SelectReopenable returns events matching criteria: there are no open or
// fulfilled events of this type for a specific user.
SelectReopenable() ([]ReopenableEvent, error)
// SelectAbsentTypes returns events matching criteria: there are no events of
// this type for a specific user. Filters are not applied to this selection.
SelectAbsentTypes(allTypes ...string) ([]ReopenableEvent, error)

FilterByID(string) EventsQ
FilterByUserDID(string) EventsQ
Expand Down
28 changes: 28 additions & 0 deletions internal/data/pg/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/Masterminds/squirrel"
"github.com/rarimo/rarime-points-svc/internal/data"
Expand Down Expand Up @@ -138,6 +139,33 @@ func (q *events) SelectReopenable() ([]data.ReopenableEvent, error) {
return res, nil
}

func (q *events) SelectAbsentTypes(allTypes ...string) ([]data.ReopenableEvent, error) {
values := make([]string, len(allTypes))
for i, t := range allTypes {
values[i] = fmt.Sprintf("('%s')", t)
}

query := fmt.Sprintf(`
WITH types(type) AS (
VALUES %s
)
SELECT u.user_did, t.type
FROM (
SELECT DISTINCT user_did FROM %s
) u
CROSS JOIN types t
LEFT JOIN %s e ON e.user_did = u.user_did AND e.type = t.type
WHERE e.type IS NULL;
`, strings.Join(values, ", "), eventsTable, eventsTable)

var res []data.ReopenableEvent
if err := q.db.SelectRaw(&res, query); err != nil {
return nil, fmt.Errorf("select absent types for each did: %w", err)
}

return res, nil
}

func (q *events) FilterByID(id string) data.EventsQ {
return q.applyCondition(squirrel.Eq{"id": id})
}
Expand Down
123 changes: 105 additions & 18 deletions internal/service/workers/reopener/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,118 @@ import (
"fmt"
"time"

"github.com/rarimo/rarime-points-svc/internal/config"
"github.com/rarimo/rarime-points-svc/internal/data"
"github.com/rarimo/rarime-points-svc/internal/data/evtypes"
"github.com/rarimo/rarime-points-svc/internal/data/pg"
"gitlab.com/distributed_lab/logan/v3"
)

func (w *worker) initialRun() error {
types := w.types.NamesByFrequency(w.freq)
if len(types) == 0 {
w.log.Info("Initial run: no events to reopen: all types expired or no types with frequency exist")
return nil
func initialRun(cfg config.Config) error {
var (
q = pg.NewEvents(cfg.DB())
log = cfg.Log().WithField("who", "reopener[initializer]")
col = &initCollector{
q: q,
types: cfg.EventTypes(),
log: log,
}
)

events, err := col.collect()
if err != nil {
return fmt.Errorf("collect events: %w", err)
}

err = q.New().Insert(prepareForReopening(events)...)
if err != nil {
return fmt.Errorf("insert events to be opened: %w", err)
}

log.Infof("Reopened %d events on the initial run", len(events))
return nil
}

type initCollector struct {
q data.EventsQ
types evtypes.Types
log *logan.Entry
}

func (c *initCollector) collect() ([]data.ReopenableEvent, error) {
var (
now = time.Now().UTC()
monOffset = int(time.Monday - now.Weekday())
midnight = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
weekStart = midnight.AddDate(0, 0, monOffset).Unix()
)

daily, err := c.selectReopenable(evtypes.Daily, midnight.Unix())
if err != nil {
return nil, fmt.Errorf("select daily events: %w", err)
}

weekly, err := c.selectReopenable(evtypes.Weekly, weekStart)
if err != nil {
return nil, fmt.Errorf("select weekly events: %w", err)
}

absent, err := c.selectAbsent()
if err != nil {
return nil, fmt.Errorf("select absent events: %w", err)
}

return w.reopenEvents(types, true)
dw := append(daily, weekly...)
return append(dw, absent...), nil
}

func (w *worker) beforeTimeFilter() int64 {
now := time.Now().UTC()
midnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
func (c *initCollector) selectReopenable(freq evtypes.Frequency, before int64) ([]data.ReopenableEvent, error) {
types := c.types.NamesByFrequency(freq)

switch w.freq {
case evtypes.Daily:
return midnight.Unix()
case evtypes.Weekly:
// current_day + (monday - current_day) = monday
offset := int(time.Monday - now.Weekday())
return midnight.AddDate(0, 0, offset).Unix()
default:
panic(fmt.Errorf("unexpected frequency: %s", w.freq))
res, err := c.q.New().FilterByType(types...).
FilterByUpdatedAtBefore(before).
SelectReopenable()
if err != nil {
return nil, fmt.Errorf("select reopenable events [freq=%s before=%d types=%v]: %w", freq, before, types, err)
}

log := c.log.WithFields(logan.F{
"frequency": freq,
"before": before,
"types": types,
})

if len(res) == 0 {
log.Debug("No events to reopen on initial run")
return nil, nil
}

log.Infof("%d (DID, type) pairs to reopen: %v", len(res), res)
return res, nil
}

func (c *initCollector) selectAbsent() ([]data.ReopenableEvent, error) {
typesAll := c.types.List()
typeNames := make([]string, len(typesAll))

for i, t := range typesAll {
if t.NoAutoOpen {
continue
}
typeNames[i] = t.Name
}

res, err := c.q.New().SelectAbsentTypes(typeNames...)
if err != nil {
return nil, fmt.Errorf("select events with absent types [types=%v]: %w", typeNames, err)
}

log := c.log.WithField("types", typeNames)
if len(res) == 0 {
log.Debug("No new event types found to open for new users")
return nil, nil
}

log.Infof("%d new (DID, type) pairs to open: %v", len(res), res)
return res, nil
}
39 changes: 13 additions & 26 deletions internal/service/workers/reopener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,14 @@ const retryPeriod = 5 * time.Minute
type worker struct {
name string
freq evtypes.Frequency
types evtypes.Types
q data.EventsQ
types evtypes.Types
log *logan.Entry
}

func Run(ctx context.Context, cfg config.Config) {
var (
atUTC = gocron.NewAtTimes(gocron.NewAtTime(0, 0, 0))
daily = newWorker(cfg, evtypes.Daily)
weekly = newWorker(cfg, evtypes.Weekly)
)
if err := daily.initialRun(); err != nil {
panic(fmt.Errorf("failed to do initial run for daily events: %w", err))
}
if err := weekly.initialRun(); err != nil {
panic(fmt.Errorf("failed to do initial run for weekly events: %w", err))
if err := initialRun(cfg); err != nil {
panic(fmt.Errorf("initial run failed: %w", err))
}

scheduler, err := gocron.NewScheduler(
Expand All @@ -45,16 +37,17 @@ func Run(ctx context.Context, cfg config.Config) {
panic(fmt.Errorf("failed to initialize scheduler: %w", err))
}

atUTC := gocron.NewAtTimes(gocron.NewAtTime(0, 0, 0))
_, err = scheduler.NewJob(
gocron.DailyJob(1, atUTC),
gocron.NewTask(daily.job, ctx),
gocron.NewTask(newWorker(cfg, evtypes.Daily).job, ctx),
)
if err != nil {
panic(fmt.Errorf("failed to initialize daily job: %w", err))
}
_, err = scheduler.NewJob(
gocron.WeeklyJob(1, gocron.NewWeekdays(time.Monday), atUTC),
gocron.NewTask(weekly.job, ctx),
gocron.NewTask(newWorker(cfg, evtypes.Weekly).job, ctx),
)
if err != nil {
panic(fmt.Errorf("failed to initialize weekly job: %w", err))
Expand All @@ -74,8 +67,8 @@ func newWorker(cfg config.Config, freq evtypes.Frequency) *worker {
return &worker{
name: name,
freq: freq,
types: cfg.EventTypes(),
q: pg.NewEvents(cfg.DB().Clone()),
types: cfg.EventTypes(),
log: cfg.Log().WithField("who", name),
}
}
Expand All @@ -91,31 +84,25 @@ func (w *worker) job(ctx context.Context) {
Debug("Reopening claimed events")

running.WithThreshold(ctx, w.log, w.name, func(context.Context) (bool, error) {
if err := w.reopenEvents(types, false); err != nil {
if err := w.reopenEvents(types); err != nil {
return false, fmt.Errorf("reopen events: %w", err)
}
return true, nil
}, retryPeriod, retryPeriod, 12)
}

func (w *worker) reopenEvents(types []string, initRun bool) error {
q := w.q.New().FilterByType(types...)

if initRun {
filter := w.beforeTimeFilter()
w.log.WithField("event_types", types).
Debugf("Reopening claimed events before %d", filter)
q.FilterByUpdatedAtBefore(filter)
}
func (w *worker) reopenEvents(types []string) error {
log := w.log.WithField("event_types", types)

events, err := q.SelectReopenable()
events, err := w.q.New().FilterByType(types...).SelectReopenable()
if err != nil {
return fmt.Errorf("select reopenable events [types=%v]: %w", types, err)
}
if len(events) == 0 {
w.log.Info("No events to reopen")
log.Info("No events to reopen: no claimed events found for provided types")
return nil
}
log.Infof("%d (DID, type) pairs to reopen: %v", len(events), events)

err = w.q.New().Insert(prepareForReopening(events)...)
if err != nil {
Expand Down

0 comments on commit bd4778f

Please sign in to comment.