Skip to content

Commit

Permalink
[BugFix] Fix stale memtable flush not reducing load memory usage
Browse files Browse the repository at this point in the history
Signed-off-by: meegoo <[email protected]>
  • Loading branch information
meegoo committed Nov 7, 2024
1 parent 868f415 commit 8b467dc
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 2 deletions.
2 changes: 1 addition & 1 deletion be/src/storage/async_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ int AsyncDeltaWriter::_execute(void* meta, bthread::TaskIterator<AsyncDeltaWrite
<< " tablet_id: " << writer->tablet()->tablet_id() << ": " << st;
}
if (flush_after_write) {
auto st = writer->flush_memtable_async(false);
auto st = writer->manual_flush();
LOG_IF(WARNING, !st.ok()) << "Fail to flush. txn_id: " << writer->txn_id()
<< " tablet_id: " << writer->tablet()->tablet_id() << ": " << st;
}
Expand Down
5 changes: 5 additions & 0 deletions be/src/storage/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,11 @@ Status DeltaWriter::close() {
return Status::OK();
}

// Flush entry point used by the stale-memtable flush path.
// Applies SCOPED_THREAD_LOCAL_MEM_SETTER to `_mem_tracker` for the remainder of
// this scope and then delegates to flush_memtable_async() with eos = false.
// NOTE(review): presumably the macro installs `_mem_tracker` as the current
// thread's tracker (second argument = limit check off) -- confirm at its definition.
Status DeltaWriter::manual_flush() {
    SCOPED_THREAD_LOCAL_MEM_SETTER(_mem_tracker, false);
    Status flush_status = flush_memtable_async(/*eos=*/false);
    return flush_status;
}

Status DeltaWriter::flush_memtable_async(bool eos) {
_last_write_ts = 0;
_write_buffer_size = 0;
Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ class DeltaWriter {
// [NOT thread-safe]
Status commit();

// Manual flush used by stale memtable flush
Status manual_flush();

Status flush_memtable_async(bool eos = false);

// Rollback all writes and delete the Rowset created by 'commit()', if any.
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/async_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ inline int AsyncDeltaWriterImpl::execute(void* meta, bthread::TaskIterator<Async
case kFlushTask: {
auto flush_task = std::static_pointer_cast<FlushTask>(task_ptr);
if (st.ok()) {
st.update(delta_writer->flush());
st.update(delta_writer->manual_flush());
}
flush_task->cb(st);
break;
Expand Down
12 changes: 12 additions & 0 deletions be/src/storage/lake/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ class DeltaWriterImpl {

[[nodiscard]] MemTracker* mem_tracker() { return _mem_tracker; }

Status manual_flush();

Status flush();

Status flush_async();
Expand Down Expand Up @@ -307,6 +309,11 @@ inline Status DeltaWriterImpl::init_tablet_schema() {
}
}

// Flush entry point used by the stale-memtable flush path.
// Applies SCOPED_THREAD_LOCAL_MEM_SETTER to `_mem_tracker` for the remainder of
// this scope and then hands off to flush_async(); unlike flush(), it does not
// wait on `_flush_token`.
// NOTE(review): presumably the macro installs `_mem_tracker` as the current
// thread's tracker (second argument = limit check off) -- confirm at its definition.
inline Status DeltaWriterImpl::manual_flush() {
    SCOPED_THREAD_LOCAL_MEM_SETTER(_mem_tracker, false);
    Status submit_status = flush_async();
    return submit_status;
}

inline Status DeltaWriterImpl::flush() {
RETURN_IF_ERROR(flush_async());
return _flush_token->wait();
Expand Down Expand Up @@ -699,6 +706,11 @@ MemTracker* DeltaWriter::mem_tracker() {
return _impl->mem_tracker();
}

// Public wrapper that forwards to DeltaWriterImpl::manual_flush().
// Debug builds assert that the caller is not running inside a bthread
// (bthread_self() must be 0).
Status DeltaWriter::manual_flush() {
    DCHECK_EQ(0, bthread_self()) << "Should not invoke DeltaWriter::manual_flush() in a bthread";
    Status impl_status = _impl->manual_flush();
    return impl_status;
}

Status DeltaWriter::flush() {
DCHECK_EQ(0, bthread_self()) << "Should not invoke DeltaWriter::flush() in a bthread";
return _impl->flush();
Expand Down
4 changes: 4 additions & 0 deletions be/src/storage/lake/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ class DeltaWriter {
// NOTE: Do NOT invoke this method in a bthread.
Status finish();

// Manual flush used by the stale memtable flush.
// Unlike `flush()`, this method will reduce memory usage in `mem_tracker`.
// NOTE: Do NOT invoke this method in a bthread.
Status manual_flush();

// Manual flush, mainly used in UT
// NOTE: Do NOT invoke this method in a bthread.
Status flush();
Expand Down
3 changes: 3 additions & 0 deletions be/test/storage/lake/async_delta_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,9 @@ TEST_F(LakeAsyncDeltaWriterTest, test_flush) {
latch.count_down();
});
latch.wait();
while (delta_writer->queueing_memtable_num() > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
EXPECT_EQ(0, delta_writer->queueing_memtable_num());

// test flush after close
Expand Down
67 changes: 67 additions & 0 deletions test/sql/test_automatic_bucket/R/test_automatic_partition
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ PROPERTIES (








-- name: test_invalid_bucket_size
create table t0(k int) properties('bucket_size'='0');
-- result:
Expand All @@ -62,6 +67,11 @@ alter table t set('bucket_size'='2048');








-- name: test_automatic_bucket
create database kkk;
-- result:
Expand Down Expand Up @@ -96,6 +106,11 @@ alter table t set('bucket_size'='1');








-- name: test_range_partition @sequential
create database ttt;
-- result:
Expand Down Expand Up @@ -157,6 +172,11 @@ select * from t;








-- name: test_list_partition @sequential
create database ddd;
-- result:
Expand Down Expand Up @@ -218,6 +238,11 @@ select * from t;








-- name: test_expr_partition @sequential
create database eee;
-- result:
Expand Down Expand Up @@ -274,6 +299,11 @@ select * from t;








-- name: test_schema_change @sequential
create table t(k date, v int) DUPLICATE KEY(k) PARTITION BY DATE_TRUNC('DAY', `k`)
PROPERTIES (
Expand Down Expand Up @@ -380,6 +410,11 @@ select * from t;








-- name: test_mv @sequential
create table t(k date, v int, v1 int) DUPLICATE KEY(k) PARTITION BY DATE_TRUNC('DAY', `k`)
PROPERTIES (
Expand Down Expand Up @@ -464,6 +499,11 @@ select k, v1 from t;








-- name: test_delete
create table t(k int, v int)
PROPERTIES (
Expand Down Expand Up @@ -504,6 +544,11 @@ select * from t;








-- name: test_mutable_bucket @sequential
create database ggg;
-- result:
Expand Down Expand Up @@ -589,6 +634,11 @@ alter table t set('mutable_bucket_num'='a');
-- result:
E: (1064, 'Getting analyzing error. Detail message: Mutable bucket num: For input string: "a".')
-- !result





-- name: test_tablet_check
create table t(k int)properties('bucket_size'='1');
-- result:
Expand Down Expand Up @@ -617,4 +667,21 @@ select * from t;
-- !result
admin set frontend config('consistency_tablet_meta_check_interval_ms'='7200000');
-- result:
-- !result




-- name: test_stale_flush @sequential
create table t(k int)properties('bucket_size'='1');
-- result:
-- !result
update information_schema.be_configs set value=102400 where name='write_buffer_size';
-- result:
-- !result
insert into t select generate_series from TABLE(generate_series(1, 10000000));
-- result:
-- !result
update information_schema.be_configs set value=104857600 where name='write_buffer_size';
-- result:
-- !result
6 changes: 6 additions & 0 deletions test/sql/test_automatic_bucket/T/test_automatic_partition
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,9 @@ admin set frontend config('consistency_tablet_meta_check_interval_ms'='100');
select sleep(5);
select * from t;
admin set frontend config('consistency_tablet_meta_check_interval_ms'='7200000');

-- name: test_stale_flush @sequential
create table t(k int)properties('bucket_size'='1');
update information_schema.be_configs set value=102400 where name='write_buffer_size';
insert into t select generate_series from TABLE(generate_series(1, 10000000));
update information_schema.be_configs set value=104857600 where name='write_buffer_size';

0 comments on commit 8b467dc

Please sign in to comment.