Skip to content

Commit

Permalink
filter out blocks marked for deletion in the bucket index ids fetcher…
Browse files Browse the repository at this point in the history
… too, so we are protected even when apply ignoreDeletionMarkerFilter the block has been deleted (cortexproject#5712)
  • Loading branch information
wenxu1024 authored Dec 19, 2023
1 parent c9d6ca8 commit aef8ad3
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
10 changes: 7 additions & 3 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,14 +821,18 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(ulogger, bucket, c.compactorCfg.MetaSyncConcurrency)

var blockIDsFetcher block.BlockIDsFetcher
var fetcherULogger log.Logger
if c.storageCfg.BucketStore.BucketIndex.Enabled {
blockIDsFetcher = bucketindex.NewBlockIDsFetcher(ulogger, c.bucketClient, userID, c.limits)
fetcherULogger = log.With(ulogger, "blockIdsFetcher", "BucketIndexBlockIDsFetcher")
blockIDsFetcher = bucketindex.NewBlockIDsFetcher(fetcherULogger, c.bucketClient, userID, c.limits)

} else {
blockIDsFetcher = block.NewBaseBlockIDsFetcher(ulogger, bucket)
fetcherULogger = log.With(ulogger, "blockIdsFetcher", "BaseBlockIDsFetcher")
blockIDsFetcher = block.NewBaseBlockIDsFetcher(fetcherULogger, bucket)
}

fetcher, err := block.NewMetaFetcher(
ulogger,
fetcherULogger,
c.compactorCfg.MetaSyncConcurrency,
bucket,
blockIDsFetcher,
Expand Down
10 changes: 9 additions & 1 deletion pkg/storage/tsdb/bucketindex/block_ids_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,16 @@ func (f *BlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch ch
return nil, err
}

// Sent the active block ids
blocksMarkedForDeletion := idx.BlockDeletionMarks.GetULIDs()
blocksMarkedForDeletionMap := make(map[ulid.ULID]struct{})
for _, block := range blocksMarkedForDeletion {
blocksMarkedForDeletionMap[block] = struct{}{}
}
// Sent the ids of blocks not marked for deletion
for _, b := range idx.Blocks {
if _, ok := blocksMarkedForDeletionMap[b.ID]; ok {
continue
}
select {
case <-ctx.Done():
return nil, ctx.Err()
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/bucketindex/block_ids_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestBlockIDsFetcher_Fetch(t *testing.T) {
blockIdsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
close(ch)
wg.Wait()
require.Equal(t, []ulid.ULID{block1.ID, block2.ID, block3.ID}, blockIds)
require.Equal(t, []ulid.ULID{block3.ID}, blockIds)
}

func TestBlockIDsFetcherFetcher_Fetch_NoBucketIndex(t *testing.T) {
Expand Down

0 comments on commit aef8ad3

Please sign in to comment.