From 7c080f189fd1e116c3ab337d6b9b020d0e73247f Mon Sep 17 00:00:00 2001
From: Lisen
Date: Tue, 13 Aug 2024 14:01:41 +0800
Subject: [PATCH] support append left outer join append
---
src/Interpreters/Streaming/HashJoin.cpp | 7 +-
.../tests/gtest_streaming_hash_join.cpp | 114 +++++++++++++++++-
2 files changed, 117 insertions(+), 4 deletions(-)
diff --git a/src/Interpreters/Streaming/HashJoin.cpp b/src/Interpreters/Streaming/HashJoin.cpp
index 9d69651f7e..66956bf534 100644
--- a/src/Interpreters/Streaming/HashJoin.cpp
+++ b/src/Interpreters/Streaming/HashJoin.cpp
@@ -785,6 +785,7 @@ size_t insertFromBlockImpl(
const HashJoin::SupportMatrix HashJoin::support_matrix = {
/// - supported
/// Append ...
+ {{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::Append}, true},
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::ChangelogKV}, true},
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::VersionedKV}, true},
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::Changelog}, true},
@@ -927,15 +928,15 @@ void HashJoin::init()
bidirectional_hash_join = !data_enrichment_join;
- /// append-only inner join append-only on ... and date_diff_within(10s)
+ /// append-only inner/left join append-only on ... and date_diff_within(10s)
/// In case when emitChangeLog()
if (streaming_strictness == Strictness::Range
&& (left_data.join_stream_desc->data_stream_semantic != DataStreamSemantic::Append
|| right_data.join_stream_desc->data_stream_semantic != DataStreamSemantic::Append
- || streaming_kind != Kind::Inner))
+ || (streaming_kind != Kind::Inner && streaming_kind != Kind::Left)))
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
- "Only inner range join is supported and the left and right stream must be append-only streams in range join");
+ "Only inner/left range join is supported and the left and right stream must be append-only streams in range join");
range_bidirectional_hash_join = bidirectional_hash_join && (streaming_strictness == Strictness::Range);
diff --git a/src/Interpreters/Streaming/tests/gtest_streaming_hash_join.cpp b/src/Interpreters/Streaming/tests/gtest_streaming_hash_join.cpp
index 6191e1b6c4..2c30361fcc 100644
--- a/src/Interpreters/Streaming/tests/gtest_streaming_hash_join.cpp
+++ b/src/Interpreters/Streaming/tests/gtest_streaming_hash_join.cpp
@@ -806,7 +806,7 @@ TEST(StreamingHashJoin, SimpleJoinTests)
context);
/// Additional range between
- if (Streaming::isAppendStorage(left_data_stream_semantic) && kind == JoinKind::Inner && strictness == JoinStrictness::All
+ if (Streaming::isAppendStorage(left_data_stream_semantic) && (kind == JoinKind::Inner || kind == JoinKind::Left) && strictness == JoinStrictness::All
&& Streaming::isAppendStorage(right_data_stream_semantic))
{
commonTest(
@@ -826,6 +826,118 @@ TEST(StreamingHashJoin, SimpleJoinTests)
}
}
+TEST(StreamingHashJoin, AppendLeftAllJoinAppend)
+{
+ auto context = getContext().context;
+ Block left_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);
+ Block right_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);
+
+ /// stream(t1) left all join stream(t2) on t1.col_1 = t2.col_1
+ commonTest(
+ "left",
+ "all",
+ /*on_clause*/ "t1.col_1 = t2.col_1",
+ left_header,
+ Streaming::StorageSemantic::Append,
+ /*left_primary_key_column_indexes*/ std::nullopt,
+ right_header,
+ Streaming::StorageSemantic::Append,
+ /*right_primary_key_column_indexes*/ std::nullopt,
+ /*to_join_steps*/
+ {
+ {
+ /*to join pos*/ ToJoinStep::RIGHT,
+ /*to join block*/ prepareBlockByHeader(right_header, "(1, '2023-1-1 00:00:00')", context),
+ /*expected join results*/ ExpectedJoinResults{},
+ },
+ {
+ /*to join pos*/ ToJoinStep::LEFT,
+ /*to join block*/ prepareBlockByHeader(left_header, "(1, '2023-1-1 00:00:00')(2, '2023-1-1 00:00:00')", context),
+ /*expected join results*/
+ ExpectedJoinResults{
+ /// output header: col_1, col_2, t2.col_2
+ .values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:00')"
+ "(2, '2023-1-1 00:00:00', '1970-1-1 00:00:00')",
+ },
+ },
+ },
+ context);
+}
+
+TEST(StreamingHashJoin, AppendLeftRangeJoinAppend)
+{
+ auto context = getContext().context;
+ Block left_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);
+ Block right_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);
+
+ commonTest(
+ "left",
+ "all",
+ /*on_clause*/ "t1.col_1 = t2.col_1 and date_diff_within(2s, t1.col_2, t2.col_2)",
+ left_header,
+ Streaming::StorageSemantic::Append,
+ /*left_primary_key_column_indexes*/ std::nullopt,
+ right_header,
+ Streaming::StorageSemantic::Append,
+ /*right_primary_key_column_indexes*/ std::nullopt,
+ /*to_join_steps*/
+ {
+ {
+ /*to join pos*/ ToJoinStep::RIGHT,
+ /*to join block*/ prepareBlockByHeader(right_header, "(1, '2023-1-1 00:00:00')(1, '2023-1-1 00:00:01')", context),
+ /*expected join results*/ ExpectedJoinResults{},
+ },
+ {
+ /*to join pos*/ ToJoinStep::LEFT,
+ /*to join block*/ prepareBlockByHeader(left_header, "(1, '2023-1-1 00:00:00')(2, '2023-1-1 00:00:00')", context),
+ /*expected join results*/
+ ExpectedJoinResults{
+ /// output header: col_1, col_2, t2.col_2
+ .values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:00')(1, '2023-1-1 00:00:00', '2023-1-1 00:00:01')"
+ "(2, '2023-1-1 00:00:00', '1970-1-1 00:00:00')",
+ },
+ },
+ {
+ /*to join pos*/ ToJoinStep::RIGHT,
+ /*to join block*/ prepareBlockByHeader(right_header, "(1, '2023-1-1 00:00:02')(1, '2023-1-1 00:00:03')(2, '2023-1-1 00:00:02')(2, '2023-1-1 00:00:03')", context),
+ /*expected join results*/
+ ExpectedJoinResults{
+ /// output header: col_1, col_2, t2.col_2
+ .values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:02')"
+ "(2, '2023-1-1 00:00:00', '2023-1-1 00:00:02')",
+ },
+ },
+ {
+ /*to join pos*/ ToJoinStep::LEFT,
+ /*to join block*/ prepareBlockByHeader(left_header, "(1, '2023-1-1 00:00:00')(1, '2023-1-1 00:00:02')", context),
+ /*expected join results*/
+ ExpectedJoinResults{
+ /// output header: col_1, col_2, t2.col_2
+ .values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:00')"
+ "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:01')"
+ "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:02')"
+ "(1, '2023-1-1 00:00:02', '2023-1-1 00:00:00')"
+ "(1, '2023-1-1 00:00:02', '2023-1-1 00:00:01')"
+ "(1, '2023-1-1 00:00:02', '2023-1-1 00:00:02')"
+ "(1, '2023-1-1 00:00:02', '2023-1-1 00:00:03')",
+ },
+ },
+ {
+ /*to join pos*/ ToJoinStep::LEFT,
+ /*to join block*/ prepareBlockByHeader(left_header, "(2, '2023-1-1 00:00:00')(2, '2023-1-1 00:00:02')(3, '2023-1-1 00:00:03')", context),
+ /*expected join results*/
+ ExpectedJoinResults{
+ /// output header: col_1, col_2, t2.col_2
+ .values = "(2, '2023-1-1 00:00:00', '2023-1-1 00:00:02')"
+ "(2, '2023-1-1 00:00:02', '2023-1-1 00:00:02')"
+ "(2, '2023-1-1 00:00:02', '2023-1-1 00:00:03')"
+ "(3, '2023-1-1 00:00:03', '1970-1-1 00:00:00')",
+ },
+ },
+ },
+ context);
+}
+
TEST(StreamingHashJoin, AppendLeftAsofJoinAppend)
{
auto context = getContext().context;