Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bcs-monitor升级thanos版本 #2342

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions bcs-services/bcs-monitor/cmd/bcs-monitor/storegw.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func runStoreGW(ctx context.Context, g *run.Group, opt *option) error {
if err != nil {
return errors.Wrap(err, "building gRPC client")
}
endpoints = query.NewEndpointSet(kitLogger, opt.reg,
endpoints = query.NewEndpointSet(time.Now, kitLogger, opt.reg,
func() (specs []*query.GRPCEndpointSpec) {
for _, addr := range gw.GetStoreAddrs() {
specs = append(specs, query.NewGRPCEndpointSpec(addr, true))
Expand All @@ -123,6 +123,7 @@ func runStoreGW(ctx context.Context, g *run.Group, opt *option) error {
},
dialOpts,
time.Second*30,
time.Second*30,
)

var cancel context.CancelFunc
Expand Down Expand Up @@ -159,10 +160,10 @@ func runStoreGW(ctx context.Context, g *run.Group, opt *option) error {
func registryProxyStore(g *run.Group, kitLogger gokit.Logger, opt *option, endpoints *query.EndpointSet) {

proxyStore := store.NewProxyStore(kitLogger, opt.reg, endpoints.GetStoreClients, component.Query, nil,
time.Minute*2)
time.Minute*2, store.LazyRetrieval)
grpcProbe := prober.NewGRPC()
grpcSrv := grpcserver.New(kitLogger, opt.reg, nil, nil, nil, component.Store, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(proxyStore)),
grpcserver.WithServer(store.RegisterStoreServer(proxyStore, kitLogger)),
grpcserver.WithListen(utils.GetListenAddr(bindAddress, grpcPort)),
grpcserver.WithGracePeriod(time.Duration(0)),
grpcserver.WithMaxConnAge(time.Minute*5), // 5分钟主动重连, pod 扩容等需要
Expand Down
133 changes: 84 additions & 49 deletions bcs-services/bcs-monitor/go.mod

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions bcs-services/bcs-monitor/pkg/api/metrics/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
package metrics

import (
"context"
"fmt"
"net/http"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/Tencent/bk-bcs/bcs-services/bcs-monitor/pkg/component/promclient"
"github.com/Tencent/bk-bcs/bcs-services/bcs-monitor/pkg/rest"
bcstesting "github.com/Tencent/bk-bcs/bcs-services/bcs-monitor/pkg/testing"
)
Expand All @@ -42,3 +46,24 @@ func TestGetClusterOverview(t *testing.T) {
}
wg.Wait()
}

// QueryRange的基准测试,需要启动query, storegw
// 需要配置环境变量 eg:export TEST_CONFIG_FILE=D:/project_config/bcs-monitor/bcs_monitor.yml
func BenchmarkQueryRange(b *testing.B) {
for i := 0; i < b.N; i++ {
ctx := context.Background()
header := http.Header{}
// query配置的启动端口
rawURL := "http://127.0.0.1:10902"
// 前提是先把数据打入prometheus,才能查得到
promql := "myMetric{cluster_id=\"1\", provider=\"PROMETHEUS\", bcs_cluster_id=\"1\"}"
endTime := time.Now()
startTime := endTime.Add(-time.Hour)
// 默认只返回 60 个点
stepTime := endTime.Sub(startTime) / 60
_, err := promclient.QueryRange(ctx, rawURL, header, promql, startTime, endTime, stepTime)
if err != nil {
fmt.Println(err)
}
}
}
1 change: 1 addition & 0 deletions bcs-services/bcs-monitor/pkg/query/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
unhealthyStoreTimeout = time.Minute * 5 // 健康检查的时间
storeResponseTimeout = time.Second * 20 // 查询store的超时时间
defaultEvaluationInterval = time.Minute * 1 // 自查询的默认处理间隔。这里用不到
endpointInfoTimeout = time.Minute * 5 // thanos端点超时时间
)

var queryReplicaLabels = []string{"prometheus_replica"}
5 changes: 4 additions & 1 deletion bcs-services/bcs-monitor/pkg/query/discovery_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func resolveStoreProvider(ctxm context.Context, g *run.Group, dnsStoreProvider *
func getEndpoints(kitLogger gokit.Logger, reg *prometheus.Registry, strictStoreList []string,
dnsStoreProvider *dns.Provider, dialOpts []grpc.DialOption) *query.EndpointSet {
return query.NewEndpointSet(
time.Now,
kitLogger,
reg,
func() (specs []*query.GRPCEndpointSpec) {
Expand All @@ -177,6 +178,8 @@ func getEndpoints(kitLogger gokit.Logger, reg *prometheus.Registry, strictStoreL
},
dialOpts,
unhealthyStoreTimeout,
endpointInfoTimeout, // thanos端点超时时间

)
}

Expand Down Expand Up @@ -373,7 +376,7 @@ func NewHTTPSDClient(ctx context.Context, kitLogger gokit.Logger, conf httpdisco
// Run File Service Discovery and update the store set when the files are modified.
u.Host = addr
conf.URL = u.String()
sdClient, err := httpdiscovery.NewDiscovery(&conf, kitLogger)
sdClient, err := httpdiscovery.NewDiscovery(&conf, kitLogger, nil)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion bcs-services/bcs-monitor/pkg/query/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
func NewQueryableCreator(reg *prometheus.Registry, kitLogger gokit.Logger,
discoveryClient *DiscoveryClient) query.QueryableCreator {
proxy := store.NewProxyStore(kitLogger, reg, discoveryClient.Endpoints().GetStoreClients, component.Query, nil,
storeResponseTimeout)
storeResponseTimeout, store.LazyRetrieval)

queryableCreator := query.NewQueryableCreator(
kitLogger,
Expand Down
65 changes: 62 additions & 3 deletions bcs-services/bcs-monitor/pkg/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package query

import (
"context"
"math"
"path"
"time"

Expand All @@ -26,7 +27,9 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/promql"
v1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
Expand All @@ -35,6 +38,7 @@ import (
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/query"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/ui"

"github.com/Tencent/bk-bcs/bcs-services/bcs-monitor/pkg/config"
Expand Down Expand Up @@ -113,11 +117,25 @@ func NewQueryAPI(
prefix = path.Join(config.G.Web.RoutePrefix, config.QueryServicePrefix)
}
ui.NewQueryUI(kitLogger, discoveryClient.Endpoints(), prefix, "", "").Register(router, ins)

engineOpts := promql.EngineOpts{
Logger: kitLogger,
Reg: reg,
MaxSamples: math.MaxInt32,
Timeout: queryTimeout,
LookbackDelta: lookbackDelta,
NoStepSubqueryIntervalFn: func(int64) int64 {
return defaultEvaluationInterval.Milliseconds()
},
}
lookBackDeltaCreator := LookBackDeltaFactory(engineOpts, dynamicLookbackDelta)
queryTelemetryDurationQuantiles := []float64{0.1, 0.25, 0.75, 1.25, 1.75, 2.5, 3, 5, 10}
queryTelemetrySamplesQuantiles := []int64{100, 1000, 10000, 100000, 1000000}
queryTelemetrySeriesQuantiles := []int64{10, 100, 1000, 10000, 100000}
api := v1.NewQueryAPI(
kitLogger,
discoveryClient.Endpoints().GetEndpointStatus,
queryEngine,
queryEngine(math.MaxInt32),
lookBackDeltaCreator,
queryableCreator,
NewEmptyRuleClient(),
NewEmptyTargetClient(),
Expand All @@ -138,7 +156,13 @@ func NewQueryAPI(
false, // disableCORS
gate.New(
extprom.WrapRegistererWithPrefix("bcs_monitor_query_concurrent_", reg),
maxConcurrentQueries,
maxConcurrentQueries, "bcs_monitor_query",
),
store.NewSeriesStatsAggregator(
reg,
queryTelemetryDurationQuantiles,
queryTelemetrySamplesQuantiles,
queryTelemetrySeriesQuantiles,
),
reg,
)
Expand Down Expand Up @@ -183,3 +207,38 @@ func (a *QueryAPI) Close(err error) {
a.statusProber.NotReady(err)
a.srv.Shutdown(err)
}

// LookBackDeltaFactory creates from 1 to 3 lookback deltas depending on
// dynamicLookbackDelta and eo.LookbackDelta and returns a function
// that returns appropriate lookback delta for given maxSourceResolutionMillis.
func LookBackDeltaFactory(eo promql.EngineOpts, dynamicLookbackDelta bool) func(int64) time.Duration {
resolutions := []int64{downsample.ResLevel0}
if dynamicLookbackDelta {
resolutions = []int64{downsample.ResLevel0, downsample.ResLevel1, downsample.ResLevel2}
}
var (
lds = make([]time.Duration, len(resolutions))
ld = eo.LookbackDelta.Milliseconds()
)

lookBackDelta := eo.LookbackDelta
for i, r := range resolutions {
if ld < r {
lookBackDelta = time.Duration(r) * time.Millisecond
}

lds[i] = lookBackDelta
}
return func(maxSourceResolutionMillis int64) time.Duration {
for i := len(resolutions) - 1; i >= 1; i-- {
left := resolutions[i-1]
if resolutions[i-1] < ld {
left = ld
}
if left < maxSourceResolutionMillis {
return lds[i]
}
}
return lds[0]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ package clientutil
import (
"testing"

"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestGetCluterID(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion bcs-services/bcs-monitor/pkg/storegw/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewStore(ctx context.Context, logger log.Logger, reg *prometheus.Registry,
nilLogger := &NilLogger{}

g := grpcserver.New(nilLogger, _reg, nil, nil, nil, component.Store, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(storeSvr)),
grpcserver.WithServer(store.RegisterStoreServer(storeSvr, logger)),
grpcserver.WithListen(address),
grpcserver.WithGracePeriod(time.Duration(0)),
)
Expand Down