Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue-2489: limit async get duration #2490

Merged
merged 6 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cloud/blockstore/config/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1053,4 +1053,7 @@ message TStorageServiceConfig
// agents devices and will not suspend local devices. Instead, PURGE_HOST
// should be used for these purposes.
optional bool DiskRegistryAlwaysAllocatesLocalDisks = 388;

// Timeouts (in milliseconds) used to compute the deadline of async blob
// reads issued by compaction. The SSD value is used for partitions whose
// storage media kind is STORAGE_MEDIA_SSD, the HDD value for all other
// partitions. Zero (the default) means that no deadline is set.
optional uint32 BlobStorageAsyncGetTimeoutHDD = 389;
optional uint32 BlobStorageAsyncGetTimeoutSSD = 390;
}
3 changes: 2 additions & 1 deletion cloud/blockstore/libs/storage/core/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,8 @@ TDuration MSeconds(ui32 value)
xxx(IdleAgentDeployByCmsDelay, TDuration, Hours(1) )\
xxx(AllowLiteDiskReallocations, bool, false )\
xxx(DiskRegistryDisksNotificationTimeout, TDuration, Seconds(5) )\

xxx(BlobStorageAsyncGetTimeoutHDD, TDuration, Seconds(0) )\
xxx(BlobStorageAsyncGetTimeoutSSD, TDuration, Seconds(0) )\

// BLOCKSTORE_STORAGE_CONFIG_RW

Expand Down
3 changes: 3 additions & 0 deletions cloud/blockstore/libs/storage/core/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,9 @@ class TStorageConfig
TString GetNodeType() const;

NCloud::NProto::TConfigDispatcherSettings GetConfigDispatcherSettings() const;

// Timeouts applied to async blob reads issued by compaction; the partition
// picks the SSD or the HDD value depending on its storage media kind.
// A zero duration means that the reads are sent without a deadline.
TDuration GetBlobStorageAsyncGetTimeoutHDD() const;
TDuration GetBlobStorageAsyncGetTimeoutSSD() const;
};

ui64 GetAllocationUnit(
Expand Down
5 changes: 5 additions & 0 deletions cloud/blockstore/libs/storage/core/disk_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ struct TSimpleDiskCounters
EPublishingPolicy::Repl,
TSimpleCounter::ECounterType::Generic,
ECounterExpirationPolicy::Permanent};
// Incremented by the partition each time a failed ReadBlob completion
// reports that a DEADLINE status was seen. Unlike the neighbouring blob
// counters it uses the Expiring expiration policy.
TCounter ReadBlobDeadlineCount{
EPublishingPolicy::Repl,
TSimpleCounter::ECounterType::Generic,
ECounterExpirationPolicy::Expiring};

// DiskRegistry based
TCounter HasBrokenDevice{
Expand Down Expand Up @@ -251,6 +255,7 @@ struct TSimpleDiskCounters
MakeMeta<&TSimpleDiskCounters::CompactionRangeCountPerRun>(),
MakeMeta<&TSimpleDiskCounters::UnconfirmedBlobCount>(),
MakeMeta<&TSimpleDiskCounters::ConfirmedBlobCount>(),
MakeMeta<&TSimpleDiskCounters::ReadBlobDeadlineCount>(),

MakeMeta<&TSimpleDiskCounters::HasBrokenDevice>(),
MakeMeta<&TSimpleDiskCounters::HasBrokenDeviceSilent>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ class TCompactionActor final
const ui32 MaxBlocksInBlob;
const ui32 MaxAffectedBlocksPerCompaction;
const IBlockDigestGeneratorPtr BlockDigestGenerator;
const TDuration ReadBlobTimeout;

const ui64 CommitId;

Expand Down Expand Up @@ -179,6 +180,7 @@ class TCompactionActor final
ui32 maxBlocksInBlob,
ui32 maxAffectedBlocksPerCompaction,
IBlockDigestGeneratorPtr blockDigestGenerator,
TDuration readBlobTimeout,
ui64 commitId,
TVector<TRangeCompactionInfo> rangeCompactionInfos,
TVector<TRequest> requests);
Expand Down Expand Up @@ -239,6 +241,7 @@ TCompactionActor::TCompactionActor(
ui32 maxBlocksInBlob,
ui32 maxAffectedBlocksPerCompaction,
IBlockDigestGeneratorPtr blockDigestGenerator,
TDuration readBlobTimeout,
ui64 commitId,
TVector<TRangeCompactionInfo> rangeCompactionInfos,
TVector<TRequest> requests)
Expand All @@ -249,6 +252,7 @@ TCompactionActor::TCompactionActor(
, MaxBlocksInBlob(maxBlocksInBlob)
, MaxAffectedBlocksPerCompaction(maxAffectedBlocksPerCompaction)
, BlockDigestGenerator(std::move(blockDigestGenerator))
, ReadBlobTimeout(readBlobTimeout)
, CommitId(commitId)
, RangeCompactionInfos(std::move(rangeCompactionInfos))
, Requests(std::move(requests))
Expand Down Expand Up @@ -387,6 +391,10 @@ void TCompactionActor::ReadBlocks(const TActorContext& ctx)
current.GroupId);
}

const auto readBlobDeadline = ReadBlobTimeout ?
ctx.Now() + ReadBlobTimeout :
TInstant::Max();

for (ui32 batchIndex = 0; batchIndex < BatchRequests.size(); ++batchIndex) {
auto& batch = BatchRequests[batchIndex];
if (batch.UnchangedBlobOffsets) {
Expand Down Expand Up @@ -423,7 +431,7 @@ void TCompactionActor::ReadBlocks(const TActorContext& ctx)
std::move(subSgList),
batch.GroupId,
true, // async
TInstant::Max(), // deadline
readBlobDeadline, // deadline
shouldCalculateChecksums
);

Expand Down Expand Up @@ -1949,6 +1957,11 @@ void TPartitionActor::CompleteCompaction(
}
}

auto readBlobTimeout =
PartitionConfig.GetStorageMediaKind() == NProto::STORAGE_MEDIA_SSD ?
Config->GetBlobStorageAsyncGetTimeoutSSD() :
Config->GetBlobStorageAsyncGetTimeoutHDD();

auto actor = NCloud::Register<TCompactionActor>(
ctx,
args.RequestInfo,
Expand All @@ -1958,6 +1971,7 @@ void TPartitionActor::CompleteCompaction(
State->GetMaxBlocksInBlob(),
Config->GetMaxAffectedBlocksPerCompaction(),
BlockDigestGenerator,
readBlobTimeout,
args.CommitId,
std::move(rangeCompactionInfos),
std::move(requests));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ void TPartitionActor::HandleReadBlobCompleted(
return;
}

if (msg->DeadlineSeen) {
PartCounters->Simple.ReadBlobDeadlineCount.Increment(1);
}

if (State->IncrementReadBlobErrorCount()
>= Config->GetMaxReadBlobErrorsBeforeSuicide())
{
Expand Down
52 changes: 52 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <cloud/storage/core/libs/tablet/blob_id.h>

#include <contrib/ydb/core/base/blobstorage.h>
#include <contrib/ydb/core/blobstorage/vdisk/common/vdisk_events.h>
#include <contrib/ydb/core/testlib/basics/storage.h>

#include <library/cpp/testing/unittest/registar.h>
Expand Down Expand Up @@ -11162,6 +11163,57 @@ Y_UNIT_TEST_SUITE(TPartitionTest)

UNIT_ASSERT_VALUES_EQUAL(0, compactionByBlockCount);
}

Y_UNIT_TEST(ShouldAbortCompactionIfReadBlobFailsWithDeadlineExceeded)
{
    // Scenario: async VGet requests that carry a deadline are intercepted by
    // the event filter, compaction is expected to be rejected, and the
    // ReadBlobDeadlineCount counter is expected to be reported as 1.
    const auto asyncGetTimeout = TDuration::Seconds(1);

    NProto::TStorageServiceConfig storageConfig;
    // The proto field takes the timeout in milliseconds.
    storageConfig.SetBlobStorageAsyncGetTimeoutHDD(
        asyncGetTimeout.MilliSeconds());
    auto runtime = PrepareTestActorRuntime(storageConfig);

    TPartitionClient partition(*runtime);
    partition.WaitReady();

    // Produce a blob for compaction to read.
    partition.WriteBlocks(3, 33);
    partition.Flush();

    // Last ReadBlobDeadlineCount value seen in a published counters event.
    ui32 observedDeadlineCount = 0;

    runtime->SetEventFilter(
        [&](TTestActorRuntimeBase& unusedRuntime, TAutoPtr<IEventHandle>& ev)
        {
            Y_UNUSED(unusedRuntime);

            const auto eventType = ev->GetTypeRewrite();

            if (eventType == TEvBlobStorage::EvVGet) {
                const auto* vget = ev->Get<TEvBlobStorage::TEvVGet>();
                // NOTE(review): returning true presumably makes the runtime
                // drop the event, so the read is never answered - confirm.
                return vget->Record.GetHandleClass()
                        == NKikimrBlobStorage::AsyncRead
                    && vget->Record.GetMsgQoS().HasDeadlineSeconds();
            }

            if (eventType == TEvStatsService::EvVolumePartCounters) {
                auto* counters =
                    ev->Get<TEvStatsService::TEvVolumePartCounters>();
                observedDeadlineCount =
                    counters->DiskCounters->Simple.ReadBlobDeadlineCount.Value;
            }

            return false;
        });

    partition.SendCompactionRequest();
    // Move the clock forward by exactly the configured timeout.
    runtime->AdvanceCurrentTime(asyncGetTimeout);

    auto compactionResponse = partition.RecvCompactionResponse();
    UNIT_ASSERT_VALUES_EQUAL(
        E_REJECTED,
        compactionResponse->GetError().GetCode());

    runtime->AdvanceCurrentTime(TDuration::Seconds(15));

    // Ask the partition to publish its counters and wait for the event that
    // the filter above inspects.
    partition.SendToPipe(
        std::make_unique<TEvPartitionPrivate::TEvUpdateCounters>());
    {
        TDispatchOptions dispatchOptions;
        dispatchOptions.FinalEvents.emplace_back(
            TEvStatsService::EvVolumePartCounters);
        runtime->DispatchEvents(dispatchOptions);
    }

    UNIT_ASSERT_VALUES_EQUAL(1, observedDeadlineCount);
}
}

} // namespace NCloud::NBlockStore::NStorage::NPartition
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/partition/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ PEERDIR(
library/cpp/monlib/service/pages

contrib/ydb/core/base
contrib/ydb/core/blobstorage
contrib/ydb/core/node_whiteboard
contrib/ydb/core/scheme
contrib/ydb/core/tablet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class TCompactionActor final
const TActorId Tablet;
const ui64 CommitId;
const TBlockRange32 BlockRange;
const TDuration ReadBlobTimeout;
TGarbageInfo GarbageInfo;
TAffectedBlobInfos AffectedBlobInfos;
ui32 BlobsSkipped;
Expand Down Expand Up @@ -107,6 +108,7 @@ class TCompactionActor final
const TActorId& tablet,
ui64 commitId,
const TBlockRange32& blockRange,
TDuration readBlobTimeout,
TGarbageInfo garbageInfo,
TAffectedBlobInfos affectedBlobInfos,
ui32 blobsSkipped,
Expand Down Expand Up @@ -162,6 +164,7 @@ TCompactionActor::TCompactionActor(
const TActorId& tablet,
ui64 commitId,
const TBlockRange32& blockRange,
TDuration readBlobTimeout,
TGarbageInfo garbageInfo,
TAffectedBlobInfos affectedBlobInfos,
ui32 blobsSkipped,
Expand All @@ -175,6 +178,7 @@ TCompactionActor::TCompactionActor(
, Tablet(tablet)
, CommitId(commitId)
, BlockRange(blockRange)
, ReadBlobTimeout(readBlobTimeout)
, GarbageInfo(std::move(garbageInfo))
, AffectedBlobInfos(std::move(affectedBlobInfos))
, BlobsSkipped(blobsSkipped)
Expand Down Expand Up @@ -269,6 +273,10 @@ void TCompactionActor::ReadBlocks(const TActorContext& ctx)
{
bool readBlobSent = false;

const auto readBlobDeadline = ReadBlobTimeout ?
ctx.Now() + ReadBlobTimeout :
TInstant::Max();

ui32 requestIndex = 0;
for (auto& req: ReadRequests) {
if (req.DataBlobOffsets) {
Expand All @@ -285,7 +293,7 @@ void TCompactionActor::ReadBlocks(const TActorContext& ctx)
req.BlobContent.GetGuardedSgList(),
req.GroupId,
true, // async
TInstant::Max() // deadline
readBlobDeadline // deadline
);

if (!RequestInfo->CallContext->LWOrbit.Fork(request->CallContext->LWOrbit)) {
Expand Down Expand Up @@ -1164,6 +1172,11 @@ void TPartitionActor::CompleteCompaction(
blobIndex++);
}

auto readBlobTimeout =
PartitionConfig.GetStorageMediaKind() == NProto::STORAGE_MEDIA_SSD ?
Config->GetBlobStorageAsyncGetTimeoutSSD() :
Config->GetBlobStorageAsyncGetTimeoutHDD();

auto actor = NCloud::Register<TCompactionActor>(
ctx,
args.RequestInfo,
Expand All @@ -1173,6 +1186,7 @@ void TPartitionActor::CompleteCompaction(
SelfId(),
args.CommitId,
args.BlockRange,
readBlobTimeout,
std::move(args.GarbageInfo),
std::move(args.AffectedBlobInfos),
args.BlobsSkipped,
Expand Down
14 changes: 14 additions & 0 deletions cloud/blockstore/libs/storage/partition2/part2_actor_readblob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class TReadBlobActor final
TInstant RequestSent;
TInstant ResponseReceived;

bool DeadlineSeen = false;

public:
TReadBlobActor(
TRequestInfoPtr requestInfo,
Expand Down Expand Up @@ -159,6 +161,10 @@ void TReadBlobActor::NotifyCompleted(
request->RequestTime = ResponseReceived - RequestSent;
request->GroupId = Request->GroupId;

if (DeadlineSeen) {
request->DeadlineSeen = true;
}

NCloud::Send(ctx, Tablet, std::move(request));
}

Expand Down Expand Up @@ -191,6 +197,10 @@ void TReadBlobActor::ReplyError(
description.data(),
response.Print(false).data());

if (response.Status == NKikimrProto::DEADLINE) {
DeadlineSeen = true;
}

auto error = MakeError(E_REJECTED, "TEvBlobStorage::TEvGet failed: " + description);
ReplyAndDie(ctx, std::make_unique<TResponse>(error));
}
Expand Down Expand Up @@ -411,6 +421,10 @@ void TPartitionActor::HandleReadBlobCompleted(
return;
}

if (msg->DeadlineSeen) {
PartCounters->Simple.ReadBlobDeadlineCount.Increment(1);
}

if (State->IncrementReadBlobErrorCount()
>= Config->GetMaxReadBlobErrorsBeforeSuicide())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ struct TEvPartitionPrivate
ui32 BytesCount = 0;
TDuration RequestTime;
ui32 GroupId = 0;
bool DeadlineSeen = false;

TReadBlobCompleted() = default;

Expand Down
Loading
Loading