Skip to content

Commit

Permalink
issue-2489: limit async get duration (#2490)
Browse files Browse the repository at this point in the history
* issue-2489: limit async get duration

* update

* add ydb patch

* update

* update

* better part2 ut
  • Loading branch information
yegorskii authored Nov 21, 2024
1 parent b3f0c11 commit b8e2587
Show file tree
Hide file tree
Showing 16 changed files with 189 additions and 7 deletions.
3 changes: 3 additions & 0 deletions cloud/blockstore/config/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1053,4 +1053,7 @@ message TStorageServiceConfig
// agents devices and will not suspend local devices. Instead, PURGE_HOST
// should be used for these purposes.
optional bool DiskRegistryAlwaysAllocatesLocalDisks = 388;

// Timeouts used to compute the deadline of the asynchronous blob reads issued
// by partition compaction. The SSD value is used when the partition's storage
// media kind is STORAGE_MEDIA_SSD, the HDD value for every other media kind.
// A zero value (the default) means no deadline is set for these reads.
// NOTE(review): the value is presumably in milliseconds (the unit test sets it
// via TDuration::MilliSeconds()) - confirm against the config conversion.
optional uint32 BlobStorageAsyncGetTimeoutHDD = 389;
optional uint32 BlobStorageAsyncGetTimeoutSSD = 390;
}
3 changes: 2 additions & 1 deletion cloud/blockstore/libs/storage/core/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,8 @@ TDuration MSeconds(ui32 value)
xxx(IdleAgentDeployByCmsDelay, TDuration, Hours(1) )\
xxx(AllowLiteDiskReallocations, bool, false )\
xxx(DiskRegistryDisksNotificationTimeout, TDuration, Seconds(5) )\

xxx(BlobStorageAsyncGetTimeoutHDD, TDuration, Seconds(0) )\
xxx(BlobStorageAsyncGetTimeoutSSD, TDuration, Seconds(0) )\

// BLOCKSTORE_STORAGE_CONFIG_RW

Expand Down
3 changes: 3 additions & 0 deletions cloud/blockstore/libs/storage/core/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,9 @@ class TStorageConfig
TString GetNodeType() const;

NCloud::NProto::TConfigDispatcherSettings GetConfigDispatcherSettings() const;

TDuration GetBlobStorageAsyncGetTimeoutHDD() const;
TDuration GetBlobStorageAsyncGetTimeoutSSD() const;
};

ui64 GetAllocationUnit(
Expand Down
5 changes: 5 additions & 0 deletions cloud/blockstore/libs/storage/core/disk_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ struct TSimpleDiskCounters
EPublishingPolicy::Repl,
TSimpleCounter::ECounterType::Generic,
ECounterExpirationPolicy::Permanent};
// Number of completed ReadBlob requests that reported DeadlineSeen; the
// partition actors increment it in HandleReadBlobCompleted.
TCounter ReadBlobDeadlineCount{
EPublishingPolicy::Repl,
TSimpleCounter::ECounterType::Generic,
ECounterExpirationPolicy::Expiring};

// DiskRegistry based
TCounter HasBrokenDevice{
Expand Down Expand Up @@ -251,6 +255,7 @@ struct TSimpleDiskCounters
MakeMeta<&TSimpleDiskCounters::CompactionRangeCountPerRun>(),
MakeMeta<&TSimpleDiskCounters::UnconfirmedBlobCount>(),
MakeMeta<&TSimpleDiskCounters::ConfirmedBlobCount>(),
MakeMeta<&TSimpleDiskCounters::ReadBlobDeadlineCount>(),

MakeMeta<&TSimpleDiskCounters::HasBrokenDevice>(),
MakeMeta<&TSimpleDiskCounters::HasBrokenDeviceSilent>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ class TCompactionActor final
const ui32 MaxBlocksInBlob;
const ui32 MaxAffectedBlocksPerCompaction;
const IBlockDigestGeneratorPtr BlockDigestGenerator;
const TDuration ReadBlobTimeout;

const ui64 CommitId;

Expand Down Expand Up @@ -179,6 +180,7 @@ class TCompactionActor final
ui32 maxBlocksInBlob,
ui32 maxAffectedBlocksPerCompaction,
IBlockDigestGeneratorPtr blockDigestGenerator,
TDuration readBlobTimeout,
ui64 commitId,
TVector<TRangeCompactionInfo> rangeCompactionInfos,
TVector<TRequest> requests);
Expand Down Expand Up @@ -239,6 +241,7 @@ TCompactionActor::TCompactionActor(
ui32 maxBlocksInBlob,
ui32 maxAffectedBlocksPerCompaction,
IBlockDigestGeneratorPtr blockDigestGenerator,
TDuration readBlobTimeout,
ui64 commitId,
TVector<TRangeCompactionInfo> rangeCompactionInfos,
TVector<TRequest> requests)
Expand All @@ -249,6 +252,7 @@ TCompactionActor::TCompactionActor(
, MaxBlocksInBlob(maxBlocksInBlob)
, MaxAffectedBlocksPerCompaction(maxAffectedBlocksPerCompaction)
, BlockDigestGenerator(std::move(blockDigestGenerator))
, ReadBlobTimeout(readBlobTimeout)
, CommitId(commitId)
, RangeCompactionInfos(std::move(rangeCompactionInfos))
, Requests(std::move(requests))
Expand Down Expand Up @@ -387,6 +391,10 @@ void TCompactionActor::ReadBlocks(const TActorContext& ctx)
current.GroupId);
}

const auto readBlobDeadline = ReadBlobTimeout ?
ctx.Now() + ReadBlobTimeout :
TInstant::Max();

for (ui32 batchIndex = 0; batchIndex < BatchRequests.size(); ++batchIndex) {
auto& batch = BatchRequests[batchIndex];
if (batch.UnchangedBlobOffsets) {
Expand Down Expand Up @@ -423,7 +431,7 @@ void TCompactionActor::ReadBlocks(const TActorContext& ctx)
std::move(subSgList),
batch.GroupId,
true, // async
TInstant::Max(), // deadline
readBlobDeadline, // deadline
shouldCalculateChecksums
);

Expand Down Expand Up @@ -1949,6 +1957,11 @@ void TPartitionActor::CompleteCompaction(
}
}

auto readBlobTimeout =
PartitionConfig.GetStorageMediaKind() == NProto::STORAGE_MEDIA_SSD ?
Config->GetBlobStorageAsyncGetTimeoutSSD() :
Config->GetBlobStorageAsyncGetTimeoutHDD();

auto actor = NCloud::Register<TCompactionActor>(
ctx,
args.RequestInfo,
Expand All @@ -1958,6 +1971,7 @@ void TPartitionActor::CompleteCompaction(
State->GetMaxBlocksInBlob(),
Config->GetMaxAffectedBlocksPerCompaction(),
BlockDigestGenerator,
readBlobTimeout,
args.CommitId,
std::move(rangeCompactionInfos),
std::move(requests));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ void TPartitionActor::HandleReadBlobCompleted(
return;
}

if (msg->DeadlineSeen) {
PartCounters->Simple.ReadBlobDeadlineCount.Increment(1);
}

if (State->IncrementReadBlobErrorCount()
>= Config->GetMaxReadBlobErrorsBeforeSuicide())
{
Expand Down
52 changes: 52 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <cloud/storage/core/libs/tablet/blob_id.h>

#include <contrib/ydb/core/base/blobstorage.h>
#include <contrib/ydb/core/blobstorage/vdisk/common/vdisk_events.h>
#include <contrib/ydb/core/testlib/basics/storage.h>

#include <library/cpp/testing/unittest/registar.h>
Expand Down Expand Up @@ -11162,6 +11163,57 @@ Y_UNIT_TEST_SUITE(TPartitionTest)

UNIT_ASSERT_VALUES_EQUAL(0, compactionByBlockCount);
}

Y_UNIT_TEST(ShouldAbortCompactionIfReadBlobFailsWithDeadlineExceeded)
{
    // Configure a one second timeout for async blob reads on HDD media.
    NProto::TStorageServiceConfig storageConfig;
    storageConfig.SetBlobStorageAsyncGetTimeoutHDD(
        TDuration::Seconds(1).MilliSeconds());
    auto runtime = PrepareTestActorRuntime(storageConfig);

    TPartitionClient partition(*runtime);
    partition.WaitReady();

    // Produce a blob for compaction to read back.
    partition.WriteBlocks(3, 33);
    partition.Flush();

    // Latest ReadBlobDeadlineCount value observed in the published counters.
    ui32 deadlineCounter = 0;

    runtime->SetEventFilter(
        [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev)
        {
            const auto eventType = ev->GetTypeRewrite();

            if (eventType == TEvStatsService::EvVolumePartCounters) {
                // Remember the counter and let the event through.
                auto* counters =
                    ev->Get<TEvStatsService::TEvVolumePartCounters>();
                deadlineCounter =
                    counters->DiskCounters->Simple.ReadBlobDeadlineCount.Value;
                return false;
            }

            if (eventType != TEvBlobStorage::EvVGet) {
                return false;
            }

            // Filter out (return true for) async-read VGets that carry a
            // deadline, so the read can only finish by hitting that deadline.
            const auto& record = ev->Get<TEvBlobStorage::TEvVGet>()->Record;
            const bool isAsyncRead =
                record.GetHandleClass() == NKikimrBlobStorage::AsyncRead;
            return isAsyncRead && record.GetMsgQoS().HasDeadlineSeconds();
        });

    // Compaction must be rejected once the read deadline has passed.
    partition.SendCompactionRequest();
    runtime->AdvanceCurrentTime(TDuration::Seconds(1));
    auto compactionResponse = partition.RecvCompactionResponse();
    UNIT_ASSERT_VALUES_EQUAL(
        E_REJECTED,
        compactionResponse->GetError().GetCode());

    runtime->AdvanceCurrentTime(TDuration::Seconds(15));

    // Force a counters update and wait until the stats event is dispatched.
    partition.SendToPipe(
        std::make_unique<TEvPartitionPrivate::TEvUpdateCounters>());
    {
        TDispatchOptions dispatchOptions;
        dispatchOptions.FinalEvents.emplace_back(
            TEvStatsService::EvVolumePartCounters);
        runtime->DispatchEvents(dispatchOptions);
    }

    // Exactly one read should have been accounted as deadline-exceeded.
    UNIT_ASSERT_VALUES_EQUAL(1, deadlineCounter);
}
}

} // namespace NCloud::NBlockStore::NStorage::NPartition
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/partition/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ PEERDIR(
library/cpp/monlib/service/pages

contrib/ydb/core/base
contrib/ydb/core/blobstorage
contrib/ydb/core/node_whiteboard
contrib/ydb/core/scheme
contrib/ydb/core/tablet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class TCompactionActor final
const TActorId Tablet;
const ui64 CommitId;
const TBlockRange32 BlockRange;
const TDuration ReadBlobTimeout;
TGarbageInfo GarbageInfo;
TAffectedBlobInfos AffectedBlobInfos;
ui32 BlobsSkipped;
Expand Down Expand Up @@ -107,6 +108,7 @@ class TCompactionActor final
const TActorId& tablet,
ui64 commitId,
const TBlockRange32& blockRange,
TDuration readBlobTimeout,
TGarbageInfo garbageInfo,
TAffectedBlobInfos affectedBlobInfos,
ui32 blobsSkipped,
Expand Down Expand Up @@ -162,6 +164,7 @@ TCompactionActor::TCompactionActor(
const TActorId& tablet,
ui64 commitId,
const TBlockRange32& blockRange,
TDuration readBlobTimeout,
TGarbageInfo garbageInfo,
TAffectedBlobInfos affectedBlobInfos,
ui32 blobsSkipped,
Expand All @@ -175,6 +178,7 @@ TCompactionActor::TCompactionActor(
, Tablet(tablet)
, CommitId(commitId)
, BlockRange(blockRange)
, ReadBlobTimeout(readBlobTimeout)
, GarbageInfo(std::move(garbageInfo))
, AffectedBlobInfos(std::move(affectedBlobInfos))
, BlobsSkipped(blobsSkipped)
Expand Down Expand Up @@ -269,6 +273,10 @@ void TCompactionActor::ReadBlocks(const TActorContext& ctx)
{
bool readBlobSent = false;

const auto readBlobDeadline = ReadBlobTimeout ?
ctx.Now() + ReadBlobTimeout :
TInstant::Max();

ui32 requestIndex = 0;
for (auto& req: ReadRequests) {
if (req.DataBlobOffsets) {
Expand All @@ -285,7 +293,7 @@ void TCompactionActor::ReadBlocks(const TActorContext& ctx)
req.BlobContent.GetGuardedSgList(),
req.GroupId,
true, // async
TInstant::Max() // deadline
readBlobDeadline // deadline
);

if (!RequestInfo->CallContext->LWOrbit.Fork(request->CallContext->LWOrbit)) {
Expand Down Expand Up @@ -1164,6 +1172,11 @@ void TPartitionActor::CompleteCompaction(
blobIndex++);
}

auto readBlobTimeout =
PartitionConfig.GetStorageMediaKind() == NProto::STORAGE_MEDIA_SSD ?
Config->GetBlobStorageAsyncGetTimeoutSSD() :
Config->GetBlobStorageAsyncGetTimeoutHDD();

auto actor = NCloud::Register<TCompactionActor>(
ctx,
args.RequestInfo,
Expand All @@ -1173,6 +1186,7 @@ void TPartitionActor::CompleteCompaction(
SelfId(),
args.CommitId,
args.BlockRange,
readBlobTimeout,
std::move(args.GarbageInfo),
std::move(args.AffectedBlobInfos),
args.BlobsSkipped,
Expand Down
14 changes: 14 additions & 0 deletions cloud/blockstore/libs/storage/partition2/part2_actor_readblob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class TReadBlobActor final
TInstant RequestSent;
TInstant ResponseReceived;

bool DeadlineSeen = false;

public:
TReadBlobActor(
TRequestInfoPtr requestInfo,
Expand Down Expand Up @@ -159,6 +161,10 @@ void TReadBlobActor::NotifyCompleted(
request->RequestTime = ResponseReceived - RequestSent;
request->GroupId = Request->GroupId;

if (DeadlineSeen) {
request->DeadlineSeen = true;
}

NCloud::Send(ctx, Tablet, std::move(request));
}

Expand Down Expand Up @@ -191,6 +197,10 @@ void TReadBlobActor::ReplyError(
description.data(),
response.Print(false).data());

if (response.Status == NKikimrProto::DEADLINE) {
DeadlineSeen = true;
}

auto error = MakeError(E_REJECTED, "TEvBlobStorage::TEvGet failed: " + description);
ReplyAndDie(ctx, std::make_unique<TResponse>(error));
}
Expand Down Expand Up @@ -411,6 +421,10 @@ void TPartitionActor::HandleReadBlobCompleted(
return;
}

if (msg->DeadlineSeen) {
PartCounters->Simple.ReadBlobDeadlineCount.Increment(1);
}

if (State->IncrementReadBlobErrorCount()
>= Config->GetMaxReadBlobErrorsBeforeSuicide())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ struct TEvPartitionPrivate
ui32 BytesCount = 0;
TDuration RequestTime;
ui32 GroupId = 0;
bool DeadlineSeen = false;

TReadBlobCompleted() = default;

Expand Down
Loading

0 comments on commit b8e2587

Please sign in to comment.