From f71b1e4701130a7101d545862a69fe364c1b890d Mon Sep 17 00:00:00 2001 From: trung Date: Wed, 5 Jul 2023 11:32:26 +0700 Subject: [PATCH 01/11] refactor: add GetStakingAddressesByTimeRangeDirectly --- .../httpapi/handlers/report_dashboard.go | 2 +- .../report_dashboard/view/report_dashboard.go | 103 ++++++++++++------ 2 files changed, 70 insertions(+), 35 deletions(-) diff --git a/infrastructure/httpapi/handlers/report_dashboard.go b/infrastructure/httpapi/handlers/report_dashboard.go index 1dad2b7..d2c3137 100644 --- a/infrastructure/httpapi/handlers/report_dashboard.go +++ b/infrastructure/httpapi/handlers/report_dashboard.go @@ -114,7 +114,7 @@ func (handler *ReportDashboardHandler) GetReportDashboardByTimeRange(ctx *fastht } reportDashboardOverall.Overall.TotalUpToDateAddresses = totalUpToDateAddresses - totalActiveAddresses, err := handler.reportDashboardView.GetActiveAddressesByTimeRange(fromDate, toDate) + totalActiveAddresses, err := handler.reportDashboardView.GetActiveAddressesByTimeRangeDirectly(fromDate, toDate) if err != nil { handler.logger.Errorf("error get active addresses by time range: %v", err) prometheus.RecordApiExecTime(recordMethod, strconv.Itoa(fasthttp.StatusBadRequest), "GET", time.Since(startTime).Milliseconds()) diff --git a/projection/report_dashboard/view/report_dashboard.go b/projection/report_dashboard/view/report_dashboard.go index 3826bc2..db2743e 100644 --- a/projection/report_dashboard/view/report_dashboard.go +++ b/projection/report_dashboard/view/report_dashboard.go @@ -60,40 +60,6 @@ func (view *ReportDashboard) UpdateReportDashboardByDate(date string) (string, e return "OK", nil } -func (impl *ReportDashboard) GetActiveAddressesByTimeRange(from string, to string) (int64, error) { - layout := "2006-01-02" - fromDateTime, err := time.Parse(layout, from) - if err != nil { - return -1, err - } - fromDate := fromDateTime.Truncate(24 * time.Hour).UnixNano() - - toDateTime, err := time.Parse(layout, to) - if err != nil { - return -1, err - } - toDate := toDateTime.Truncate(24 * time.Hour).Add(24 * time.Hour).UnixNano() - - rawQuery := fmt.Sprintf("SELECT COUNT(from_address) "+ - "FROM "+ - "(SELECT DISTINCT (from_address) "+ - "FROM view_transactions "+ - "WHERE block_time >= %d "+ - "AND block_time < %d "+ - ") AS tmp", fromDate, toDate) - - var totalActiveAddresses int64 - if err = impl.rdbHandle.QueryRow(rawQuery).Scan( - &totalActiveAddresses, - ); err != nil { - if errors.Is(err, rdb.ErrNoRows) { - return -1, rdb.ErrNoRows - } - return -1, fmt.Errorf("error scanning active addresses by time range row: %v: %w", err, rdb.ErrQuery) - } - return totalActiveAddresses, nil -} - func (impl *ReportDashboard) GetReportDashboardByTimeRange(from string, to string) (ReportDashboardOverall, error) { layout := "2006-01-02" fromDateTime, err := time.Parse(layout, from) @@ -188,6 +154,75 @@ func (impl *ReportDashboard) GetReportDashboardByTimeRange(from string, to strin return reportDashboardOverall, nil } +func (impl *ReportDashboard) GetActiveAddressesByTimeRangeDirectly(from string, to string) (int64, error) { + layout := "2006-01-02" + fromDateTime, err := time.Parse(layout, from) + if err != nil { + return -1, err + } + fromDate := fromDateTime.Truncate(24 * time.Hour).UnixNano() + + toDateTime, err := time.Parse(layout, to) + if err != nil { + return -1, err + } + toDate := toDateTime.Truncate(24 * time.Hour).Add(24 * time.Hour).UnixNano() + + rawQuery := fmt.Sprintf("SELECT COUNT(from_address) "+ + "FROM "+ + "(SELECT DISTINCT (from_address) "+ + "FROM view_transactions "+ + "WHERE block_time >= %d "+ + "AND block_time < %d "+ + ") AS tmp", fromDate, toDate) + + var totalActiveAddresses int64 + if err = impl.rdbHandle.QueryRow(rawQuery).Scan( + &totalActiveAddresses, + ); err != nil { + if errors.Is(err, rdb.ErrNoRows) { + return -1, rdb.ErrNoRows + } + return -1, fmt.Errorf("error scanning active addresses by time range row: %v: %w", err, rdb.ErrQuery) + } + return totalActiveAddresses, nil +} + +func (impl *ReportDashboard) GetStakingAddressesByTimeRangeDirectly(from string, to string) (int64, error) { + layout := "2006-01-02" + fromDateTime, err := time.Parse(layout, from) + if err != nil { + return -1, err + } + fromDate := fromDateTime.Truncate(24 * time.Hour).UnixNano() + + toDateTime, err := time.Parse(layout, to) + if err != nil { + return -1, err + } + toDate := toDateTime.Truncate(24 * time.Hour).Add(24 * time.Hour).UnixNano() + + rawQuery := fmt.Sprintf("SELECT COUNT (*) FROM(SELECT DISTINCT CAST(value ->> 'content' AS jsonb) ->> 'delegatorAddress' "+ + "FROM "+ + "view_transactions, "+ + "jsonb_array_elements(view_transactions.messages) elems "+ + "WHERE "+ + "block_time >= %d AND "+ + "block_time < %d AND "+ + "value->>'type'='%s') AS tmp", fromDate, toDate, "/cosmos.staking.v1beta1.MsgDelegate") + + var totalStakingAddresses int64 + if err = impl.rdbHandle.QueryRow(rawQuery).Scan( + &totalStakingAddresses, + ); err != nil { + if errors.Is(err, rdb.ErrNoRows) { + return -1, rdb.ErrNoRows + } + return -1, fmt.Errorf("error scanning staking addresses by time range row: %v: %w", err, rdb.ErrQuery) + } + return totalStakingAddresses, nil +} + type ReportDashboardData struct { DateTime string `json:"dateTime,omitempty"` TotalTxOfRedeemedCoupons int64 `json:"totalTxOfRedeemedCoupons"` From 0cf66df62154f8885da84cc94dbcf0ded5bcdcd1 Mon Sep 17 00:00:00 2001 From: trung Date: Wed, 5 Jul 2023 11:44:30 +0700 Subject: [PATCH 02/11] refactor: add GetAddressesOfRedeemedCouponsByTimeRangeDirectly --- .../report_dashboard/view/report_dashboard.go | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/projection/report_dashboard/view/report_dashboard.go b/projection/report_dashboard/view/report_dashboard.go index db2743e..744aaef 100644 --- a/projection/report_dashboard/view/report_dashboard.go +++ b/projection/report_dashboard/view/report_dashboard.go @@ -223,6 +223,40 @@ func (impl *ReportDashboard) GetStakingAddressesByTimeRangeDirectly(from string, return totalStakingAddresses, nil } +func (impl *ReportDashboard) GetAddressesOfRedeemedCouponsByTimeRangeDirectly(from string, to string) (int64, error) { + layout := "2006-01-02" + fromDateTime, err := time.Parse(layout, from) + if err != nil { + return -1, err + } + fromDate := fromDateTime.Truncate(24 * time.Hour).UnixNano() + + toDateTime, err := time.Parse(layout, to) + if err != nil { + return -1, err + } + toDate := toDateTime.Truncate(24 * time.Hour).Add(24 * time.Hour).UnixNano() + + rawQuery := fmt.Sprintf("SELECT COUNT(*) "+ + "FROM (SELECT DISTINCT from_address "+ + "FROM view_transactions "+ + "WHERE "+ + "block_time >= %d AND "+ + "block_time < %d AND "+ + "tx_type = '%s') AS dt", fromDate, toDate, "exchangeWithValue") + + var totalAddressesOfRedeemedCoupons int64 + if err = impl.rdbHandle.QueryRow(rawQuery).Scan( + &totalAddressesOfRedeemedCoupons, + ); err != nil { + if errors.Is(err, rdb.ErrNoRows) { + return -1, rdb.ErrNoRows + } + return -1, fmt.Errorf("error scanning addresses of redeemed coupons by time range row: %v: %w", err, rdb.ErrQuery) + } + return totalAddressesOfRedeemedCoupons, nil +} + type ReportDashboardData struct { DateTime string `json:"dateTime,omitempty"` TotalTxOfRedeemedCoupons int64 `json:"totalTxOfRedeemedCoupons"` From 2124353bf7d058cf6310c0d1c4ab56d065fa1b55 Mon Sep 17 00:00:00 2001 From: trung Date: Wed, 5 Jul 2023 19:06:46 +0700 Subject: [PATCH 03/11] fix: count total addresses directly --- .../httpapi/handlers/report_dashboard.go | 18 ++++++++++++++++++ .../report_dashboard/view/report_dashboard.go | 7 +++---- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/infrastructure/httpapi/handlers/report_dashboard.go b/infrastructure/httpapi/handlers/report_dashboard.go index d2c3137..3c16652 100644 --- a/infrastructure/httpapi/handlers/report_dashboard.go +++ b/infrastructure/httpapi/handlers/report_dashboard.go @@ -123,6 +123,24 @@ func (handler *ReportDashboardHandler) GetReportDashboardByTimeRange(ctx *fastht } reportDashboardOverall.Overall.TotalActiveAddresses = totalActiveAddresses + totalStakingAddresses, err := handler.reportDashboardView.GetStakingAddressesByTimeRangeDirectly(fromDate, toDate) + if err != nil { + handler.logger.Errorf("error get staking addresses by time range: %v", err) + prometheus.RecordApiExecTime(recordMethod, strconv.Itoa(fasthttp.StatusBadRequest), "GET", time.Since(startTime).Milliseconds()) + httpapi.InternalServerError(ctx) + return + } + reportDashboardOverall.Overall.TotalStakingAddresses = totalStakingAddresses + + totalRedeemedCouponsAddresses, err := handler.reportDashboardView.GetAddressesOfRedeemedCouponsByTimeRangeDirectly(fromDate, toDate) + if err != nil { + handler.logger.Errorf("error get redeemed coupons addresses by time range: %v", err) + prometheus.RecordApiExecTime(recordMethod, strconv.Itoa(fasthttp.StatusBadRequest), "GET", time.Since(startTime).Milliseconds()) + httpapi.InternalServerError(ctx) + return + } + reportDashboardOverall.Overall.TotalRedeemedCouponAddresses = totalRedeemedCouponsAddresses + prometheus.RecordApiExecTime(recordMethod, strconv.Itoa(200), "GET", time.Since(startTime).Milliseconds()) handler.astraCache.Set(cacheKey, reportDashboardOverall, utils.TIME_CACHE_LONG) httpapi.SuccessNotWrappedResult(ctx, reportDashboardOverall) diff --git a/projection/report_dashboard/view/report_dashboard.go b/projection/report_dashboard/view/report_dashboard.go index 744aaef..abb2c63 100644 --- a/projection/report_dashboard/view/report_dashboard.go +++ b/projection/report_dashboard/view/report_dashboard.go @@ -77,7 +77,7 @@ func (impl *ReportDashboard) GetReportDashboardByTimeRange(from string, to strin rawQuery := fmt.Sprintf("SELECT rd.date_time, rd.total_transaction_of_redeemed_coupons, rd.total_redeemed_coupon_addresses, "+ "rd.total_asa_of_redeemed_coupons, rd.total_staking_transactions, rd.total_staking_addresses, "+ "rd.total_asa_staked, rd.total_new_addresses, rd.total_asa_withdrawn_from_tiki, rd.total_asa_on_chain_rewards, "+ - "cs.number_of_transactions "+ + "cs.number_of_transactions, cs.active_addresses "+ "FROM report_dashboard AS rd "+ "INNER JOIN chain_stats AS cs ON rd.date_time = cs.date_time "+ "WHERE rd.date_time >= %d AND rd.date_time < %d ORDER BY rd.date_time ASC", fromDate, toDate) @@ -110,6 +110,7 @@ func (impl *ReportDashboard) GetReportDashboardByTimeRange(from string, to strin &totalAsaWithdrawnFromTiki, &totalAsaOnchainRewards, &result.TotalTransactions, + &result.TotalActiveAddresses, ); err != nil { if errors.Is(err, rdb.ErrNoRows) { return ReportDashboardOverall{}, rdb.ErrNoRows @@ -144,8 +145,6 @@ func (impl *ReportDashboard) GetReportDashboardByTimeRange(from string, to strin reportDashboardOverall.Overall.TotalAsaOnchainRewards += reportDashboardData.TotalAsaOnchainRewards reportDashboardOverall.Overall.TotalNewAddresses += reportDashboardData.TotalNewAddresses - reportDashboardOverall.Overall.TotalRedeemedCouponAddresses += reportDashboardData.TotalRedeemedCouponAddresses - reportDashboardOverall.Overall.TotalStakingAddresses += reportDashboardData.TotalStakingAddresses reportDashboardOverall.Overall.TotalStakingTransactions += reportDashboardData.TotalStakingTransactions reportDashboardOverall.Overall.TotalTransactions += reportDashboardData.TotalTransactions reportDashboardOverall.Overall.TotalTxOfRedeemedCoupons += reportDashboardData.TotalTxOfRedeemedCoupons @@ -269,7 +268,7 @@ type ReportDashboardData struct { TotalAsaWithdrawnFromTiki float64 `json:"totalAsaWithdrawnFromTiki"` TotalAsaOnchainRewards float64 `json:"totalAsaOnchainRewards"` TotalTransactions int64 `json:"totalTransactions"` - TotalActiveAddresses int64 `json:"totalActiveAddresses,omitempty"` + TotalActiveAddresses int64 `json:"totalActiveAddresses"` TotalUpToDateAddresses int64 `json:"totalUpToDateAddresses,omitempty"` TotalUpToDateTransactions int64 `json:"totalUpToDateTransactions,omitempty"` } From eb32b411aaa6bab1d96f39a1f8c8eeec97249998 Mon Sep 17 00:00:00 2001 From: trung Date: Wed, 5 Jul 2023 21:10:02 +0700 Subject: [PATCH 04/11] refactor: update report_dashboard migration --- migrations/20230627092718_report_dashboard.up.sql | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/migrations/20230627092718_report_dashboard.up.sql b/migrations/20230627092718_report_dashboard.up.sql index 8ea91ea..a9aab2f 100644 --- a/migrations/20230627092718_report_dashboard.up.sql +++ b/migrations/20230627092718_report_dashboard.up.sql @@ -22,4 +22,11 @@ CREATE TABLE report_dashboard ( -- total_asa_withdrawn_from_tiki VARCHAR DEFAULT '' NOT NULL, total_asa_on_chain_rewards VARCHAR DEFAULT '' NOT NULL + -- Group by + total_active_addresses_weekly INT DEFAULT 0 NOT NULL, + total_active_addresses_monthly INT DEFAULT 0 NOT NULL, + total_staking_addresses_weekly INT DEFAULT 0 NOT NULL, + total_staking_addresses_monthly INT DEFAULT 0 NOT NULL, + total_redeemed_coupons_addresses_weekly INT DEFAULT 0 NOT NULL, + total_redeemed_coupons_addresses_monthly INT DEFAULT 0 NOT NULL; ); \ No newline at end of file From 68a16f77951685b3cc6bc62dc15d1dd10cfee4b5 Mon Sep 17 00:00:00 2001 From: trung Date: Wed, 5 Jul 2023 22:57:02 +0700 Subject: [PATCH 05/11] refactor: add cronjob update addresses weekly --- .../rdbreportdashboard/rdbreportdashboard.go | 116 ++++++++++++++++++ bootstrap/app.go | 101 ++++++++++++--- 2 files changed, 199 insertions(+), 18 deletions(-) diff --git a/appinterface/rdbreportdashboard/rdbreportdashboard.go b/appinterface/rdbreportdashboard/rdbreportdashboard.go index 556f842..ba86705 100644 --- a/appinterface/rdbreportdashboard/rdbreportdashboard.go +++ b/appinterface/rdbreportdashboard/rdbreportdashboard.go @@ -313,6 +313,50 @@ func (impl *RDbReportDashboard) UpdateTotalAddressesOfRedeemedCouponsWithRDbHand return nil } +func (impl *RDbReportDashboard) UpdateTotalAddressesOfRedeemedCouponsWeeklyWithRDbHandle(currentDate int64, nextDate int64) error { + startTime := time.Now() + recordMethod := "UpdateTotalAddressesOfRedeemedCouponsWithRDbHandle" + + if err := impl.init(currentDate); err != nil { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return fmt.Errorf("error initializing report dashboard %v", err) + } + + rawQuery := fmt.Sprintf( + "COUNT(*) "+ + "FROM (SELECT DISTINCT from_address "+ + "FROM view_transactions "+ + "WHERE "+ + "block_time >= %d AND "+ + "block_time < %d AND "+ + "tx_type = '%s') AS dt", currentDate, nextDate, "exchangeWithValue") + + addressesOfRedeemedCouponsCountSubQuery := impl.selectRDbHandle.StmtBuilder.Select(rawQuery) + sql, args, err := impl.selectRDbHandle.StmtBuilder.Update( + impl.table, + ).Set( + "total_redeemed_coupon_addresses_weekly", impl.selectRDbHandle.StmtBuilder.SubQuery(addressesOfRedeemedCouponsCountSubQuery), + ).Where( + "date_time = ?", currentDate, + ).ToSql() + if err != nil { + return fmt.Errorf("error building total addresses of redeemed coupons weekly update SQL: %v", err) + } + + execResult, err := impl.selectRDbHandle.Exec(sql, args...) + if err != nil { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return fmt.Errorf("error executing total addresses of redeemed coupons weekly update SQL: %v", err) + } + if execResult.RowsAffected() == 0 { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return errors.New("error executing total addresses of redeemed coupons weekly update SQL: no rows affected") + } + + prometheus.RecordApiExecTime(recordMethod, SUCCESS, "cronjob", time.Since(startTime).Milliseconds()) + return nil +} + func (impl *RDbReportDashboard) UpdateTotalAstraStakedWithRDbHandle(currentDate int64, nextDate int64) error { startTime := time.Now() recordMethod := "UpdateTotalAstraStakedWithRDbHandle" @@ -447,6 +491,78 @@ func (impl *RDbReportDashboard) UpdateTotalStakingAddressesWithRDbHandle(current return nil } +func (impl *RDbReportDashboard) UpdateTotalStakingAddressesWeeklyWithRDbHandle(currentDate int64, nextDate int64) error { + startTime := time.Now() + recordMethod := "UpdateTotalStakingAddressesWeeklyWithRDbHandle" + + if err := impl.init(currentDate); err != nil { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return fmt.Errorf("error initializing report dashboard %v", err) + } + + rawQuery := fmt.Sprintf( + "COUNT (*) FROM(SELECT DISTINCT CAST(value ->> 'content' AS jsonb) ->> 'delegatorAddress' "+ + "FROM "+ + "view_transactions, "+ + "jsonb_array_elements(view_transactions.messages) elems "+ + "WHERE "+ + "block_time >= %d AND "+ + "block_time < %d AND "+ + "value->>'type'='%s') AS tmp", currentDate, nextDate, "/cosmos.staking.v1beta1.MsgDelegate") + + addressesStakedCountSubQuery := impl.selectRDbHandle.StmtBuilder.Select(rawQuery) + sql, args, err := impl.selectRDbHandle.StmtBuilder.Update( + impl.table, + ).Set( + "total_staking_addresses_weekly", impl.selectRDbHandle.StmtBuilder.SubQuery(addressesStakedCountSubQuery), + ).Where( + "date_time = ?", currentDate, + ).ToSql() + if err != nil { + return fmt.Errorf("error building total staking addresses weekly update SQL: %v", err) + } + + execResult, err := impl.selectRDbHandle.Exec(sql, args...) + if err != nil { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return fmt.Errorf("error executing total staking addresses weekly update SQL: %v", err) + } + if execResult.RowsAffected() == 0 { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return errors.New("error executing total staking addresses weekly update SQL: no rows affected") + } + + prometheus.RecordApiExecTime(recordMethod, SUCCESS, "cronjob", time.Since(startTime).Milliseconds()) + return nil +} + +func (impl *RDbReportDashboard) UpdateTotalActiveAddressesWeeklyWithRDbHandle(currentDate int64, nextDate int64) error { + startTime := time.Now() + recordMethod := "UpdateTotalActiveAddressesWeeklyWithRDbHandle" + + if err := impl.init(currentDate); err != nil { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return fmt.Errorf("error initializing report dashboard %v", err) + } + + rawQuery := "UPDATE report_dashboard " + + "SET active_addresses_weekly = (SELECT COUNT(*) FROM (SELECT DISTINCT (from_address) FROM view_transactions WHERE block_time >= $1 AND block_time < $2) AS temp) " + + "WHERE date_time = $1" + + execResult, err := impl.selectRDbHandle.Exec(rawQuery, currentDate, nextDate) + if err != nil { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return fmt.Errorf("error executing total active addresses weekly update SQL: %v", err) + } + if execResult.RowsAffected() == 0 { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return errors.New("error executing total active addresses weekly update SQL: no rows affected") + } + + prometheus.RecordApiExecTime(recordMethod, SUCCESS, "cronjob", time.Since(startTime).Milliseconds()) + return nil +} + func (impl *RDbReportDashboard) UpdateTotalNewAddressesWithRDbHandle(currentDate int64, prevDate int64) error { startTime := time.Now() recordMethod := "UpdateTotalNewAddressesWithRDbHandle" diff --git a/bootstrap/app.go b/bootstrap/app.go index b83cc86..396826a 100644 --- a/bootstrap/app.go +++ b/bootstrap/app.go @@ -246,8 +246,9 @@ func (a *app) RunCronJobsReportDashboard(rdbHandle *rdb.Handle) { // @every 0h0m5s // 59 59 0-23 * * * s.AddFunc("59 59 0-23 * * *", func() { - currentDate := time.Now().Truncate(24 * time.Hour).UnixNano() - nextDate := time.Now().Truncate(24 * time.Hour).Add(24 * time.Hour).UnixNano() + currentTime := time.Now().Truncate(24 * time.Hour) + currentDate := currentTime.UnixNano() + nextDate := currentTime.Add(24 * time.Hour).UnixNano() i = 0 var err error time.Sleep(1 * time.Second) @@ -263,8 +264,9 @@ func (a *app) RunCronJobsReportDashboard(rdbHandle *rdb.Handle) { }) s.AddFunc("59 59 0-23 * * *", func() { - currentDate := time.Now().Truncate(24 * time.Hour).UnixNano() - nextDate := time.Now().Truncate(24 * time.Hour).Add(24 * time.Hour).UnixNano() + currentTime := time.Now().Truncate(24 * time.Hour) + currentDate := currentTime.UnixNano() + nextDate := currentTime.Add(24 * time.Hour).UnixNano() tikiAddress := a.config.CronjobReportDashboard.TikiAddress i = 0 var err error @@ -281,8 +283,9 @@ func (a *app) RunCronJobsReportDashboard(rdbHandle *rdb.Handle) { }) s.AddFunc("59 59 0-23 * * *", func() { - currentDate := time.Now().Truncate(24 * time.Hour).UnixNano() - nextDate := time.Now().Truncate(24 * time.Hour).Add(24 * time.Hour).UnixNano() + currentTime := time.Now().Truncate(24 * time.Hour) + currentDate := currentTime.UnixNano() + nextDate := currentTime.Add(24 * time.Hour).UnixNano() i = 0 var err error time.Sleep(5 * time.Second) @@ -298,8 +301,9 @@ func (a *app) RunCronJobsReportDashboard(rdbHandle *rdb.Handle) { }) s.AddFunc("59 59 0-23 * * *", func() { - currentDate := time.Now().Truncate(24 * time.Hour).UnixNano() - nextDate := time.Now().Truncate(24 * time.Hour).Add(24 * time.Hour).UnixNano() + currentTime := time.Now().Truncate(24 * time.Hour) + currentDate := currentTime.UnixNano() + nextDate := currentTime.Add(24 * time.Hour).UnixNano() i = 0 var err error time.Sleep(7 * time.Second) @@ -315,8 +319,9 @@ func (a *app) RunCronJobsReportDashboard(rdbHandle *rdb.Handle) { }) s.AddFunc("59 59 0-23 * * *", func() { - currentDate := time.Now().Truncate(24 * time.Hour).UnixNano() - nextDate := time.Now().Truncate(24 * time.Hour).Add(24 * time.Hour).UnixNano() + currentTime := time.Now().Truncate(24 * time.Hour) + currentDate := currentTime.UnixNano() + nextDate := currentTime.Add(24 * time.Hour).UnixNano() i = 0 var err error time.Sleep(9 * time.Second) @@ -332,8 +337,9 @@ func (a *app) RunCronJobsReportDashboard(rdbHandle *rdb.Handle) { }) s.AddFunc("59 59 0-23 * * *", func() { - currentDate := time.Now().Truncate(24 * time.Hour).UnixNano() - nextDate := time.Now().Truncate(24 * time.Hour).Add(24 * time.Hour).UnixNano() + currentTime := time.Now().Truncate(24 * time.Hour) + currentDate := currentTime.UnixNano() + nextDate := currentTime.Add(24 * time.Hour).UnixNano() i = 0 var err error time.Sleep(11 * time.Second) @@ -349,8 +355,9 @@ func (a *app) RunCronJobsReportDashboard(rdbHandle *rdb.Handle) { }) s.AddFunc("59 59 0-23 * * *", func() { - currentDate := time.Now().Truncate(24 * time.Hour).UnixNano() - nextDate := time.Now().Truncate(24 * time.Hour).Add(24 * time.Hour).UnixNano() + currentTime := time.Now().Truncate(24 * time.Hour) + currentDate := currentTime.UnixNano() + nextDate := currentTime.Add(24 * time.Hour).UnixNano() i = 0 var err error time.Sleep(13 * time.Second) @@ -366,8 +373,9 @@ func (a *app) RunCronJobsReportDashboard(rdbHandle *rdb.Handle) { }) s.AddFunc("59 59 0-23 * * *", func() { - currentDate := time.Now().Truncate(24 * time.Hour).UnixNano() - nextDate := time.Now().Truncate(24 * time.Hour).Add(24 * time.Hour).UnixNano() + currentTime := time.Now().Truncate(24 * time.Hour) + currentDate := currentTime.UnixNano() + nextDate := currentTime.Add(24 * time.Hour).UnixNano() i = 0 var err error time.Sleep(15 * time.Second) @@ -383,8 +391,9 @@ func (a *app) RunCronJobsReportDashboard(rdbHandle *rdb.Handle) { }) s.AddFunc("59 59 0-23 * * *", func() { - currentDate := time.Now().Truncate(24 * time.Hour).UnixNano() - prevDate := time.Now().Truncate(24 * time.Hour).Add(-24 * time.Hour).UnixNano() + currentTime := time.Now().Truncate(24 * time.Hour) + currentDate := currentTime.UnixNano() + prevDate := currentTime.Add(-24 * time.Hour).UnixNano() i = 0 var err error time.Sleep(17 * time.Second) @@ -399,6 +408,62 @@ func (a *app) RunCronJobsReportDashboard(rdbHandle *rdb.Handle) { } }) + // Weekly update + // At 00:00 on Sunday + s.AddFunc("@weekly", func() { + endWeekTime := time.Now().Truncate(24 * time.Hour).Add(24 * time.Hour) + endWeekDate := endWeekTime.UnixNano() + startWeekDate := endWeekTime.Add(-7 * 24 * time.Hour).UnixNano() + i = 0 + var err error + time.Sleep(19 * time.Second) + for i < retry { + err = rdbReportDashboard.UpdateTotalAddressesOfRedeemedCouponsWeeklyWithRDbHandle(startWeekDate, endWeekDate) + if err == nil { + break + } + a.logger.Infof("failed to run UpdateTotalAddressesOfRedeemedCouponsWeeklyWithRDbHandle cronjob: %v", err) + time.Sleep(time.Duration(delayTime) * time.Second) + i += 1 + } + }) + + s.AddFunc("@weekly", func() { + endWeekTime := time.Now().Truncate(24 * time.Hour).Add(24 * time.Hour) + endWeekDate := endWeekTime.UnixNano() + startWeekDate := endWeekTime.Add(-7 * 24 * time.Hour).UnixNano() + i = 0 + var err error + time.Sleep(21 * time.Second) + for i < retry { + err = rdbReportDashboard.UpdateTotalStakingAddressesWeeklyWithRDbHandle(startWeekDate, endWeekDate) + if err == nil { + break + } + a.logger.Infof("failed to run UpdateTotalStakingAddressesWeeklyWithRDbHandle cronjob: %v", err) + time.Sleep(time.Duration(delayTime) * time.Second) + i += 1 + } + }) + + s.AddFunc("@weekly", func() { + endWeekTime := time.Now().Truncate(24 * time.Hour).Add(24 * time.Hour) + endWeekDate := endWeekTime.UnixNano() + startWeekDate := endWeekTime.Add(-7 * 24 * time.Hour).UnixNano() + i = 0 + var err error + time.Sleep(23 * time.Second) + for i < retry { + err = rdbReportDashboard.UpdateTotalActiveAddressesWeeklyWithRDbHandle(startWeekDate, endWeekDate) + if err == nil { + break + } + a.logger.Infof("failed to run UpdateTotalActiveAddressesWeeklyWithRDbHandle cronjob: %v", err) + time.Sleep(time.Duration(delayTime) * time.Second) + i += 1 + } + }) + s.Start() } } From ec57fc87ddbb63f104d9cfe993a10825c313bdac Mon Sep 17 00:00:00 2001 From: trung Date: Thu, 6 Jul 2023 09:27:39 +0700 Subject: [PATCH 06/11] add cronjob update total addresses monthly --- .../rdbreportdashboard/rdbreportdashboard.go | 118 +++++++++++++++++- bootstrap/app.go | 56 +++++++++ 2 files changed, 173 insertions(+), 1 deletion(-) diff --git a/appinterface/rdbreportdashboard/rdbreportdashboard.go b/appinterface/rdbreportdashboard/rdbreportdashboard.go index ba86705..a95d12c 100644 --- a/appinterface/rdbreportdashboard/rdbreportdashboard.go +++ b/appinterface/rdbreportdashboard/rdbreportdashboard.go @@ -315,7 +315,7 @@ func (impl *RDbReportDashboard) UpdateTotalAddressesOfRedeemedCouponsWithRDbHand func (impl *RDbReportDashboard) UpdateTotalAddressesOfRedeemedCouponsWeeklyWithRDbHandle(currentDate int64, nextDate int64) error { startTime := time.Now() - recordMethod := "UpdateTotalAddressesOfRedeemedCouponsWithRDbHandle" + recordMethod := "UpdateTotalAddressesOfRedeemedCouponsWeeklyWithRDbHandle" if err := impl.init(currentDate); err != nil { prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) @@ -357,6 +357,50 @@ func (impl *RDbReportDashboard) UpdateTotalAddressesOfRedeemedCouponsWeeklyWithR return nil } +func (impl *RDbReportDashboard) UpdateTotalAddressesOfRedeemedCouponsMonthlyWithRDbHandle(currentDate int64, nextDate int64) error { + startTime := time.Now() + recordMethod := "UpdateTotalAddressesOfRedeemedCouponsMonthlyWithRDbHandle" + + if err := impl.init(currentDate); err != nil { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return fmt.Errorf("error initializing report dashboard %v", err) + } + + rawQuery := fmt.Sprintf( + "COUNT(*) "+ + "FROM (SELECT DISTINCT from_address "+ + "FROM view_transactions "+ + "WHERE "+ + "block_time >= %d AND "+ + "block_time < %d AND "+ + "tx_type = '%s') AS dt", currentDate, nextDate, "exchangeWithValue") + + addressesOfRedeemedCouponsCountSubQuery := impl.selectRDbHandle.StmtBuilder.Select(rawQuery) + sql, args, err := impl.selectRDbHandle.StmtBuilder.Update( + impl.table, + ).Set( + "total_redeemed_coupon_addresses_monthly", impl.selectRDbHandle.StmtBuilder.SubQuery(addressesOfRedeemedCouponsCountSubQuery), + ).Where( + "date_time = ?", currentDate, + ).ToSql() + if err != nil { + return fmt.Errorf("error building total addresses of redeemed coupons monthly update SQL: %v", err) + } + + execResult, err := impl.selectRDbHandle.Exec(sql, args...) + if err != nil { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return fmt.Errorf("error executing total addresses of redeemed coupons monthly update SQL: %v", err) + } + if execResult.RowsAffected() == 0 { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return errors.New("error executing total addresses of redeemed coupons monthly update SQL: no rows affected") + } + + prometheus.RecordApiExecTime(recordMethod, SUCCESS, "cronjob", time.Since(startTime).Milliseconds()) + return nil +} + func (impl *RDbReportDashboard) UpdateTotalAstraStakedWithRDbHandle(currentDate int64, nextDate int64) error { startTime := time.Now() recordMethod := "UpdateTotalAstraStakedWithRDbHandle" @@ -536,6 +580,51 @@ func (impl *RDbReportDashboard) UpdateTotalStakingAddressesWeeklyWithRDbHandle(c return nil } +func (impl *RDbReportDashboard) UpdateTotalStakingAddressesMonthlyWithRDbHandle(currentDate int64, nextDate int64) error { + startTime := time.Now() + recordMethod := "UpdateTotalStakingAddressesMonthlyWithRDbHandle" + + if err := impl.init(currentDate); err != nil { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return fmt.Errorf("error initializing report dashboard %v", err) + } + + rawQuery := fmt.Sprintf( + "COUNT (*) FROM(SELECT DISTINCT CAST(value ->> 'content' AS jsonb) ->> 'delegatorAddress' "+ + "FROM "+ + "view_transactions, "+ + "jsonb_array_elements(view_transactions.messages) elems "+ + "WHERE "+ + "block_time >= %d AND "+ + "block_time < %d AND "+ + "value->>'type'='%s') AS tmp", currentDate, nextDate, "/cosmos.staking.v1beta1.MsgDelegate") + + addressesStakedCountSubQuery := impl.selectRDbHandle.StmtBuilder.Select(rawQuery) + sql, args, err := impl.selectRDbHandle.StmtBuilder.Update( + impl.table, + ).Set( + "total_staking_addresses_monthly", impl.selectRDbHandle.StmtBuilder.SubQuery(addressesStakedCountSubQuery), + ).Where( + "date_time = ?", currentDate, + ).ToSql() + if err != nil { + return fmt.Errorf("error building total staking addresses monthly update SQL: %v", err) + } + + execResult, err := impl.selectRDbHandle.Exec(sql, args...) + if err != nil { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return fmt.Errorf("error executing total staking addresses monthly update SQL: %v", err) + } + if execResult.RowsAffected() == 0 { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return errors.New("error executing total staking addresses monthly update SQL: no rows affected") + } + + prometheus.RecordApiExecTime(recordMethod, SUCCESS, "cronjob", time.Since(startTime).Milliseconds()) + return nil +} + func (impl *RDbReportDashboard) UpdateTotalActiveAddressesWeeklyWithRDbHandle(currentDate int64, nextDate int64) error { startTime := time.Now() recordMethod := "UpdateTotalActiveAddressesWeeklyWithRDbHandle" @@ -563,6 +652,33 @@ func (impl *RDbReportDashboard) UpdateTotalActiveAddressesWeeklyWithRDbHandle(cu return nil } +func (impl *RDbReportDashboard) UpdateTotalActiveAddressesMonthlyWithRDbHandle(currentDate int64, nextDate int64) error { + startTime := time.Now() + recordMethod := "UpdateTotalActiveAddressesMonthlyWithRDbHandle" + + if err := impl.init(currentDate); err != nil { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return fmt.Errorf("error initializing report dashboard %v", err) + } + + rawQuery := "UPDATE report_dashboard " + + "SET active_addresses_monthly = (SELECT COUNT(*) FROM (SELECT DISTINCT (from_address) FROM view_transactions WHERE block_time >= $1 AND block_time < $2) AS temp) " + + "WHERE date_time = $1" + + execResult, err := impl.selectRDbHandle.Exec(rawQuery, currentDate, nextDate) + if err != nil { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return fmt.Errorf("error executing total active addresses monthly update SQL: %v", err) + } + if execResult.RowsAffected() == 0 { + prometheus.RecordApiExecTime(recordMethod, FAIL, "cronjob", time.Since(startTime).Milliseconds()) + return errors.New("error executing total active addresses monthly update SQL: no rows affected") + } + + prometheus.RecordApiExecTime(recordMethod, SUCCESS, "cronjob", time.Since(startTime).Milliseconds()) + return nil +} + func (impl *RDbReportDashboard) UpdateTotalNewAddressesWithRDbHandle(currentDate int64, prevDate int64) error { startTime := time.Now() recordMethod := "UpdateTotalNewAddressesWithRDbHandle" diff --git a/bootstrap/app.go b/bootstrap/app.go index 396826a..f40d699 100644 --- a/bootstrap/app.go +++ b/bootstrap/app.go @@ -464,6 +464,62 @@ func (a *app) RunCronJobsReportDashboard(rdbHandle *rdb.Handle) { } }) + // Monthly update + // At 00:00 on the first day-of-month + s.AddFunc("@monthly", func() { + firstDayOfMonthTime := time.Now().Truncate(24 * time.Hour) + startMonthDate := firstDayOfMonthTime.UnixNano() + endMonthDate := firstDayOfMonthTime.AddDate(0, 1, 0).UnixNano() + i = 0 + var err error + time.Sleep(25 * time.Second) + for i < retry { + err = rdbReportDashboard.UpdateTotalAddressesOfRedeemedCouponsMonthlyWithRDbHandle(startMonthDate, endMonthDate) + if err == nil { + break + } + a.logger.Infof("failed to run UpdateTotalAddressesOfRedeemedCouponsMonthlyWithRDbHandle cronjob: %v", err) + time.Sleep(time.Duration(delayTime) * time.Second) + i += 1 + } + }) + + s.AddFunc("@monthly", func() { + firstDayOfMonthTime := time.Now().Truncate(24 * time.Hour) + startMonthDate := firstDayOfMonthTime.UnixNano() + endMonthDate := firstDayOfMonthTime.AddDate(0, 1, 0).UnixNano() + i = 0 + var err error + time.Sleep(27 * time.Second) + for i < retry { + err = rdbReportDashboard.UpdateTotalStakingAddressesMonthlyWithRDbHandle(startMonthDate, endMonthDate) + if err == nil { + break + } + a.logger.Infof("failed to run UpdateTotalStakingAddressesMonthlyWithRDbHandle cronjob: %v", err) + time.Sleep(time.Duration(delayTime) * time.Second) + i += 1 + } + }) + + s.AddFunc("@monthly", func() { + firstDayOfMonthTime := time.Now().Truncate(24 * time.Hour) + startMonthDate := firstDayOfMonthTime.UnixNano() + endMonthDate := firstDayOfMonthTime.AddDate(0, 1, 0).UnixNano() + i = 0 + var err error + time.Sleep(29 * time.Second) + for i < retry { + err = rdbReportDashboard.UpdateTotalActiveAddressesMonthlyWithRDbHandle(startMonthDate, endMonthDate) + if err == nil { + break + } + a.logger.Infof("failed to run UpdateTotalActiveAddressesMonthlyWithRDbHandle cronjob: %v", err) + time.Sleep(time.Duration(delayTime) * time.Second) + i += 1 + } + }) + s.Start() } } From 39d8f8a2831f1d4eceee5012f77e12547f221278 Mon Sep 17 00:00:00 2001 From: trung Date: Thu, 6 Jul 2023 09:34:09 +0700 Subject: [PATCH 07/11] fix: wrong column name --- appinterface/rdbreportdashboard/rdbreportdashboard.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/appinterface/rdbreportdashboard/rdbreportdashboard.go b/appinterface/rdbreportdashboard/rdbreportdashboard.go index a95d12c..cb4c454 100644 --- a/appinterface/rdbreportdashboard/rdbreportdashboard.go +++ b/appinterface/rdbreportdashboard/rdbreportdashboard.go @@ -635,7 +635,7 @@ func (impl *RDbReportDashboard) UpdateTotalActiveAddressesWeeklyWithRDbHandle(cu } rawQuery := "UPDATE report_dashboard " + - "SET active_addresses_weekly = (SELECT COUNT(*) FROM (SELECT DISTINCT (from_address) FROM view_transactions WHERE block_time >= $1 AND block_time < $2) AS temp) " + + "SET total_active_addresses_weekly = (SELECT COUNT(*) FROM (SELECT DISTINCT (from_address) FROM view_transactions WHERE block_time >= $1 AND block_time < $2) AS temp) " + "WHERE date_time = $1" execResult, err := impl.selectRDbHandle.Exec(rawQuery, currentDate, nextDate) @@ -662,7 +662,7 @@ func (impl *RDbReportDashboard) UpdateTotalActiveAddressesMonthlyWithRDbHandle(c } rawQuery := "UPDATE report_dashboard " + - "SET active_addresses_monthly = (SELECT COUNT(*) FROM (SELECT DISTINCT (from_address) FROM view_transactions WHERE block_time >= $1 AND block_time < $2) AS temp) " + + "SET total_active_addresses_monthly = (SELECT COUNT(*) FROM (SELECT DISTINCT (from_address) FROM view_transactions WHERE block_time >= $1 AND block_time < $2) AS temp) " + "WHERE date_time = $1" execResult, err := impl.selectRDbHandle.Exec(rawQuery, currentDate, nextDate) From dae945a177adba30a7fd8a782245d7cfea3ac16c Mon Sep 17 00:00:00 2001 From: trung Date: Fri, 7 Jul 2023 09:50:13 +0700 Subject: [PATCH 08/11] refactor: remove unused evm untils --- bootstrap/app.go | 2 +- infrastructure/kafka/consumer/worker/internal_txs_consumer.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/bootstrap/app.go b/bootstrap/app.go index f40d699..1a99fb0 100644 --- a/bootstrap/app.go +++ b/bootstrap/app.go @@ -122,7 +122,7 @@ func (a *app) Run() { if a.config.KafkaService.EnableConsumer { sigchan := make(chan os.Signal, 1) go func() { - if runErr := worker_consumer.RunInternalTxsConsumer(a.rdbConn.ToHandle(), a.config, a.logger, a.evmUtil, sigchan); runErr != nil { + if runErr := worker_consumer.RunInternalTxsConsumer(a.rdbConn.ToHandle(), a.config, a.logger, sigchan); runErr != nil { a.logger.Panicf("%v", runErr) } }() diff --git a/infrastructure/kafka/consumer/worker/internal_txs_consumer.go b/infrastructure/kafka/consumer/worker/internal_txs_consumer.go index eff4389..c2031e5 100644 --- a/infrastructure/kafka/consumer/worker/internal_txs_consumer.go +++ b/infrastructure/kafka/consumer/worker/internal_txs_consumer.go @@ -17,7 +17,6 @@ import ( "github.com/AstraProtocol/astra-indexing/external/tmcosmosutils" "github.com/AstraProtocol/astra-indexing/external/utctime" utils "github.com/AstraProtocol/astra-indexing/infrastructure" - "github.com/AstraProtocol/astra-indexing/internal/evm" "github.com/AstraProtocol/astra-indexing/projection/account_transaction" accountTransactionView "github.com/AstraProtocol/astra-indexing/projection/account_transaction/view" transactionView "github.com/AstraProtocol/astra-indexing/projection/transaction/view" @@ -33,7 +32,7 @@ var rewardType = map[string]bool{ "exchangeWithValue": true, } -func RunInternalTxsConsumer(rdbHandle *rdb.Handle, config *config.Config, logger applogger.Logger, evmUtil evm.EvmUtils, sigchan chan os.Signal) error { +func RunInternalTxsConsumer(rdbHandle *rdb.Handle, config *config.Config, logger applogger.Logger, sigchan chan os.Signal) error { signal.Notify(sigchan, os.Interrupt) rdbAccountTransactionsView := accountTransactionView.NewAccountTransactions(rdbHandle) From 6b6641149d4864020e26cc1a2953666859c4de3b Mon Sep 17 00:00:00 2001 From: trung Date: Fri, 7 Jul 2023 17:06:25 +0700 Subject: [PATCH 09/11] refactor: report_dashboard handle groupBy --- .../httpapi/handlers/report_dashboard.go | 36 +++++++++++++------ 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/infrastructure/httpapi/handlers/report_dashboard.go b/infrastructure/httpapi/handlers/report_dashboard.go index 3c16652..1fae7a3 100644 --- a/infrastructure/httpapi/handlers/report_dashboard.go +++ b/infrastructure/httpapi/handlers/report_dashboard.go @@ -65,6 +65,9 @@ func (handler *ReportDashboardHandler) GetReportDashboardByTimeRange(ctx *fastht var fromDate string var toDate string + var groupBy string + var reportDashboardOverall report_dashboard_view.ReportDashboardOverall + var err error if string(ctx.QueryArgs().Peek("fromDate")) == "" { prometheus.RecordApiExecTime(recordMethod, strconv.Itoa(fasthttp.StatusBadRequest), "GET", time.Since(startTime).Milliseconds()) @@ -79,21 +82,32 @@ func (handler *ReportDashboardHandler) GetReportDashboardByTimeRange(ctx *fastht fromDate = string(ctx.QueryArgs().Peek("fromDate")) - cacheKey := fmt.Sprintf("GetReportDashboardByTimeRange%s%s", fromDate, toDate) + groupBy = string(ctx.QueryArgs().Peek("groupBy")) + if groupBy != "" { + if groupBy != "weekly" && groupBy != "monthly" { + prometheus.RecordApiExecTime(recordMethod, strconv.Itoa(fasthttp.StatusBadRequest), "GET", time.Since(startTime).Milliseconds()) + httpapi.BadRequest(ctx, fmt.Errorf("groupBy %s is not supported", groupBy)) + return + } + } + + cacheKey := fmt.Sprintf("GetReportDashboardByTimeRange%s%s%s", fromDate, toDate, groupBy) var reportDashboardOverallCache report_dashboard_view.ReportDashboardOverall - err := handler.astraCache.Get(cacheKey, &reportDashboardOverallCache) + err = handler.astraCache.Get(cacheKey, &reportDashboardOverallCache) if err == nil { prometheus.RecordApiExecTime(recordMethod, strconv.Itoa(200), "GET", time.Since(startTime).Milliseconds()) httpapi.SuccessNotWrappedResult(ctx, reportDashboardOverallCache) return } - reportDashboardOverall, err := handler.reportDashboardView.GetReportDashboardByTimeRange(fromDate, toDate) - if err != nil { - handler.logger.Errorf("error get report dashboard by time range: %v", err) - prometheus.RecordApiExecTime(recordMethod, strconv.Itoa(fasthttp.StatusBadRequest), "GET", time.Since(startTime).Milliseconds()) - httpapi.InternalServerError(ctx) - return + if groupBy == "" { + reportDashboardOverall, err = handler.reportDashboardView.GetReportDashboardByTimeRange(fromDate, toDate) + if err != nil { + handler.logger.Errorf("error get report dashboard by time range: %v", err) + prometheus.RecordApiExecTime(recordMethod, strconv.Itoa(fasthttp.StatusBadRequest), "GET", time.Since(startTime).Milliseconds()) + httpapi.InternalServerError(ctx) + return + } } totalUpToDateTransactions, err := handler.transactionsTotalView.FindBy("-") @@ -116,7 +130,7 @@ func (handler *ReportDashboardHandler) GetReportDashboardByTimeRange(ctx *fastht totalActiveAddresses, err := handler.reportDashboardView.GetActiveAddressesByTimeRangeDirectly(fromDate, toDate) if err != nil { - handler.logger.Errorf("error get active addresses by time range: %v", err) + handler.logger.Errorf("error get active addresses by time range directly: %v", err) prometheus.RecordApiExecTime(recordMethod, strconv.Itoa(fasthttp.StatusBadRequest), "GET", time.Since(startTime).Milliseconds()) httpapi.InternalServerError(ctx) return @@ -125,7 +139,7 @@ func (handler *ReportDashboardHandler) GetReportDashboardByTimeRange(ctx *fastht totalStakingAddresses, err := handler.reportDashboardView.GetStakingAddressesByTimeRangeDirectly(fromDate, toDate) if err != nil { - handler.logger.Errorf("error get staking addresses by time range: %v", err) + handler.logger.Errorf("error get staking addresses by time range directly: %v", err) prometheus.RecordApiExecTime(recordMethod, strconv.Itoa(fasthttp.StatusBadRequest), "GET", time.Since(startTime).Milliseconds()) httpapi.InternalServerError(ctx) return @@ -134,7 +148,7 @@ func (handler *ReportDashboardHandler) GetReportDashboardByTimeRange(ctx *fastht totalRedeemedCouponsAddresses, err := handler.reportDashboardView.GetAddressesOfRedeemedCouponsByTimeRangeDirectly(fromDate, toDate) if err != nil { - handler.logger.Errorf("error get redeemed coupons addresses by time range: %v", err) + handler.logger.Errorf("error get redeemed coupons addresses by time range directly: %v", err) prometheus.RecordApiExecTime(recordMethod, strconv.Itoa(fasthttp.StatusBadRequest), "GET", time.Since(startTime).Milliseconds()) httpapi.InternalServerError(ctx) return From 1099aafad2e0ba8e4a0b1bd27ef0555ab0769312 Mon Sep 17 00:00:00 2001 From: trung Date: Fri, 4 Aug 2023 10:19:54 +0700 Subject: [PATCH 10/11] refactor: add evmUtil --- infrastructure/kafka/consumer/worker/internal_txs_consumer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/infrastructure/kafka/consumer/worker/internal_txs_consumer.go b/infrastructure/kafka/consumer/worker/internal_txs_consumer.go index dcc4f2a..ce27230 100644 --- a/infrastructure/kafka/consumer/worker/internal_txs_consumer.go +++ b/infrastructure/kafka/consumer/worker/internal_txs_consumer.go @@ -12,6 +12,7 @@ import ( "github.com/AstraProtocol/astra-indexing/appinterface/rdb" "github.com/AstraProtocol/astra-indexing/bootstrap/config" "github.com/AstraProtocol/astra-indexing/infrastructure/kafka/consumer" + "github.com/AstraProtocol/astra-indexing/internal/evm" "github.com/segmentio/kafka-go" applogger "github.com/AstraProtocol/astra-indexing/external/logger" @@ -32,7 +33,7 @@ var rewardType = map[string]bool{ "exchangeWithValue": true, } -func RunInternalTxsConsumer(rdbHandle *rdb.Handle, config *config.Config, logger applogger.Logger, sigchan chan os.Signal) error { +func RunInternalTxsConsumer(rdbHandle *rdb.Handle, config *config.Config, logger applogger.Logger, evmUtil evm.EvmUtils, sigchan chan os.Signal) error { signal.Notify(sigchan, os.Interrupt) rdbAccountTransactionsView := accountTransactionView.NewAccountTransactions(rdbHandle) From bc94ad02d72cf23065d2e0c4042db293bf547d05 Mon Sep 17 00:00:00 2001 From: trung Date: Fri, 4 Aug 2023 10:22:36 +0700 Subject: [PATCH 11/11] refactor: add evmUtil --- bootstrap/app.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bootstrap/app.go b/bootstrap/app.go index 8644053..236f4c2 100644 --- a/bootstrap/app.go +++ b/bootstrap/app.go @@ -122,7 +122,7 @@ func (a *app) Run() { if a.config.KafkaService.EnableConsumer { sigchan := make(chan os.Signal, 1) go func() { - if runErr := worker_consumer.RunInternalTxsConsumer(a.rdbConn.ToHandle(), a.config, a.logger, sigchan); runErr != nil { + if runErr := worker_consumer.RunInternalTxsConsumer(a.rdbConn.ToHandle(), a.config, a.logger, a.evmUtil, sigchan); runErr != nil { a.logger.Panicf("%v", runErr) } }()