From a1b195476d06e449973737385553a0cdd36e3128 Mon Sep 17 00:00:00 2001 From: Daniel Blando Date: Wed, 20 Dec 2023 13:38:04 -0800 Subject: [PATCH] Filter empty labels from sharding key (#5717) * Filter empty labels from sharding key Signed-off-by: Daniel Deluiggi * Update changelog Signed-off-by: Daniel Deluiggi --------- Signed-off-by: Daniel Deluiggi --- CHANGELOG.md | 2 + pkg/distributor/distributor.go | 6 +- pkg/distributor/distributor_test.go | 89 ++++++++++++++++++++++++++++- 3 files changed, 94 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d6bd2071b8..9a5cff31d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ * [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684 * [ENHANCEMENT] Index Cache: Multi level cache adds config `max_backfill_items` to cap max items to backfill per async operation. #5686 * [ENHANCEMENT] Query Frontend: Log number of split queries in `query stats` log. #5703 +* [BUGFIX] Distributor: Do not use label with empty values for sharding #5717 + ## 1.16.0 2023-11-20 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index d6f269a262..938584eba3 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -466,8 +466,10 @@ func shardByUser(userID string) uint32 { func shardByAllLabels(userID string, labels []cortexpb.LabelAdapter) uint32 { h := shardByUser(userID) for _, label := range labels { - h = ingester_client.HashAdd32(h, label.Name) - h = ingester_client.HashAdd32(h, label.Value) + if len(label.Value) > 0 { + h = ingester_client.HashAdd32(h, label.Name) + h = ingester_client.HashAdd32(h, label.Value) + } } return h } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index db54d8ccba..fd7efa062b 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -2391,6 +2391,7 @@ type prepConfig struct { replicationFactor int enableTracker bool errFail error + tokens [][]uint32 } func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []*prometheus.Registry, *ring.Ring) { @@ -2417,6 +2418,12 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, [] ingesterDescs := map[string]ring.InstanceDesc{} ingestersByAddr := map[string]*mockIngester{} for i := range ingesters { + var tokens []uint32 + if len(cfg.tokens) > i { + tokens = cfg.tokens[i] + } else { + tokens = []uint32{uint32((math.MaxUint32 / cfg.numIngesters) * i)} + } addr := fmt.Sprintf("%d", i) ingesterDescs[addr] = ring.InstanceDesc{ Addr: addr, @@ -2424,7 +2431,7 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, [] State: ring.ACTIVE, Timestamp: time.Now().Unix(), RegisteredTimestamp: time.Now().Add(-2 * time.Hour).Unix(), - Tokens: []uint32{uint32((math.MaxUint32 / cfg.numIngesters) * i)}, + Tokens: tokens, } ingestersByAddr[addr] = ingesters[i] } @@ -3303,6 +3310,86 @@ func TestDistributor_Push_Relabel(t *testing.T) { } } +func TestDistributor_Push_EmptyLabel(t *testing.T) { + t.Parallel() + ctx := user.InjectOrgID(context.Background(), "pushEmptyLabel") + type testcase struct { + name string + inputSeries []labels.Labels + expectedSeries labels.Labels + } + + cases := []testcase{ + { + name: "with empty label", + inputSeries: []labels.Labels{ + { //Token 1106054332 without filtering + {Name: "__name__", Value: "foo"}, + {Name: "empty", Value: ""}, + }, + { //Token 3827924124 without filtering + {Name: "__name__", Value: "foo"}, + {Name: "changHash", Value: ""}, + }, + }, + expectedSeries: labels.Labels{ + //Token 1797290973 + {Name: "__name__", Value: "foo"}, + }, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + var err error + var limits validation.Limits + flagext.DefaultValues(&limits) + + token := [][]uint32{ + {1}, + {2}, + {3}, + {1106054333}, + {5}, + {6}, + {7}, + {8}, + {9}, + {3827924125}, + } + + ds, ingesters, _, _ := prepare(t, prepConfig{ + numIngesters: 10, + happyIngesters: 10, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + replicationFactor: 1, + shuffleShardSize: 10, + tokens: token, + }) + + // Push the series to the distributor + req := mockWriteRequest(tc.inputSeries, 1, 1) + _, err = ds[0].Push(ctx, req) + require.NoError(t, err) + + // Since each test pushes only 1 series, we do expect the ingester + // to have received exactly 1 series + ingesterWithSeries := 0 + for i := range ingesters { + timeseries := ingesters[i].series() + if len(timeseries) > 0 { + ingesterWithSeries++ + } + } + assert.Equal(t, 1, ingesterWithSeries) + }) + } +} + func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing.T) { t.Parallel() metricRelabelConfigs := []*relabel.Config{