Skip to content

Commit

Permalink
[BugFix] Fix stale mem flush not reduce load memory usage (backport #…
Browse files Browse the repository at this point in the history
…52613) (#52713)

Signed-off-by: meegoo <[email protected]>
Co-authored-by: meegoo <[email protected]>
(cherry picked from commit b57cc09)

# Conflicts:
#	test/sql/test_automatic_bucket/R/test_automatic_partition
#	test/sql/test_automatic_bucket/T/test_automatic_partition
  • Loading branch information
mergify[bot] committed Nov 8, 2024
1 parent a5f6da6 commit 5b4e0d7
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 3 deletions.
2 changes: 1 addition & 1 deletion be/src/storage/async_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ int AsyncDeltaWriter::_execute(void* meta, bthread::TaskIterator<AsyncDeltaWrite
<< " tablet_id: " << writer->tablet()->tablet_id() << ": " << st;
}
if (flush_after_write) {
auto st = writer->flush_memtable_async(false);
auto st = writer->manual_flush();
LOG_IF(WARNING, !st.ok()) << "Fail to flush. txn_id: " << writer->txn_id()
<< " tablet_id: " << writer->tablet()->tablet_id() << ": " << st;
}
Expand Down
5 changes: 5 additions & 0 deletions be/src/storage/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,11 @@ Status DeltaWriter::close() {
return Status::OK();
}

Status DeltaWriter::manual_flush() {
SCOPED_THREAD_LOCAL_MEM_SETTER(_mem_tracker, false);
return flush_memtable_async(false);
}

Status DeltaWriter::flush_memtable_async(bool eos) {
_last_write_ts = 0;
_write_buffer_size = 0;
Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ class DeltaWriter {

[[nodiscard]] Status flush_memtable_async(bool eos = false);

// Manual flush used by stale memtable flush
Status manual_flush();

// Rollback all writes and delete the Rowset created by 'commit()', if any.
// [thread-safe]
//
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/async_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ inline int AsyncDeltaWriterImpl::execute(void* meta, bthread::TaskIterator<Async
iter->cb(st);
}
if (flush_after_write) {
st = delta_writer->flush_async();
st = delta_writer->manual_flush();
LOG_IF(ERROR, !st.ok()) << "Fail to flush. tablet_id: " << delta_writer->tablet_id()
<< " txn_id: " << delta_writer->txn_id() << ": " << st;
}
Expand Down
12 changes: 12 additions & 0 deletions be/src/storage/lake/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ class DeltaWriterImpl {

[[nodiscard]] MemTracker* mem_tracker() { return _mem_tracker; }

Status manual_flush();

Status flush();

Status flush_async();
Expand Down Expand Up @@ -299,6 +301,11 @@ inline Status DeltaWriterImpl::init_tablet_schema() {
}
}

inline Status DeltaWriterImpl::manual_flush() {
SCOPED_THREAD_LOCAL_MEM_SETTER(_mem_tracker, false);
return flush_async();
}

inline Status DeltaWriterImpl::flush() {
RETURN_IF_ERROR(flush_async());
return _flush_token->wait();
Expand Down Expand Up @@ -657,6 +664,11 @@ MemTracker* DeltaWriter::mem_tracker() {
return _impl->mem_tracker();
}

Status DeltaWriter::manual_flush() {
DCHECK_EQ(0, bthread_self()) << "Should not invoke DeltaWriter::manual_flush() in a bthread";
return _impl->manual_flush();
}

Status DeltaWriter::flush() {
DCHECK_EQ(0, bthread_self()) << "Should not invoke DeltaWriter::flush() in a bthread";
return _impl->flush();
Expand Down
4 changes: 4 additions & 0 deletions be/src/storage/lake/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ class DeltaWriter {
// NOTE: Do NOT invoke this method in a bthread.
Status finish(FinishMode mode = kWriteTxnLog);

// Manual flush used by stale memtable flush
// different from `flush()`, this method will reduce memory usage in `mem_tracker`
Status manual_flush();

// Manual flush, mainly used in UT
// NOTE: Do NOT invoke this method in a bthread.
Status flush();
Expand Down
1 change: 0 additions & 1 deletion be/test/storage/lake/async_delta_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,5 +515,4 @@ TEST_F(LakeAsyncDeltaWriterTest, test_concurrent_write_and_close) {

SyncPoint::GetInstance()->DisableProcessing();
}

} // namespace starrocks::lake
103 changes: 103 additions & 0 deletions test/sql/test_automatic_bucket/R/test_automatic_partition
Original file line number Diff line number Diff line change
Expand Up @@ -493,3 +493,106 @@ delete from t where k = 1;
select * from t;
-- result:
-- !result
<<<<<<< HEAD
=======

-- name: test_mutable_bucket @sequential
create database ggg;
-- result:
-- !result
use ggg;
-- result:
-- !result
create table t(k date, v int) PARTITION BY DATE_TRUNC('DAY', `k`)
PROPERTIES (
"replication_num" = "1",
"bucket_size" = "1",
"mutable_bucket_num" = "2"
);
-- result:
-- !result
show create table t;
-- result:
t CREATE TABLE `t` (
`k` date NULL COMMENT "",
`v` int(11) NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`k`, `v`)
PARTITION BY date_trunc('DAY', `k`)
DISTRIBUTED BY RANDOM
PROPERTIES (
"bucket_size" = "1",
"compression" = "LZ4",
"fast_schema_evolution" = "true",
"mutable_bucket_num" = "2",
"replicated_storage" = "true",
"replication_num" = "1"
);
-- !result
select count(*) from information_schema.be_tablets t1, information_schema.tables_config t2 where TABLE_NAME='t' and t1.TABLE_ID=t2.TABLE_ID and TABLE_SCHEMA='ggg';
-- result:
2
-- !result
insert into t values('2021-01-01', 1);
-- result:
-- !result
select count(*) from information_schema.be_tablets t1, information_schema.tables_config t2 where TABLE_NAME='t' and t1.TABLE_ID=t2.TABLE_ID and TABLE_SCHEMA='ggg';
-- result:
4
-- !result
alter table t set('mutable_bucket_num'='3');
-- result:
-- !result
show create table t;
-- result:
t CREATE TABLE `t` (
`k` date NULL COMMENT "",
`v` int(11) NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`k`, `v`)
PARTITION BY date_trunc('DAY', `k`)
DISTRIBUTED BY RANDOM
PROPERTIES (
"bucket_size" = "1",
"compression" = "LZ4",
"fast_schema_evolution" = "true",
"mutable_bucket_num" = "3",
"replicated_storage" = "true",
"replication_num" = "1"
);
-- !result
insert into t values('2021-01-01', 1);
-- result:
-- !result
select count(*) from information_schema.be_tablets t1, information_schema.tables_config t2 where TABLE_NAME='t' and t1.TABLE_ID=t2.TABLE_ID and TABLE_SCHEMA='ggg';
-- result:
7
-- !result
select * from t;
-- result:
2021-01-01 1
2021-01-01 1
-- !result
alter table t set('mutable_bucket_num'='-1');
-- result:
E: (5064, 'Getting analyzing error. Detail message: Illegal mutable bucket num: -1.')
-- !result
alter table t set('mutable_bucket_num'='a');
-- result:
E: (5064, 'Getting analyzing error. Detail message: Mutable bucket num: For input string: "a".')
-- !result

-- name: test_stale_flush @sequential
create table t(k int)properties('bucket_size'='1');
-- result:
-- !result
update information_schema.be_configs set value=102400 where name='write_buffer_size';
-- result:
-- !result
insert into t select generate_series from TABLE(generate_series(1, 10000000));
-- result:
-- !result
update information_schema.be_configs set value=104857600 where name='write_buffer_size';
-- result:
-- !result
>>>>>>> b57cc09f6b ([BugFix] Fix stale mem flush not reduce load memory usage (backport #52613) (#52713))
32 changes: 32 additions & 0 deletions test/sql/test_automatic_bucket/T/test_automatic_partition
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,35 @@ select * from t;

delete from t where k = 1;
select * from t;
<<<<<<< HEAD
=======

-- name: test_mutable_bucket @sequential
create database ggg;
use ggg;
create table t(k date, v int) PARTITION BY DATE_TRUNC('DAY', `k`)
PROPERTIES (
"replication_num" = "1",
"bucket_size" = "1",
"mutable_bucket_num" = "2"
);
show create table t;
select count(*) from information_schema.be_tablets t1, information_schema.tables_config t2 where TABLE_NAME='t' and t1.TABLE_ID=t2.TABLE_ID and TABLE_SCHEMA='ggg';
insert into t values('2021-01-01', 1);
select count(*) from information_schema.be_tablets t1, information_schema.tables_config t2 where TABLE_NAME='t' and t1.TABLE_ID=t2.TABLE_ID and TABLE_SCHEMA='ggg';

alter table t set('mutable_bucket_num'='3');
show create table t;
insert into t values('2021-01-01', 1);
select count(*) from information_schema.be_tablets t1, information_schema.tables_config t2 where TABLE_NAME='t' and t1.TABLE_ID=t2.TABLE_ID and TABLE_SCHEMA='ggg';
select * from t;

alter table t set('mutable_bucket_num'='-1');
alter table t set('mutable_bucket_num'='a');

-- name: test_stale_flush @sequential
create table t(k int)properties('bucket_size'='1');
update information_schema.be_configs set value=102400 where name='write_buffer_size';
insert into t select generate_series from TABLE(generate_series(1, 10000000));
update information_schema.be_configs set value=104857600 where name='write_buffer_size';
>>>>>>> b57cc09f6b ([BugFix] Fix stale mem flush not reduce load memory usage (backport #52613) (#52713))

0 comments on commit 5b4e0d7

Please sign in to comment.