From 3c231a1d02dc746992425736e81edd0f66682209 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 20 Nov 2023 12:35:25 -0800 Subject: [PATCH] fix ingester testcase Signed-off-by: Ben Ye --- pkg/ingester/ingester.go | 8 +++++--- pkg/ingester/ingester_test.go | 8 ++++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index cf1656c83c..8e5ca5f76a 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -256,7 +256,8 @@ type userTSDB struct { lastUpdate atomic.Int64 // Thanos shipper used to ship blocks to the storage. - shipper Shipper + shipper Shipper + shipperMetadataFilePath string // When deletion marker is found for the tenant (checked before shipping), // shipping stops and TSDB is closed before reaching idle timeout time (if enabled). @@ -435,7 +436,7 @@ func (u *userTSDB) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} { // updateCachedShipperBlocks reads the shipper meta file and updates the cached shipped blocks. func (u *userTSDB) updateCachedShippedBlocks() error { - shipperMeta, err := shipper.ReadMetaFile(u.db.Dir()) + shipperMeta, err := shipper.ReadMetaFile(u.shipperMetadataFilePath) if os.IsNotExist(err) || os.IsNotExist(errors.Cause(err)) { // If the meta file doesn't exist it means the shipper hasn't run yet. shipperMeta = &shipper.Meta{} @@ -606,7 +607,7 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer } } -// NewV2 returns a new Ingester that uses Cortex block storage instead of chunks storage. +// New returns a new Ingester that uses Cortex block storage instead of chunks storage. func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { defaultInstanceLimits = &cfg.DefaultLimits if cfg.ingesterClientFactory == nil { @@ -2052,6 +2053,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { metadata.NoneFunc, "", ) + userDB.shipperMetadataFilePath = filepath.Join(userDB.db.Dir(), filepath.Clean(shipper.DefaultMetaFilename)) // Initialise the shipper blocks cache. if err := userDB.updateCachedShippedBlocks(); err != nil { diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 132003f4ff..78dd53f7b9 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -2821,7 +2821,7 @@ func TestIngester_sholdUpdateCacheShippedBlocks(t *testing.T) { require.Equal(t, len(db.getCachedShippedBlocks()), 0) shippedBlock, _ := ulid.Parse("01D78XZ44G0000000000000000") - require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.db.Dir(), &shipper.Meta{ + require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.shipperMetadataFilePath, &shipper.Meta{ Version: shipper.MetaVersion1, Uploaded: []ulid.ULID{shippedBlock}, })) @@ -2858,7 +2858,7 @@ func TestIngester_closeAndDeleteUserTSDBIfIdle_shouldNotCloseTSDBIfShippingIsInP // Mock the shipper meta (no blocks). db := i.getTSDB(userID) - require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.db.Dir(), &shipper.Meta{ + require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.shipperMetadataFilePath, &shipper.Meta{ Version: shipper.MetaVersion1, })) @@ -3788,7 +3788,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { `, oldBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds")) // Saying that we have shipped the second block, so only that should get deleted. - require.Nil(t, shipper.WriteMetaFile(nil, db.db.Dir(), &shipper.Meta{ + require.Nil(t, shipper.WriteMetaFile(nil, db.shipperMetadataFilePath, &shipper.Meta{ Version: shipper.MetaVersion1, Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID}, })) @@ -3816,7 +3816,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) { `, newBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds")) // Shipping 2 more blocks, hence all the blocks from first round. - require.Nil(t, shipper.WriteMetaFile(nil, db.db.Dir(), &shipper.Meta{ + require.Nil(t, shipper.WriteMetaFile(nil, db.shipperMetadataFilePath, &shipper.Meta{ Version: shipper.MetaVersion1, Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID, newBlocks[0].Meta().ULID, newBlocks[1].Meta().ULID}, }))