diff --git a/be/src/storage/async_delta_writer.cpp b/be/src/storage/async_delta_writer.cpp index 3e598a4589a1f..da4d732f35199 100644 --- a/be/src/storage/async_delta_writer.cpp +++ b/be/src/storage/async_delta_writer.cpp @@ -84,7 +84,7 @@ int AsyncDeltaWriter::_execute(void* meta, bthread::TaskIteratortablet()->tablet_id() << ": " << st; } if (flush_after_write) { - auto st = writer->flush_memtable_async(false); + auto st = writer->manual_flush(); LOG_IF(WARNING, !st.ok()) << "Fail to flush. txn_id: " << writer->txn_id() << " tablet_id: " << writer->tablet()->tablet_id() << ": " << st; } diff --git a/be/src/storage/delta_writer.cpp b/be/src/storage/delta_writer.cpp index 9fb99fb2347e2..206fd653d68b1 100644 --- a/be/src/storage/delta_writer.cpp +++ b/be/src/storage/delta_writer.cpp @@ -533,6 +533,11 @@ Status DeltaWriter::close() { return Status::OK(); } +Status DeltaWriter::manual_flush() { + SCOPED_THREAD_LOCAL_MEM_SETTER(_mem_tracker, false); + return flush_memtable_async(false); +} + Status DeltaWriter::flush_memtable_async(bool eos) { _last_write_ts = 0; _write_buffer_size = 0; diff --git a/be/src/storage/delta_writer.h b/be/src/storage/delta_writer.h index 0e889592647f6..b0c1492f81ce0 100644 --- a/be/src/storage/delta_writer.h +++ b/be/src/storage/delta_writer.h @@ -110,6 +110,9 @@ class DeltaWriter { // [NOT thread-safe] Status commit(); + // Manual flush used by stale memtable flush + Status manual_flush(); + Status flush_memtable_async(bool eos = false); // Rollback all writes and delete the Rowset created by 'commit()', if any. diff --git a/be/src/storage/lake/async_delta_writer.cpp b/be/src/storage/lake/async_delta_writer.cpp index 09980f0ac4b6c..3a40019c4deee 100644 --- a/be/src/storage/lake/async_delta_writer.cpp +++ b/be/src/storage/lake/async_delta_writer.cpp @@ -163,7 +163,7 @@ inline int AsyncDeltaWriterImpl::execute(void* meta, bthread::TaskIterator(task_ptr); if (st.ok()) { - st.update(delta_writer->flush()); + st.update(delta_writer->manual_flush()); } flush_task->cb(st); break; diff --git a/be/src/storage/lake/delta_writer.cpp b/be/src/storage/lake/delta_writer.cpp index b451d8e3beb62..63b267a8114c6 100644 --- a/be/src/storage/lake/delta_writer.cpp +++ b/be/src/storage/lake/delta_writer.cpp @@ -115,6 +115,8 @@ class DeltaWriterImpl { [[nodiscard]] MemTracker* mem_tracker() { return _mem_tracker; } + Status manual_flush(); + Status flush(); Status flush_async(); @@ -307,6 +309,11 @@ inline Status DeltaWriterImpl::init_tablet_schema() { } } +inline Status DeltaWriterImpl::manual_flush() { + SCOPED_THREAD_LOCAL_MEM_SETTER(_mem_tracker, false); + return flush_async(); +} + inline Status DeltaWriterImpl::flush() { RETURN_IF_ERROR(flush_async()); return _flush_token->wait(); @@ -699,6 +706,11 @@ MemTracker* DeltaWriter::mem_tracker() { return _impl->mem_tracker(); } +Status DeltaWriter::manual_flush() { + DCHECK_EQ(0, bthread_self()) << "Should not invoke DeltaWriter::manual_flush() in a bthread"; + return _impl->manual_flush(); +} + Status DeltaWriter::flush() { DCHECK_EQ(0, bthread_self()) << "Should not invoke DeltaWriter::flush() in a bthread"; return _impl->flush(); diff --git a/be/src/storage/lake/delta_writer.h b/be/src/storage/lake/delta_writer.h index 26d1d833c89a8..22d7e0ae0c9fd 100644 --- a/be/src/storage/lake/delta_writer.h +++ b/be/src/storage/lake/delta_writer.h @@ -65,6 +65,10 @@ class DeltaWriter { // NOTE: Do NOT invoke this method in a bthread. Status finish(); + // Manual flush used by stale memtable flush + // different from `flush()`, this method will reduce memory usage in `mem_tracker` + Status manual_flush(); + // Manual flush, mainly used in UT // NOTE: Do NOT invoke this method in a bthread. Status flush(); diff --git a/be/test/storage/lake/async_delta_writer_test.cpp b/be/test/storage/lake/async_delta_writer_test.cpp index 4d059fb4ccf44..43dcea08484c1 100644 --- a/be/test/storage/lake/async_delta_writer_test.cpp +++ b/be/test/storage/lake/async_delta_writer_test.cpp @@ -543,6 +543,9 @@ TEST_F(LakeAsyncDeltaWriterTest, test_flush) { latch.count_down(); }); latch.wait(); + while (delta_writer->queueing_memtable_num() > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } EXPECT_EQ(0, delta_writer->queueing_memtable_num()); // test flush after close diff --git a/test/sql/test_automatic_bucket/R/test_automatic_partition b/test/sql/test_automatic_bucket/R/test_automatic_partition index 830eb105ac12b..b2b7ddbbf247c 100644 --- a/test/sql/test_automatic_bucket/R/test_automatic_partition +++ b/test/sql/test_automatic_bucket/R/test_automatic_partition @@ -38,6 +38,11 @@ PROPERTIES ( + + + + + -- name: test_invalid_bucket_size create table t0(k int) properties('bucket_size'='0'); -- result: @@ -62,6 +67,11 @@ alter table t set('bucket_size'='2048'); + + + + + -- name: test_automatic_bucket create database kkk; -- result: @@ -96,6 +106,11 @@ alter table t set('bucket_size'='1'); + + + + + -- name: test_range_partition @sequential create database ttt; -- result: @@ -157,6 +172,11 @@ select * from t; + + + + + -- name: test_list_partition @sequential create database ddd; -- result: @@ -218,6 +238,11 @@ select * from t; + + + + + -- name: test_expr_partition @sequential create database eee; -- result: @@ -274,6 +299,11 @@ select * from t; + + + + + -- name: test_schema_change @sequential create table t(k date, v int) DUPLICATE KEY(k) PARTITION BY DATE_TRUNC('DAY', `k`) PROPERTIES ( @@ -380,6 +410,11 @@ select * from t; + + + + + -- name: test_mv @sequential create table t(k date, v int, v1 int) DUPLICATE KEY(k) PARTITION BY DATE_TRUNC('DAY', `k`) PROPERTIES ( @@ -464,6 +499,11 @@ select k, v1 from t; + + + + + -- name: test_delete create table t(k int, v int) PROPERTIES ( @@ -504,6 +544,11 @@ select * from t; + + + + + -- name: test_mutable_bucket @sequential create database ggg; -- result: @@ -589,6 +634,11 @@ alter table t set('mutable_bucket_num'='a'); -- result: E: (1064, 'Getting analyzing error. Detail message: Mutable bucket num: For input string: "a".') -- !result + + + + + -- name: test_tablet_check create table t(k int)properties('bucket_size'='1'); -- result: @@ -617,4 +667,21 @@ select * from t; -- !result admin set frontend config('consistency_tablet_meta_check_interval_ms'='7200000'); -- result: +-- !result + + + + +-- name: test_stale_flush @sequential +create table t(k int)properties('bucket_size'='1'); +-- result: +-- !result +update information_schema.be_configs set value=102400 where name='write_buffer_size'; +-- result: +-- !result +insert into t select generate_series from TABLE(generate_series(1, 10000000)); +-- result: +-- !result +update information_schema.be_configs set value=104857600 where name='write_buffer_size'; +-- result: -- !result \ No newline at end of file diff --git a/test/sql/test_automatic_bucket/T/test_automatic_partition b/test/sql/test_automatic_bucket/T/test_automatic_partition index 79fd0eb7c1238..7062da9b78528 100644 --- a/test/sql/test_automatic_bucket/T/test_automatic_partition +++ b/test/sql/test_automatic_bucket/T/test_automatic_partition @@ -193,3 +193,9 @@ admin set frontend config('consistency_tablet_meta_check_interval_ms'='100'); select sleep(5); select * from t; admin set frontend config('consistency_tablet_meta_check_interval_ms'='7200000'); + +-- name: test_stale_flush @sequential +create table t(k int)properties('bucket_size'='1'); +update information_schema.be_configs set value=102400 where name='write_buffer_size'; +insert into t select generate_series from TABLE(generate_series(1, 10000000)); +update information_schema.be_configs set value=104857600 where name='write_buffer_size';