diff --git a/config.yaml b/config.yaml index 3775b36..c2c8b36 100644 --- a/config.yaml +++ b/config.yaml @@ -23,6 +23,11 @@ event_types: description: Lorem ipsum dolor sit amet frequency: one-time expires_at: 2020-01-01T00:00:00Z + - name: free_weekly + title: Free weekly points + reward: 100 + frequency: weekly + description: Lorem ipsum dolor sit amet - name: daily_login title: Daily login reward: 5 diff --git a/docs/spec/components/schemas/Balance.yaml b/docs/spec/components/schemas/Balance.yaml index 155e908..bc62bb9 100644 --- a/docs/spec/components/schemas/Balance.yaml +++ b/docs/spec/components/schemas/Balance.yaml @@ -8,6 +8,7 @@ allOf: type: object required: - amount + - is_verified - created_at - updated_at properties: @@ -15,6 +16,10 @@ allOf: type: integer description: Amount of points example: 580 + is_verified: + type: boolean + description: Whether the user has scanned passport + example: true created_at: type: integer description: Unix timestamp of balance creation diff --git a/internal/assets/migrations/001_initial.sql b/internal/assets/migrations/001_initial.sql index 4a4de96..bddf0bd 100644 --- a/internal/assets/migrations/001_initial.sql +++ b/internal/assets/migrations/001_initial.sql @@ -5,10 +5,12 @@ AS $$ BEGIN NEW.updated_at = EXTRACT('EPOCH' FROM NOW()); RETURN NEW; END; $$; CREATE TABLE IF NOT EXISTS balances ( - did text PRIMARY KEY, - amount integer not null default 0, - created_at integer not null default EXTRACT('EPOCH' FROM NOW()), - updated_at integer not null default EXTRACT('EPOCH' FROM NOW()) + did text PRIMARY KEY, + amount integer not null default 0, + created_at integer not null default EXTRACT('EPOCH' FROM NOW()), + updated_at integer not null default EXTRACT('EPOCH' FROM NOW()), + passport_hash text UNIQUE, + passport_expires timestamp without time zone ); CREATE INDEX IF NOT EXISTS balances_amount_index ON balances using btree (amount); diff --git a/internal/cli/main.go b/internal/cli/main.go index c776c23..ad10161 100644 --- a/internal/cli/main.go +++ b/internal/cli/main.go @@ -55,8 +55,8 @@ func Run(args []string) bool { switch cmd { case serviceCmd.FullCommand(): run(func(context.Context, config.Config) { sbtcheck.Run(ctx, cfg) }) - run(reopener.Run) run(service.Run) + run(reopener.Run) case migrateUpCmd.FullCommand(): err = MigrateUp(cfg) case migrateDownCmd.FullCommand(): diff --git a/internal/data/balances.go b/internal/data/balances.go index e7f3ed4..d1f0f90 100644 --- a/internal/data/balances.go +++ b/internal/data/balances.go @@ -1,21 +1,27 @@ package data import ( + "database/sql" + "time" + "gitlab.com/distributed_lab/kit/pgdb" ) type Balance struct { - DID string `db:"did"` - Amount int32 `db:"amount"` - CreatedAt int32 `db:"created_at"` - UpdatedAt int32 `db:"updated_at"` - Rank *int `db:"rank"` + DID string `db:"did"` + Amount int32 `db:"amount"` + CreatedAt int32 `db:"created_at"` + UpdatedAt int32 `db:"updated_at"` + PassportHash sql.NullString `db:"passport_hash"` + PassportExpires sql.NullTime `db:"passport_expires"` + Rank *int `db:"rank"` } type BalancesQ interface { New() BalancesQ Insert(did string) error UpdateAmountBy(points int32) error + SetPassport(hash string, exp time.Time) error Page(*pgdb.OffsetPageParams) BalancesQ Select() ([]Balance, error) diff --git a/internal/data/events.go b/internal/data/events.go index 51f8601..d499251 100644 --- a/internal/data/events.go +++ b/internal/data/events.go @@ -30,17 +30,30 @@ type Event struct { PointsAmount sql.NullInt32 `db:"points_amount"` } +// ReopenableEvent is a pair that is sufficient to build a new open event with a specific type for a user +type ReopenableEvent struct { + UserDID string `db:"user_did"` + Type string `db:"type"` +} + type EventsQ interface { New() EventsQ Insert(...Event) error Update(status EventStatus, meta json.RawMessage, points *int32) (*Event, error) - Reopen() (count uint, err error) Transaction(func() error) error Page(*pgdb.CursorPageParams) EventsQ Select() ([]Event, error) Get() (*Event, error) + // Count returns the total number of events that match filters. Page is not + // applied to this method. Count() (int, error) + // SelectReopenable returns events matching criteria: there are no open or + // fulfilled events of this type for a specific user. + SelectReopenable() ([]ReopenableEvent, error) + // SelectAbsentTypes returns events matching criteria: there are no events of + // this type for a specific user. Filters are not applied to this selection. + SelectAbsentTypes(allTypes ...string) ([]ReopenableEvent, error) FilterByID(string) EventsQ FilterByUserDID(string) EventsQ diff --git a/internal/data/evtypes/main.go b/internal/data/evtypes/main.go index 8a42c8a..d58af0a 100644 --- a/internal/data/evtypes/main.go +++ b/internal/data/evtypes/main.go @@ -1,7 +1,6 @@ package evtypes import ( - "database/sql" "time" "github.com/rarimo/rarime-points-svc/internal/data" @@ -22,7 +21,10 @@ const ( Custom Frequency = "custom" ) -const TypeGetPoH = "get_poh" +const ( + TypeGetPoH = "get_poh" + TypeFreeWeekly = "free_weekly" +) type Types struct { inner map[string]resources.EventStaticMeta @@ -45,15 +47,15 @@ func (t Types) PrepareOpenEvents(userDID string) []data.Event { evTypes := t.List() events := make([]data.Event, len(evTypes)) - for i, evType := range evTypes { + for i, et := range evTypes { events[i] = data.Event{ UserDID: userDID, - Type: evType.Name, + Type: et.Name, Status: data.EventOpen, - PointsAmount: sql.NullInt32{ - Int32: evType.Reward, - Valid: true, - }, + } + + if et.Name == TypeFreeWeekly { + events[i].Status = data.EventFulfilled } } diff --git a/internal/data/pg/balances.go b/internal/data/pg/balances.go index 72b9c6b..759a10c 100644 --- a/internal/data/pg/balances.go +++ b/internal/data/pg/balances.go @@ -4,6 +4,7 @@ import ( "database/sql" "errors" "fmt" + "time" "github.com/Masterminds/squirrel" "github.com/rarimo/rarime-points-svc/internal/data" @@ -50,6 +51,18 @@ func (q *balances) UpdateAmountBy(points int32) error { return nil } +func (q *balances) SetPassport(hash string, exp time.Time) error { + stmt := q.updater. + Set("passport_hash", hash). + Set("passport_expires", exp) + + if err := q.db.Exec(stmt); err != nil { + return fmt.Errorf("set passport hash and expires: %w", err) + } + + return nil +} + func (q *balances) Page(page *pgdb.OffsetPageParams) data.BalancesQ { q.selector = page.ApplyTo(q.selector, "amount") return q diff --git a/internal/data/pg/events.go b/internal/data/pg/events.go index c67d1a7..bfd6688 100644 --- a/internal/data/pg/events.go +++ b/internal/data/pg/events.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "strings" "github.com/Masterminds/squirrel" "github.com/rarimo/rarime-points-svc/internal/data" @@ -14,18 +15,20 @@ import ( const eventsTable = "events" type events struct { - db *pgdb.DB - selector squirrel.SelectBuilder - updater squirrel.UpdateBuilder - counter squirrel.SelectBuilder + db *pgdb.DB + selector squirrel.SelectBuilder + updater squirrel.UpdateBuilder + counter squirrel.SelectBuilder + reopenable squirrel.SelectBuilder } func NewEvents(db *pgdb.DB) data.EventsQ { return &events{ - db: db, - selector: squirrel.Select("*").From(eventsTable), - updater: squirrel.Update(eventsTable), - counter: squirrel.Select("count(id) AS count").From(eventsTable), + db: db, + selector: squirrel.Select("*").From(eventsTable), + updater: squirrel.Update(eventsTable), + counter: squirrel.Select("count(id) AS count").From(eventsTable), + reopenable: squirrel.Select("user_did", "type").Distinct().From(eventsTable + " e1"), } } @@ -76,25 +79,8 @@ func (q *events) Update(status data.EventStatus, meta json.RawMessage, points *i return &res, nil } -func (q *events) Reopen() (count uint, err error) { - stmt := q.updater.SetMap(map[string]any{"status": data.EventOpen}) - defer func() { - if errors.Is(err, sql.ErrNoRows) { - err = nil - } - }() - - res, err := q.db.ExecWithResult(stmt) - if err != nil { - return 0, fmt.Errorf("update status to open with result: %w", err) - } - - rows, err := res.RowsAffected() - if err != nil { - return 0, fmt.Errorf("get rows affected: %w", err) - } - - return uint(rows), nil +func (q *events) Transaction(f func() error) error { + return q.db.Transaction(f) } func (q *events) Page(page *pgdb.CursorPageParams) data.EventsQ { @@ -102,10 +88,6 @@ func (q *events) Page(page *pgdb.CursorPageParams) data.EventsQ { return q } -func (q *events) Transaction(f func() error) error { - return q.db.Transaction(f) -} - func (q *events) Select() ([]data.Event, error) { var res []data.Event @@ -141,6 +123,49 @@ func (q *events) Count() (int, error) { return res.Count, nil } +func (q *events) SelectReopenable() ([]data.ReopenableEvent, error) { + subq := fmt.Sprintf(`NOT EXISTS ( + SELECT 1 FROM %s e2 + WHERE e2.user_did = e1.user_did + AND e2.type = e1.type + AND e2.status IN (?, ?))`, eventsTable) + stmt := q.reopenable.Where(subq, data.EventOpen, data.EventFulfilled) + + var res []data.ReopenableEvent + if err := q.db.Select(&res, stmt); err != nil { + return nil, fmt.Errorf("select reopenable events: %w", err) + } + + return res, nil +} + +func (q *events) SelectAbsentTypes(allTypes ...string) ([]data.ReopenableEvent, error) { + values := make([]string, len(allTypes)) + for i, t := range allTypes { + values[i] = fmt.Sprintf("('%s')", t) + } + + query := fmt.Sprintf(` + WITH types(type) AS ( + VALUES %s + ) + SELECT u.user_did, t.type + FROM ( + SELECT DISTINCT user_did FROM %s + ) u + CROSS JOIN types t + LEFT JOIN %s e ON e.user_did = u.user_did AND e.type = t.type + WHERE e.type IS NULL; + `, strings.Join(values, ", "), eventsTable, eventsTable) + + var res []data.ReopenableEvent + if err := q.db.SelectRaw(&res, query); err != nil { + return nil, fmt.Errorf("select absent types for each did: %w", err) + } + + return res, nil +} + func (q *events) FilterByID(id string) data.EventsQ { return q.applyCondition(squirrel.Eq{"id": id}) } @@ -171,5 +196,6 @@ func (q *events) applyCondition(cond squirrel.Sqlizer) data.EventsQ { q.selector = q.selector.Where(cond) q.updater = q.updater.Where(cond) q.counter = q.counter.Where(cond) + q.reopenable = q.reopenable.Where(cond) return q } diff --git a/internal/service/handlers/get_balance.go b/internal/service/handlers/get_balance.go index fdcf138..489bad0 100644 --- a/internal/service/handlers/get_balance.go +++ b/internal/service/handlers/get_balance.go @@ -33,10 +33,11 @@ func newBalanceModel(balance data.Balance) resources.Balance { Type: resources.BALANCE, }, Attributes: resources.BalanceAttributes{ - Amount: balance.Amount, - CreatedAt: balance.CreatedAt, - UpdatedAt: balance.UpdatedAt, - Rank: balance.Rank, + Amount: balance.Amount, + IsVerified: balance.PassportHash.Valid, + CreatedAt: balance.CreatedAt, + UpdatedAt: balance.UpdatedAt, + Rank: balance.Rank, }, } } diff --git a/internal/service/handlers/withdraw.go b/internal/service/handlers/withdraw.go index dab2a58..963887c 100644 --- a/internal/service/handlers/withdraw.go +++ b/internal/service/handlers/withdraw.go @@ -3,6 +3,7 @@ package handlers import ( "fmt" "net/http" + "time" cosmos "github.com/cosmos/cosmos-sdk/types" bank "github.com/cosmos/cosmos-sdk/x/bank/types" @@ -21,7 +22,7 @@ func Withdraw(w http.ResponseWriter, r *http.Request) { return } - if !isEnoughPoints(req, w, r) { + if !isEligibleToWithdraw(req, w, r) { return } @@ -80,19 +81,29 @@ func newWithdrawResponse(w data.Withdrawal, balance data.Balance) *resources.Wit return &resp } -func isEnoughPoints(req resources.WithdrawRequest, w http.ResponseWriter, r *http.Request) bool { +func isEligibleToWithdraw(req resources.WithdrawRequest, w http.ResponseWriter, r *http.Request) bool { balance := getBalanceByDID(req.Data.ID, false, w, r) if balance == nil { return false } - if balance.Amount < req.Data.Attributes.Amount { + render := func(field, format string, a ...any) bool { ape.RenderErr(w, problems.BadRequest(validation.Errors{ - "data/attributes/amount": fmt.Errorf("insufficient balance: %d", balance.Amount), + field: fmt.Errorf(format, a...), })...) return false } + if !balance.PassportHash.Valid { + return render("is_verified", "user must have verified passport for withdrawals") + } + if balance.PassportExpires.Time.Before(time.Now().UTC()) { + return render("is_verified", "user passport is expired") + } + if balance.Amount < req.Data.Attributes.Amount { + return render("data/attributes/amount", "insufficient balance: %d", balance.Amount) + } + return true } diff --git a/internal/service/workers/reopener/init.go b/internal/service/workers/reopener/init.go index 1e0dc24..23b0805 100644 --- a/internal/service/workers/reopener/init.go +++ b/internal/service/workers/reopener/init.go @@ -4,47 +4,118 @@ import ( "fmt" "time" + "github.com/rarimo/rarime-points-svc/internal/config" "github.com/rarimo/rarime-points-svc/internal/data" "github.com/rarimo/rarime-points-svc/internal/data/evtypes" + "github.com/rarimo/rarime-points-svc/internal/data/pg" + "gitlab.com/distributed_lab/logan/v3" ) -func (w *worker) initialRun() error { - types := w.types.NamesByFrequency(w.freq) - if len(types) == 0 { - w.log.Info("No events to reopen: all types expired or no types with frequency exist") - return nil +func initialRun(cfg config.Config) error { + var ( + q = pg.NewEvents(cfg.DB()) + log = cfg.Log().WithField("who", "reopener[initializer]") + col = &initCollector{ + q: q, + types: cfg.EventTypes(), + log: log, + } + ) + + events, err := col.collect() + if err != nil { + return fmt.Errorf("collect events: %w", err) + } + + err = q.New().Insert(prepareForReopening(events)...) + if err != nil { + return fmt.Errorf("insert events to be opened: %w", err) } - filter := w.beforeTimeFilter() - w.log.WithField("event_types", types). - Debugf("Reopening claimed events before %s", time.Unix(filter, 0).UTC()) + log.Infof("Reopened %d events on the initial run", len(events)) + return nil +} + +type initCollector struct { + q data.EventsQ + types evtypes.Types + log *logan.Entry +} - count, err := w.q.New(). - FilterByType(types...). - FilterByStatus(data.EventClaimed). - FilterByUpdatedAtBefore(filter). - Reopen() +func (c *initCollector) collect() ([]data.ReopenableEvent, error) { + var ( + now = time.Now().UTC() + monOffset = int(time.Monday - now.Weekday()) + midnight = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) + weekStart = midnight.AddDate(0, 0, monOffset).Unix() + ) + daily, err := c.selectReopenable(evtypes.Daily, midnight.Unix()) if err != nil { - return fmt.Errorf("reopen events: %w", err) + return nil, fmt.Errorf("select daily events: %w", err) } - w.log.Infof("Reopened %d events on initial run", count) - return nil + weekly, err := c.selectReopenable(evtypes.Weekly, weekStart) + if err != nil { + return nil, fmt.Errorf("select weekly events: %w", err) + } + + absent, err := c.selectAbsent() + if err != nil { + return nil, fmt.Errorf("select absent events: %w", err) + } + + dw := append(daily, weekly...) + return append(dw, absent...), nil } -func (w *worker) beforeTimeFilter() int64 { - now := time.Now().UTC() - midnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) +func (c *initCollector) selectReopenable(freq evtypes.Frequency, before int64) ([]data.ReopenableEvent, error) { + types := c.types.NamesByFrequency(freq) - switch w.freq { - case evtypes.Daily: - return midnight.Unix() - case evtypes.Weekly: - // current_day + (monday - current_day) = monday - offset := int(time.Monday - now.Weekday()) - return midnight.AddDate(0, 0, offset).Unix() - default: - panic(fmt.Errorf("unexpected frequency: %s", w.freq)) + res, err := c.q.New().FilterByType(types...). + FilterByUpdatedAtBefore(before). + SelectReopenable() + if err != nil { + return nil, fmt.Errorf("select reopenable events [freq=%s before=%d types=%v]: %w", freq, before, types, err) + } + + log := c.log.WithFields(logan.F{ + "frequency": freq, + "before": before, + "types": types, + }) + + if len(res) == 0 { + log.Debug("No events to reopen on initial run") + return nil, nil } + + log.Infof("%d (DID, type) pairs to reopen: %v", len(res), res) + return res, nil +} + +func (c *initCollector) selectAbsent() ([]data.ReopenableEvent, error) { + typesAll := c.types.List() + typeNames := make([]string, len(typesAll)) + + for i, t := range typesAll { + if t.NoAutoOpen { + continue + } + typeNames[i] = t.Name + } + + res, err := c.q.New().SelectAbsentTypes(typeNames...) + if err != nil { + return nil, fmt.Errorf("select events with absent types [types=%v]: %w", typeNames, err) + } + + log := c.log.WithField("types", typeNames) + if len(res) == 0 { + log.Debug("No new event types found to open for new users") + return nil, nil + } + + log.Infof("%d new (DID, type) pairs to open: %v", len(res), res) + return res, nil } diff --git a/internal/service/workers/reopener/main.go b/internal/service/workers/reopener/main.go index 82afd36..520f90f 100644 --- a/internal/service/workers/reopener/main.go +++ b/internal/service/workers/reopener/main.go @@ -19,22 +19,14 @@ const retryPeriod = 5 * time.Minute type worker struct { name string freq evtypes.Frequency - types evtypes.Types q data.EventsQ + types evtypes.Types log *logan.Entry } func Run(ctx context.Context, cfg config.Config) { - var ( - atUTC = gocron.NewAtTimes(gocron.NewAtTime(0, 0, 0)) - daily = newWorker(cfg, evtypes.Daily) - weekly = newWorker(cfg, evtypes.Weekly) - ) - if err := daily.initialRun(); err != nil { - panic(fmt.Errorf("failed to do initial run for daily events: %w", err)) - } - if err := weekly.initialRun(); err != nil { - panic(fmt.Errorf("failed to do initial run for weekly events: %w", err)) + if err := initialRun(cfg); err != nil { + panic(fmt.Errorf("initial run failed: %w", err)) } scheduler, err := gocron.NewScheduler( @@ -45,16 +37,17 @@ func Run(ctx context.Context, cfg config.Config) { panic(fmt.Errorf("failed to initialize scheduler: %w", err)) } + atUTC := gocron.NewAtTimes(gocron.NewAtTime(0, 0, 0)) _, err = scheduler.NewJob( gocron.DailyJob(1, atUTC), - gocron.NewTask(daily.job, ctx), + gocron.NewTask(newWorker(cfg, evtypes.Daily).job, ctx), ) if err != nil { panic(fmt.Errorf("failed to initialize daily job: %w", err)) } _, err = scheduler.NewJob( gocron.WeeklyJob(1, gocron.NewWeekdays(time.Monday), atUTC), - gocron.NewTask(weekly.job, ctx), + gocron.NewTask(newWorker(cfg, evtypes.Weekly).job, ctx), ) if err != nil { panic(fmt.Errorf("failed to initialize weekly job: %w", err)) @@ -74,8 +67,8 @@ func newWorker(cfg config.Config, freq evtypes.Frequency) *worker { return &worker{ name: name, freq: freq, - types: cfg.EventTypes(), q: pg.NewEvents(cfg.DB().Clone()), + types: cfg.EventTypes(), log: cfg.Log().WithField("who", name), } } @@ -87,19 +80,53 @@ func (w *worker) job(ctx context.Context) { w.log.Info("No events to reopen: all types expired or no types with frequency exist") return } - w.log.WithField("event_types", types).Debug("Reopening claimed events") + w.log.WithField("event_types", types). + Debug("Reopening claimed events") running.WithThreshold(ctx, w.log, w.name, func(context.Context) (bool, error) { - count, err := w.q.New(). - FilterByType(types...). - FilterByStatus(data.EventClaimed). - Reopen() - - if err != nil { + if err := w.reopenEvents(types); err != nil { return false, fmt.Errorf("reopen events: %w", err) } - - w.log.Infof("Reopened %d events", count) return true, nil }, retryPeriod, retryPeriod, 12) } + +func (w *worker) reopenEvents(types []string) error { + log := w.log.WithField("event_types", types) + + events, err := w.q.New().FilterByType(types...).SelectReopenable() + if err != nil { + return fmt.Errorf("select reopenable events [types=%v]: %w", types, err) + } + if len(events) == 0 { + log.Info("No events to reopen: no claimed events found for provided types") + return nil + } + log.Infof("%d (DID, type) pairs to reopen: %v", len(events), events) + + err = w.q.New().Insert(prepareForReopening(events)...) + if err != nil { + return fmt.Errorf("insert events for reopening: %w", err) + } + + w.log.Infof("Reopened %d events", len(events)) + return nil +} + +func prepareForReopening(events []data.ReopenableEvent) []data.Event { + res := make([]data.Event, len(events)) + + for i, ev := range events { + res[i] = data.Event{ + UserDID: ev.UserDID, + Type: ev.Type, + Status: data.EventOpen, + } + + if ev.Type == evtypes.TypeFreeWeekly { + res[i].Status = data.EventFulfilled + } + } + + return res +} diff --git a/resources/model_balance_attributes.go b/resources/model_balance_attributes.go index 6990db7..229263c 100644 --- a/resources/model_balance_attributes.go +++ b/resources/model_balance_attributes.go @@ -9,6 +9,8 @@ type BalanceAttributes struct { Amount int32 `json:"amount"` // Unix timestamp of balance creation CreatedAt int32 `json:"created_at"` + // Whether the user has scanned passport + IsVerified bool `json:"is_verified"` // Rank of the user in the full leaderboard. Returned only for the single user. Rank *int `json:"rank,omitempty"` // Unix timestamp of the last points accruing