Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix stale mem flush not reduce load memory usage (backport #52613) (backport #52713) #52730

Merged
merged 3 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/storage/async_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ int AsyncDeltaWriter::_execute(void* meta, bthread::TaskIterator<AsyncDeltaWrite
<< " tablet_id: " << writer->tablet()->tablet_id() << ": " << st;
}
if (flush_after_write) {
auto st = writer->flush_memtable_async(false);
auto st = writer->manual_flush();
LOG_IF(WARNING, !st.ok()) << "Fail to flush. txn_id: " << writer->txn_id()
<< " tablet_id: " << writer->tablet()->tablet_id() << ": " << st;
}
Expand Down
5 changes: 5 additions & 0 deletions be/src/storage/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,11 @@ Status DeltaWriter::close() {
return Status::OK();
}

Status DeltaWriter::manual_flush() {
SCOPED_THREAD_LOCAL_MEM_SETTER(_mem_tracker, false);
return flush_memtable_async(false);
}

Status DeltaWriter::flush_memtable_async(bool eos) {
_last_write_ts = 0;
_write_buffer_size = 0;
Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ class DeltaWriter {

[[nodiscard]] Status flush_memtable_async(bool eos = false);

// Manual flush used by stale memtable flush
Status manual_flush();

// Rollback all writes and delete the Rowset created by 'commit()', if any.
// [thread-safe]
//
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/async_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ inline int AsyncDeltaWriterImpl::execute(void* meta, bthread::TaskIterator<Async
iter->cb(st);
}
if (flush_after_write) {
st = delta_writer->flush_async();
st = delta_writer->manual_flush();
LOG_IF(ERROR, !st.ok()) << "Fail to flush. tablet_id: " << delta_writer->tablet_id()
<< " txn_id: " << delta_writer->txn_id() << ": " << st;
}
Expand Down
12 changes: 12 additions & 0 deletions be/src/storage/lake/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ class DeltaWriterImpl {

[[nodiscard]] MemTracker* mem_tracker() { return _mem_tracker; }

Status manual_flush();

Status flush();

Status flush_async();
Expand Down Expand Up @@ -299,6 +301,11 @@ inline Status DeltaWriterImpl::init_tablet_schema() {
}
}

inline Status DeltaWriterImpl::manual_flush() {
SCOPED_THREAD_LOCAL_MEM_SETTER(_mem_tracker, false);
return flush_async();
}

inline Status DeltaWriterImpl::flush() {
RETURN_IF_ERROR(flush_async());
return _flush_token->wait();
Expand Down Expand Up @@ -657,6 +664,11 @@ MemTracker* DeltaWriter::mem_tracker() {
return _impl->mem_tracker();
}

Status DeltaWriter::manual_flush() {
DCHECK_EQ(0, bthread_self()) << "Should not invoke DeltaWriter::manual_flush() in a bthread";
return _impl->manual_flush();
}

Status DeltaWriter::flush() {
DCHECK_EQ(0, bthread_self()) << "Should not invoke DeltaWriter::flush() in a bthread";
return _impl->flush();
Expand Down
4 changes: 4 additions & 0 deletions be/src/storage/lake/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ class DeltaWriter {
// NOTE: Do NOT invoke this method in a bthread.
Status finish(FinishMode mode = kWriteTxnLog);

// Manual flush used by stale memtable flush
// different from `flush()`, this method will reduce memory usage in `mem_tracker`
Status manual_flush();

// Manual flush, mainly used in UT
// NOTE: Do NOT invoke this method in a bthread.
Status flush();
Expand Down
1 change: 0 additions & 1 deletion be/test/storage/lake/async_delta_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,5 +515,4 @@ TEST_F(LakeAsyncDeltaWriterTest, test_concurrent_write_and_close) {

SyncPoint::GetInstance()->DisableProcessing();
}

} // namespace starrocks::lake
Loading