Skip to content

Commit

Permalink
add ydb patch
Browse files Browse the repository at this point in the history
  • Loading branch information
yegorskii committed Nov 15, 2024
1 parent 5ad9e01 commit 6d26041
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 88 deletions.
1 change: 1 addition & 0 deletions contrib/ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,7 @@ struct TEvBlobStorage {
EvRequestProxySessionsState,
EvProxySessionsState,
EvBunchOfEvents,
EvDeadline,

// blobstorage controller interface
EvControllerRegisterNode = 0x10031602,
Expand Down
6 changes: 6 additions & 0 deletions contrib/ydb/core/blobstorage/dsproxy/dsproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,12 @@ class TBlobStorageGroupRequestActor : public TActor<TDerived> {
Derived().ReplyAndDie(NKikimrProto::ERROR);
return true;
}

case TEvBlobStorage::EvDeadline: {
ErrorReason = "Deadline timer hit";
Derived().ReplyAndDie(NKikimrProto::DEADLINE);
return true;
}
}

return false;
Expand Down
7 changes: 6 additions & 1 deletion contrib/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
EvConfigureQueryTimeout,
EvEstablishingSessionTimeout,
Ev5min,
EvCheckDeadlines,
};

struct TEvUpdateResponsiveness : TEventLocal<TEvUpdateResponsiveness, EvUpdateResponsiveness> {};
Expand Down Expand Up @@ -61,7 +62,8 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
TIntrusivePtr<TStoragePoolCounters> StoragePoolCounters;
TIntrusivePtr<TGroupSessions> Sessions;
TDeque<std::unique_ptr<IEventHandle>> InitQueue;
THashSet<TActorId, TActorId::THash> ActiveRequests;
std::multimap<TInstant, TActorId> DeadlineMap;
THashMap<TActorId, std::multimap<TInstant, TActorId>::iterator, TActorId::THash> ActiveRequests;
ui64 UnconfiguredBufferSize = 0;
const bool IsEjected;
bool ForceWaitAllDrives;
Expand Down Expand Up @@ -250,6 +252,8 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
void Handle(TEvStopBatchingGetRequests::TPtr& ev);

// todo: in-fly tracking for cancelation and
void PushRequest(IActor *actor, TInstant deadline);
void CheckDeadlines();
void HandleNormal(TEvBlobStorage::TEvGet::TPtr &ev);
void HandleNormal(TEvBlobStorage::TEvPut::TPtr &ev);
void HandleNormal(TEvBlobStorage::TEvBlock::TPtr &ev);
Expand Down Expand Up @@ -356,6 +360,7 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
IgnoreFunc(TEvConfigureQueryTimeout);
IgnoreFunc(TEvEstablishingSessionTimeout);
fFunc(Ev5min, Handle5min);
cFunc(EvCheckDeadlines, CheckDeadlines);
)

#define HANDLE_EVENTS(HANDLER) \
Expand Down
23 changes: 10 additions & 13 deletions contrib/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt

bool IsManyPuts = false;

TInstant Deadline;
ui64 RequestsSent = 0;
ui64 ResponsesReceived = 0;
ui64 MaxSaneRequests = 0;
Expand Down Expand Up @@ -473,7 +472,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
ev->Bunch.emplace_back(new IEventHandle(
TActorId() /*recipient*/,
item.Recipient,
put = new TEvBlobStorage::TEvPut(item.BlobId, TRcBuf(item.Buffer), Deadline, HandleClass, Tactic),
put = new TEvBlobStorage::TEvPut(item.BlobId, TRcBuf(item.Buffer), item.Deadline, HandleClass, Tactic),
0 /*flags*/,
item.Cookie,
nullptr /*forwardOnNondelivery*/,
Expand Down Expand Up @@ -511,7 +510,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
ev->RestartCounter, "DSProxy.Put", nullptr)
, PutImpl(info, state, ev, mon, enableRequestMod3x3ForMinLatecy, source, cookie, Span.GetTraceId())
, WaitingVDiskResponseCount(info->GetTotalVDisksNum())
, Deadline(ev->Deadline)
, HandleClass(ev->HandleClass)
, ReportedBytes(0)
, TimeStatsEnabled(timeStatsEnabled)
Expand Down Expand Up @@ -555,7 +553,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
, PutImpl(info, state, events, mon, handleClass, tactic, enableRequestMod3x3ForMinLatecy)
, WaitingVDiskResponseCount(info->GetTotalVDisksNum())
, IsManyPuts(true)
, Deadline(TInstant::Zero())
, HandleClass(handleClass)
, ReportedBytes(0)
, TimeStatsEnabled(timeStatsEnabled)
Expand All @@ -570,7 +567,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
Y_DEBUG_ABORT_UNLESS(events.size() <= MaxBatchedPutRequests);
for (auto &ev : events) {
auto& msg = *ev->Get();
Deadline = Max(Deadline, msg.Deadline);
if (msg.Orbit.HasShuttles()) {
RootCauseTrack.IsOn = true;
}
Expand Down Expand Up @@ -599,7 +595,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
<< " BlobIDs# " << BlobIdSequenceToString()
<< " HandleClass# " << NKikimrBlobStorage::EPutHandleClass_Name(HandleClass)
<< " Tactic# " << TEvBlobStorage::TEvPut::TacticName(Tactic)
<< " Deadline# " << Deadline
<< " RestartCounter# " << RestartCounter);

StartTime = TActivationContext::Monotonic();
Expand Down Expand Up @@ -652,19 +647,21 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
return true;
}

void Handle(TKikimrEvents::TEvWakeup::TPtr &ev) {
Y_UNUSED(ev);
void HandleWakeup() {
A_LOG_WARN_S("BPP14", "Wakeup "
<< " ActorId# " << SelfId()
<< " Group# " << Info->GroupID
<< " BlobIDs# " << BlobIdSequenceToString()
<< " Not answered in "
<< (TActivationContext::Monotonic() - StartTime) << " seconds");
if (TInstant::Now() > Deadline) {
ErrorReason = "Deadline exceeded";
ReplyAndDie(NKikimrProto::DEADLINE);
return;
const TInstant now = TActivationContext::Now();
TPutImpl::TPutResultVec putResults;
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
if (!PutImpl.Blobs[blobIdx].Replied && now > PutImpl.Blobs[blobIdx].Deadline) {
PutImpl.PrepareOneReply(NKikimrProto::DEADLINE, blobIdx, LogCtx, "Deadline timer hit", putResults);
}
}
ReplyAndDieWithLastResponse(putResults);
Schedule(TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup);
}

Expand Down Expand Up @@ -745,7 +742,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
hFunc(TEvBlobStorage::TEvVMultiPutResult, Handle);
hFunc(TEvAccelerate, Handle);
cFunc(TEvBlobStorage::EvResume, ResumeBootstrap);
hFunc(TKikimrEvents::TEvWakeup, Handle);
cFunc(TEvents::TSystem::Wakeup, HandleWakeup);

default:
Y_DEBUG_ABORT_UNLESS(false, "unexpected event Type# 0x%08" PRIx32, type);
Expand Down
4 changes: 1 addition & 3 deletions contrib/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,7 @@ ui64 TPutImpl::GetTimeToAccelerateNs(TLogContext &logCtx) {

TString TPutImpl::DumpFullState() const {
TStringStream str;
str << "{Deadline# " << Deadline;
str << Endl;
str << " Info# " << Info->ToString();
str << "{Info# " << Info->ToString();
str << Endl;
str << " Blackboard# " << Blackboard.ToString();
str << Endl;
Expand Down
26 changes: 15 additions & 11 deletions contrib/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class TPutImpl {
TBlobStorageGroupInfo::TServiceIds VDisksSvc;
TBlobStorageGroupInfo::TVDiskIds VDisksId;

TInstant Deadline;
const TIntrusivePtr<TBlobStorageGroupInfo> Info;

TBlackboard Blackboard;
Expand Down Expand Up @@ -56,10 +55,11 @@ class TPutImpl {
std::vector<std::pair<ui64, ui32>> ExtraBlockChecks;
NWilson::TSpan Span;
std::shared_ptr<TEvBlobStorage::TExecutionRelay> ExecutionRelay;
TInstant Deadline;

TBlobInfo(TLogoBlobID id, TRope&& buffer, TActorId recipient, ui64 cookie, NWilson::TTraceId traceId,
NLWTrace::TOrbit&& orbit, std::vector<std::pair<ui64, ui32>> extraBlockChecks, bool single,
std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay)
std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay, TInstant deadline)
: BlobId(id)
, Buffer(std::move(buffer))
, BufferSize(Buffer.size())
Expand All @@ -69,6 +69,7 @@ class TPutImpl {
, ExtraBlockChecks(std::move(extraBlockChecks))
, Span(single ? NWilson::TSpan() : NWilson::TSpan(TWilson::BlobStorage, std::move(traceId), "DSProxy.Put.Blob"))
, ExecutionRelay(std::move(executionRelay))
, Deadline(deadline)
{}

void Output(IOutputStream& s) const {
Expand Down Expand Up @@ -103,8 +104,7 @@ class TPutImpl {
TPutImpl(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state,
TEvBlobStorage::TEvPut *ev, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon,
bool enableRequestMod3x3ForMinLatecy, TActorId recipient, ui64 cookie, NWilson::TTraceId traceId)
: Deadline(ev->Deadline)
, Info(info)
: Info(info)
, Blackboard(info, state, ev->HandleClass, NKikimrBlobStorage::EGetHandleClass::AsyncRead)
, IsDone(1)
, WrittenBeyondBarrier(1)
Expand All @@ -116,7 +116,7 @@ class TPutImpl {
{
BlobMap.emplace(ev->Id, Blobs.size());
Blobs.emplace_back(ev->Id, TRope(ev->Buffer), recipient, cookie, std::move(traceId), std::move(ev->Orbit),
std::move(ev->ExtraBlockChecks), true, std::move(ev->ExecutionRelay));
std::move(ev->ExtraBlockChecks), true, std::move(ev->ExecutionRelay), ev->Deadline);

auto& blob = Blobs.back();
LWPROBE(DSProxyBlobPutTactics, blob.BlobId.TabletID(), Info->GroupID, blob.BlobId.ToString(), Tactic,
Expand All @@ -127,8 +127,7 @@ class TPutImpl {
TBatchedVec<TEvBlobStorage::TEvPut::TPtr> &events, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon,
NKikimrBlobStorage::EPutHandleClass putHandleClass, TEvBlobStorage::TEvPut::ETactic tactic,
bool enableRequestMod3x3ForMinLatecy)
: Deadline(TInstant::Zero())
, Info(info)
: Info(info)
, Blackboard(info, state, putHandleClass, NKikimrBlobStorage::EGetHandleClass::AsyncRead)
, IsDone(events.size())
, WrittenBeyondBarrier(events.size())
Expand All @@ -146,8 +145,8 @@ class TPutImpl {
Y_ABORT_UNLESS(msg.Tactic == tactic);
BlobMap.emplace(msg.Id, Blobs.size());
Blobs.emplace_back(msg.Id, TRope(msg.Buffer), ev->Sender, ev->Cookie, std::move(ev->TraceId),
std::move(msg.Orbit), std::move(msg.ExtraBlockChecks), false, std::move(msg.ExecutionRelay));
Deadline = Max(Deadline, msg.Deadline);
std::move(msg.Orbit), std::move(msg.ExtraBlockChecks), false, std::move(msg.ExecutionRelay),
msg.Deadline);

auto& blob = Blobs.back();
LWPROBE(DSProxyBlobPutTactics, blob.BlobId.TabletID(), Info->GroupID, blob.BlobId.ToString(), Tactic,
Expand Down Expand Up @@ -222,7 +221,7 @@ class TPutImpl {
if (std::next(it) == end) { // TEvVPut
auto [orderNumber, ptr] = *it++;
auto ev = std::make_unique<TEvBlobStorage::TEvVPut>(ptr->Id, ptr->Buffer, Info->GetVDiskId(orderNumber),
false, nullptr, Deadline, Blackboard.PutHandleClass);
false, nullptr, Blobs[ptr->BlobIdx].Deadline, Blackboard.PutHandleClass);

auto& record = ev->Record;
for (const auto& [tabletId, generation] : Blobs[ptr->BlobIdx].ExtraBlockChecks) {
Expand All @@ -235,7 +234,12 @@ class TPutImpl {
HandoffPartsSent += ptr->IsHandoff;
++VPutRequests;
} else { // TEvVMultiPut
auto ev = std::make_unique<TEvBlobStorage::TEvVMultiPut>(Info->GetVDiskId(it->first), Deadline,
TInstant deadline;
for (auto temp = it; temp != end; ++temp) {
auto [orderNumber, ptr] = *temp;
deadline = Max(deadline, Blobs[ptr->BlobIdx].Deadline);
}
auto ev = std::make_unique<TEvBlobStorage::TEvVMultiPut>(Info->GetVDiskId(it->first), deadline,
Blackboard.PutHandleClass, false);
while (it != end) {
auto [orderNumber, ptr] = *it++;
Expand Down
Loading

0 comments on commit 6d26041

Please sign in to comment.