Skip to content

Commit

Permalink
Fix StreamLoadPipe constructor
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Nov 4, 2024
1 parent 85bf341 commit 3d5e0bc
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 7 deletions.
9 changes: 3 additions & 6 deletions be/src/runtime/stream_load/stream_load_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,17 @@ namespace starrocks {

static constexpr size_t DEFAULT_STREAM_LOAD_PIPE_BUFFERED_BYTES = 1024 * 1024;
static constexpr size_t DEFAULT_STREAM_LOAD_PIPE_CHUNK_SIZE = 64 * 1024;
static constexpr int32_t DEFAULT_STREAM_LOAD_PIPE_NON_BLOCKING_WAIT_MS = 50;

// StreamLoadPipe use to transfer data from producer to consumer
// Data in pip is stored in chunks.
class StreamLoadPipe : public MessageBodySink {
public:
StreamLoadPipe(size_t max_buffered_bytes = DEFAULT_STREAM_LOAD_PIPE_BUFFERED_BYTES,
size_t min_chunk_size = DEFAULT_STREAM_LOAD_PIPE_CHUNK_SIZE)
: StreamLoadPipe(false, DEFAULT_STREAM_LOAD_PIPE_NON_BLOCKING_WAIT_MS, max_buffered_bytes, min_chunk_size) {
}
: StreamLoadPipe(false, -1, max_buffered_bytes, min_chunk_size) {}

StreamLoadPipe(bool non_blocking_read, int32_t non_blocking_wait_ms = DEFAULT_STREAM_LOAD_PIPE_NON_BLOCKING_WAIT_MS,
size_t max_buffered_bytes = DEFAULT_STREAM_LOAD_PIPE_BUFFERED_BYTES,
size_t min_chunk_size = DEFAULT_STREAM_LOAD_PIPE_CHUNK_SIZE)
StreamLoadPipe(bool non_blocking_read, int32_t non_blocking_wait_ms, size_t max_buffered_bytes,
size_t min_chunk_size)
: _non_blocking_read(non_blocking_read),
_non_blocking_wait_ms(non_blocking_wait_ms),
_max_buffered_bytes(max_buffered_bytes),
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/stream_load/time_bounded_stream_load_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

namespace starrocks {

static constexpr int32_t DEFAULT_STREAM_LOAD_PIPE_NON_BLOCKING_WAIT_MS = 50;

class TimeBoundedStreamLoadPipe : public StreamLoadPipe {
public:
TimeBoundedStreamLoadPipe(int active_time_ms,
Expand Down
2 changes: 1 addition & 1 deletion be/test/runtime/stream_load_pipe_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ PARALLEL_TEST(StreamLoadPipeTest, cancel_with_ok_status) {
}

PARALLEL_TEST(StreamLoadPipeTest, non_blocking_read) {
StreamLoadPipe pipe(true, 50);
StreamLoadPipe pipe(true, 50, 1000, 1000);

ASSERT_TRUE(pipe.read().status().is_time_out());

Expand Down

0 comments on commit 3d5e0bc

Please sign in to comment.