Skip to content

Commit

Permalink
fix ingester testcase
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Nov 20, 2023
1 parent 5d677f5 commit 3c231a1
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
8 changes: 5 additions & 3 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ type userTSDB struct {
lastUpdate atomic.Int64

// Thanos shipper used to ship blocks to the storage.
shipper Shipper
shipper Shipper
shipperMetadataFilePath string

// When deletion marker is found for the tenant (checked before shipping),
// shipping stops and TSDB is closed before reaching idle timeout time (if enabled).
Expand Down Expand Up @@ -435,7 +436,7 @@ func (u *userTSDB) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {

// updateCachedShipperBlocks reads the shipper meta file and updates the cached shipped blocks.
func (u *userTSDB) updateCachedShippedBlocks() error {
shipperMeta, err := shipper.ReadMetaFile(u.db.Dir())
shipperMeta, err := shipper.ReadMetaFile(u.shipperMetadataFilePath)
if os.IsNotExist(err) || os.IsNotExist(errors.Cause(err)) {
// If the meta file doesn't exist it means the shipper hasn't run yet.
shipperMeta = &shipper.Meta{}
Expand Down Expand Up @@ -606,7 +607,7 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer
}
}

// NewV2 returns a new Ingester that uses Cortex block storage instead of chunks storage.
// New returns a new Ingester that uses Cortex block storage instead of chunks storage.
func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
defaultInstanceLimits = &cfg.DefaultLimits
if cfg.ingesterClientFactory == nil {
Expand Down Expand Up @@ -2052,6 +2053,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
metadata.NoneFunc,
"",
)
userDB.shipperMetadataFilePath = filepath.Join(userDB.db.Dir(), filepath.Clean(shipper.DefaultMetaFilename))

// Initialise the shipper blocks cache.
if err := userDB.updateCachedShippedBlocks(); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2821,7 +2821,7 @@ func TestIngester_sholdUpdateCacheShippedBlocks(t *testing.T) {
require.Equal(t, len(db.getCachedShippedBlocks()), 0)
shippedBlock, _ := ulid.Parse("01D78XZ44G0000000000000000")

require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.db.Dir(), &shipper.Meta{
require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.shipperMetadataFilePath, &shipper.Meta{
Version: shipper.MetaVersion1,
Uploaded: []ulid.ULID{shippedBlock},
}))
Expand Down Expand Up @@ -2858,7 +2858,7 @@ func TestIngester_closeAndDeleteUserTSDBIfIdle_shouldNotCloseTSDBIfShippingIsInP

// Mock the shipper meta (no blocks).
db := i.getTSDB(userID)
require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.db.Dir(), &shipper.Meta{
require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.shipperMetadataFilePath, &shipper.Meta{
Version: shipper.MetaVersion1,
}))

Expand Down Expand Up @@ -3788,7 +3788,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
`, oldBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))

// Saying that we have shipped the second block, so only that should get deleted.
require.Nil(t, shipper.WriteMetaFile(nil, db.db.Dir(), &shipper.Meta{
require.Nil(t, shipper.WriteMetaFile(nil, db.shipperMetadataFilePath, &shipper.Meta{
Version: shipper.MetaVersion1,
Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID},
}))
Expand Down Expand Up @@ -3816,7 +3816,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
`, newBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))

// Shipping 2 more blocks, hence all the blocks from first round.
require.Nil(t, shipper.WriteMetaFile(nil, db.db.Dir(), &shipper.Meta{
require.Nil(t, shipper.WriteMetaFile(nil, db.shipperMetadataFilePath, &shipper.Meta{
Version: shipper.MetaVersion1,
Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID, newBlocks[0].Meta().ULID, newBlocks[1].Meta().ULID},
}))
Expand Down

0 comments on commit 3c231a1

Please sign in to comment.