Skip to content

Commit

Permalink
support append left outer join append
Browse files Browse the repository at this point in the history
  • Loading branch information
yl-lisen committed Aug 13, 2024
1 parent 273900f commit 7c080f1
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 4 deletions.
7 changes: 4 additions & 3 deletions src/Interpreters/Streaming/HashJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ size_t insertFromBlockImpl(
const HashJoin::SupportMatrix HashJoin::support_matrix = {
/// <left_stroage_semantic, join_kind, join_strictness, right_storage_semantic> - supported
/// Append ...
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::Append}, true},
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::ChangelogKV}, true},
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::VersionedKV}, true},
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::Changelog}, true},
Expand Down Expand Up @@ -927,15 +928,15 @@ void HashJoin::init()

bidirectional_hash_join = !data_enrichment_join;

/// append-only inner join append-only on ... and date_diff_within(10s)
/// append-only inner/left join append-only on ... and date_diff_within(10s)
/// In case when emitChangeLog()
if (streaming_strictness == Strictness::Range
&& (left_data.join_stream_desc->data_stream_semantic != DataStreamSemantic::Append
|| right_data.join_stream_desc->data_stream_semantic != DataStreamSemantic::Append
|| streaming_kind != Kind::Inner))
|| (streaming_kind != Kind::Inner && streaming_kind != Kind::Left)))
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Only inner range join is supported and the left and right stream must be append-only streams in range join");
"Only inner/left range join is supported and the left and right stream must be append-only streams in range join");

range_bidirectional_hash_join = bidirectional_hash_join && (streaming_strictness == Strictness::Range);

Expand Down
114 changes: 113 additions & 1 deletion src/Interpreters/Streaming/tests/gtest_streaming_hash_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ TEST(StreamingHashJoin, SimpleJoinTests)
context);

/// Additional range between
if (Streaming::isAppendStorage(left_data_stream_semantic) && kind == JoinKind::Inner && strictness == JoinStrictness::All
if (Streaming::isAppendStorage(left_data_stream_semantic) && (kind == JoinKind::Inner || kind == JoinKind::Left) && strictness == JoinStrictness::All
&& Streaming::isAppendStorage(right_data_stream_semantic))
{
commonTest(
Expand All @@ -826,6 +826,118 @@ TEST(StreamingHashJoin, SimpleJoinTests)
}
}

TEST(StreamingHashJoin, AppendLeftAllJoinAppend)
{
auto context = getContext().context;
Block left_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);
Block right_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);

/// stream(t1) left all join stream(t2) on t1.col_1 = t2.col_1
commonTest(
"left",
"all",
/*on_clause*/ "t1.col_1 = t2.col_1",
left_header,
Streaming::StorageSemantic::Append,
/*left_primary_key_column_indexes*/ std::nullopt,
right_header,
Streaming::StorageSemantic::Append,
/*right_primary_key_column_indexes*/ std::nullopt,
/*to_join_steps*/
{
{
/*to join pos*/ ToJoinStep::RIGHT,
/*to join block*/ prepareBlockByHeader(right_header, "(1, '2023-1-1 00:00:00')", context),
/*expected join results*/ ExpectedJoinResults{},
},
{
/*to join pos*/ ToJoinStep::LEFT,
/*to join block*/ prepareBlockByHeader(left_header, "(1, '2023-1-1 00:00:00')(2, '2023-1-1 00:00:00')", context),
/*expected join results*/
ExpectedJoinResults{
/// output header: col_1, col_2, t2.col_2
.values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:00')"
"(2, '2023-1-1 00:00:00', '1970-1-1 00:00:00')",
},
},
},
context);
}

TEST(StreamingHashJoin, AppendLeftRangeJoinAppend)
{
auto context = getContext().context;
Block left_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);
Block right_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);

commonTest(
"left",
"all",
/*on_clause*/ "t1.col_1 = t2.col_1 and date_diff_within(2s, t1.col_2, t2.col_2)",
left_header,
Streaming::StorageSemantic::Append,
/*left_primary_key_column_indexes*/ std::nullopt,
right_header,
Streaming::StorageSemantic::Append,
/*right_primary_key_column_indexes*/ std::nullopt,
/*to_join_steps*/
{
{
/*to join pos*/ ToJoinStep::RIGHT,
/*to join block*/ prepareBlockByHeader(right_header, "(1, '2023-1-1 00:00:00')(1, '2023-1-1 00:00:01')", context),
/*expected join results*/ ExpectedJoinResults{},
},
{
/*to join pos*/ ToJoinStep::LEFT,
/*to join block*/ prepareBlockByHeader(left_header, "(1, '2023-1-1 00:00:00')(2, '2023-1-1 00:00:00')", context),
/*expected join results*/
ExpectedJoinResults{
/// output header: col_1, col_2, t2.col_2
.values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:00')(1, '2023-1-1 00:00:00', '2023-1-1 00:00:01')"
"(2, '2023-1-1 00:00:00', '1970-1-1 00:00:00')",
},
},
{
/*to join pos*/ ToJoinStep::RIGHT,
/*to join block*/ prepareBlockByHeader(right_header, "(1, '2023-1-1 00:00:02')(1, '2023-1-1 00:00:03')(2, '2023-1-1 00:00:02')(2, '2023-1-1 00:00:03')", context),
/*expected join results*/
ExpectedJoinResults{
/// output header: col_1, col_2, t2.col_2
.values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:02')"
"(2, '2023-1-1 00:00:00', '2023-1-1 00:00:02')",
},
},
{
/*to join pos*/ ToJoinStep::LEFT,
/*to join block*/ prepareBlockByHeader(left_header, "(1, '2023-1-1 00:00:00')(1, '2023-1-1 00:00:02')", context),
/*expected join results*/
ExpectedJoinResults{
/// output header: col_1, col_2, t2.col_2
.values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:00')"
"(1, '2023-1-1 00:00:00', '2023-1-1 00:00:01')"
"(1, '2023-1-1 00:00:00', '2023-1-1 00:00:02')"
"(1, '2023-1-1 00:00:02', '2023-1-1 00:00:00')"
"(1, '2023-1-1 00:00:02', '2023-1-1 00:00:01')"
"(1, '2023-1-1 00:00:02', '2023-1-1 00:00:02')"
"(1, '2023-1-1 00:00:02', '2023-1-1 00:00:03')",
},
},
{
/*to join pos*/ ToJoinStep::LEFT,
/*to join block*/ prepareBlockByHeader(left_header, "(2, '2023-1-1 00:00:00')(2, '2023-1-1 00:00:02')(3, '2023-1-1 00:00:03')", context),
/*expected join results*/
ExpectedJoinResults{
/// output header: col_1, col_2, t2.col_2
.values = "(2, '2023-1-1 00:00:00', '2023-1-1 00:00:02')"
"(2, '2023-1-1 00:00:02', '2023-1-1 00:00:02')"
"(2, '2023-1-1 00:00:02', '2023-1-1 00:00:03')"
"(3, '2023-1-1 00:00:03', '1970-1-1 00:00:00')",
},
},
},
context);
}

TEST(StreamingHashJoin, AppendLeftAsofJoinAppend)
{
auto context = getContext().context;
Expand Down

0 comments on commit 7c080f1

Please sign in to comment.