diff --git a/docs/content.zh/docs/dev/table/sourcesSinks.md b/docs/content.zh/docs/dev/table/sourcesSinks.md
index 8209ce9966b5e..4e827040227bf 100644
--- a/docs/content.zh/docs/dev/table/sourcesSinks.md
+++ b/docs/content.zh/docs/dev/table/sourcesSinks.md
@@ -270,6 +270,10 @@ Flink 会对工厂类逐个进行检查,确保其“标识符”是全局唯
 {{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsPartitioning.java" name="SupportsPartitioning" >}}
 Enables writing partitioned data in a DynamicTableSink.
+
+{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsBucketing.java" name="SupportsBucketing" >}}
+Enables bucketing for a DynamicTableSink.
+
 {{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java" name="SupportsWritingMetadata" >}}
 Enables writing metadata columns into a DynamicTableSink. The sink accepts and persists the corresponding metadata at the end of each consumed row, including the metadata of the format.

diff --git a/docs/content.zh/docs/dev/table/sql/alter.md b/docs/content.zh/docs/dev/table/sql/alter.md
index 993613379e4dc..13809d6a4ed93 100644
--- a/docs/content.zh/docs/dev/table/sql/alter.md
+++ b/docs/content.zh/docs/dev/table/sql/alter.md
@@ -333,9 +333,9 @@ Flink SQL> DESC CATALOG EXTENDED cat2;
 The currently supported ALTER TABLE syntax is as follows:
 ```text
 ALTER TABLE [IF EXISTS] table_name {
-    ADD { <schema_component> | (<schema_component> [, ...]) | [IF NOT EXISTS] <partition_component> [<partition_component> ...]}
-    | MODIFY { <schema_component> | (<schema_component> [, ...]) }
-    | DROP {column_name | (column_name, column_name, ....) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK | [IF EXISTS] <partition_component> [, ...]}
+    ADD { <schema_component> | (<schema_component> [, ...]) | [IF NOT EXISTS] <partition_component> [<partition_component> ...] | <distribution> }
+    | MODIFY { <schema_component> | (<schema_component> [, ...]) | <distribution> }
+    | DROP {column_name | (column_name, column_name, ....) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK | [IF EXISTS] <partition_component> [, ...] | DISTRIBUTION }
     | RENAME old_column_name TO new_column_name
     | RENAME TO new_table_name
     | SET (key1=val1, ...)
@@ -368,6 +368,12 @@ ALTER TABLE [IF EXISTS] table_name {
 <partition_component>:
   PARTITION (key1=val1, key2=val2, ...) [WITH (key1=val1, key2=val2, ...)]
+
+<distribution>:
+{
+    DISTRIBUTION BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
+    | DISTRIBUTION INTO n BUCKETS
+}
 ```

 **IF EXISTS**

 If the table does not exist, nothing happens.

 ### ADD
-Use the `ADD` clause to add [columns]({{< ref "docs/dev/table/sql/create" >}}#columns), [constraints]({{< ref "docs/dev/table/sql/create" >}}#primary-key), a [watermark]({{< ref "docs/dev/table/sql/create" >}}#watermark), and [partitions]({{< ref "docs/dev/table/sql/create" >}}#partitioned-by) to an existing table.
+Use the `ADD` clause to add [columns]({{< ref "docs/dev/table/sql/create" >}}#columns), [constraints]({{< ref "docs/dev/table/sql/create" >}}#primary-key), a [watermark]({{< ref "docs/dev/table/sql/create" >}}#watermark), [partitions]({{< ref "docs/dev/table/sql/create" >}}#partitioned-by), and a [distribution]({{< ref "docs/dev/table/sql/create" >}}#distributed) to an existing table.

 To add a column at a specified position, use `FIRST` or `AFTER col_name`. By default, the column is appended at the end.
@@ -398,6 +404,12 @@ ALTER TABLE MyTable ADD PARTITION (p1=1,p2='a') with ('k1'='v1');

 -- add two new partitions
 ALTER TABLE MyTable ADD PARTITION (p1=1,p2='a') with ('k1'='v1') PARTITION (p1=1,p2='b') with ('k2'='v2');
+
+-- add a new distribution using a hash on uid into 4 buckets
+ALTER TABLE MyTable ADD DISTRIBUTION BY HASH(uid) INTO 4 BUCKETS;
+
+-- add a new distribution on uid into 4 buckets
+ALTER TABLE MyTable ADD DISTRIBUTION BY (uid) INTO 4 BUCKETS;
+
+-- add a new distribution on uid
+ALTER TABLE MyTable ADD DISTRIBUTION BY (uid);
+
+-- add a new distribution into 4 buckets
+ALTER TABLE MyTable ADD DISTRIBUTION INTO 4 BUCKETS;
 ```

 Note Adding a column as a primary key implicitly changes the column's nullability to false.

@@ -445,6 +463,9 @@ ALTER TABLE MyTable DROP PARTITION (`id` = 1), PARTITION (`id` = 2);

 -- drop a watermark
 ALTER TABLE MyTable DROP WATERMARK;
+
+-- drop the distribution
+ALTER TABLE MyTable DROP DISTRIBUTION;
 ```

 ### RENAME
diff --git a/docs/content.zh/docs/dev/table/sql/create.md b/docs/content.zh/docs/dev/table/sql/create.md
index 69467cd4042a0..7d8074eaee70e 100644
--- a/docs/content.zh/docs/dev/table/sql/create.md
+++ b/docs/content.zh/docs/dev/table/sql/create.md
@@ -157,6 +157,7 @@ CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
   )
   [COMMENT table_comment]
   [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
+  [ <distribution> ]
   WITH (key1=val1, key2=val2, ...)
   [ LIKE source_table [( <like_options> )] | AS select_query ]
@@ -183,10 +184,16 @@ CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name

 <like_options>:
 {
-   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
+   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | DISTRIBUTION | PARTITIONS }
   | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
 }[, ...]

+<distribution>:
+{
+   DISTRIBUTED BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
+   | DISTRIBUTED INTO n BUCKETS
+}
+
 ```

 Creates a table with the given name. If a table with the same name already exists in the catalog, it cannot be registered.

@@ -405,6 +412,36 @@ Flink 假设声明了主键的列都是不包含 Null 值的,Connector 在处

 Partitions the created table by the specified columns. A directory is created for each partition if this table is used as a filesystem sink.

+### `DISTRIBUTED`
+
+Buckets enable load balancing in an external storage system by splitting data into disjoint subsets. They group rows with a potentially "infinite" keyspace into smaller, more manageable chunks that allow for efficient parallel processing.
+
+Bucketing depends heavily on the semantics of the underlying connector. However, a user can influence the bucketing behavior by specifying the number of buckets, the bucketing algorithm, and (if the algorithm allows it) the columns used to compute the target bucket.
+
+All bucketing components (i.e. bucket count, distribution algorithm, bucket key columns) are
+optional from a SQL syntax perspective.
+
+Given the following SQL statements:
+
+```sql
+-- Example 1
+CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY HASH(uid) INTO 4 BUCKETS;
+
+-- Example 2
+CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY (uid) INTO 4 BUCKETS;
+
+-- Example 3
+CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY (uid);
+
+-- Example 4
+CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED INTO 4 BUCKETS;
+```
+
+Example 1 declares a hash function on a fixed number of 4 buckets (i.e. HASH(uid) % 4 = target
+bucket). Example 2 leaves the selection of an algorithm up to the connector. Additionally,
+Example 3 leaves the number of buckets up to the connector.
+In contrast, Example 4 only defines the number of buckets.
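+
+To make the hash variant concrete, the following sketch shows the arithmetic behind
+Example 1. This is plain Java for illustration only (not a Flink API); real
+connectors implement their own hash functions:
+
+```java
+// Conceptual illustration of DISTRIBUTED BY HASH(uid) INTO 4 BUCKETS:
+// hash the bucket key and take it modulo the bucket count.
+public class BucketAssignment {
+
+    static int targetBucket(long uid, int numBuckets) {
+        int hash = Long.hashCode(uid);
+        // Math.floorMod keeps the result non-negative for negative hashes.
+        return Math.floorMod(hash, numBuckets);
+    }
+
+    public static void main(String[] args) {
+        // Long.hashCode(42) == 42 and 42 % 4 == 2, so uid 42 lands in bucket 2.
+        System.out.println(targetBucket(42L, 4));
+    }
+}
+```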
+
 ### `WITH` Options

 Table properties are used to create a table source/sink and are usually used to find and create the underlying connector.

@@ -464,6 +501,7 @@ CREATE TABLE Orders_with_watermark (
 * CONSTRAINTS - primary and unique key constraints
 * GENERATED - computed columns
 * OPTIONS - connector options that describe connector and format properties
+* DISTRIBUTION - distribution definition
 * PARTITIONS - table partition information
 * WATERMARKS - watermark declarations

diff --git a/docs/content/docs/dev/table/sourcesSinks.md b/docs/content/docs/dev/table/sourcesSinks.md
index dde34f5c94bc6..14193e4f11fd1 100644
--- a/docs/content/docs/dev/table/sourcesSinks.md
+++ b/docs/content/docs/dev/table/sourcesSinks.md
@@ -334,6 +334,10 @@ the `ParallelismProvider` interface.
 {{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsPartitioning.java" name="SupportsPartitioning" >}}
 Enables writing partitioned data in a DynamicTableSink.
+
+{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsBucketing.java" name="SupportsBucketing" >}}
+Enables bucketing for a DynamicTableSink.
+
 {{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java" name="SupportsWritingMetadata" >}}
 Enables writing metadata columns into a DynamicTableSink. A table sink is

diff --git a/docs/content/docs/dev/table/sql/alter.md b/docs/content/docs/dev/table/sql/alter.md
index b5f199ced5b7b..30821ccf63844 100644
--- a/docs/content/docs/dev/table/sql/alter.md
+++ b/docs/content/docs/dev/table/sql/alter.md
@@ -334,9 +334,9 @@ Flink SQL> DESC CATALOG EXTENDED cat2;
 The following grammar gives an overview of the available syntax:
 ```text
 ALTER TABLE [IF EXISTS] table_name {
-    ADD { <schema_component> | (<schema_component> [, ...]) | [IF NOT EXISTS] <partition_component> [<partition_component> ...]}
-    | MODIFY { <schema_component> | (<schema_component> [, ...]) }
-    | DROP {column_name | (column_name, column_name, ....) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK | [IF EXISTS] <partition_component> [, ...]}
+    ADD { <schema_component> | (<schema_component> [, ...]) | [IF NOT EXISTS] <partition_component> [<partition_component> ...] | <distribution> }
+    | MODIFY { <schema_component> | (<schema_component> [, ...]) | <distribution> }
+    | DROP {column_name | (column_name, column_name, ....) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK | [IF EXISTS] <partition_component> [, ...] | DISTRIBUTION }
     | RENAME old_column_name TO new_column_name
     | RENAME TO new_table_name
     | SET (key1=val1, ...)
@@ -369,6 +369,12 @@ ALTER TABLE [IF EXISTS] table_name {
 <partition_component>:
   PARTITION (key1=val1, key2=val2, ...) [WITH (key1=val1, key2=val2, ...)]
+
+<distribution>:
+{
+    DISTRIBUTION BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
+    | DISTRIBUTION INTO n BUCKETS
+}
 ```

 **IF EXISTS**

 If the table does not exist, nothing happens.

 ### ADD
-Use `ADD` clause to add [columns]({{< ref "docs/dev/table/sql/create" >}}#columns), [constraints]({{< ref "docs/dev/table/sql/create" >}}#primary-key), [watermark]({{< ref "docs/dev/table/sql/create" >}}#watermark) and [partitions]({{< ref "docs/dev/table/sql/create" >}}#partitioned-by) to an existing table.
+Use the `ADD` clause to add [columns]({{< ref "docs/dev/table/sql/create" >}}#columns), [constraints]({{< ref "docs/dev/table/sql/create" >}}#primary-key), a [watermark]({{< ref "docs/dev/table/sql/create" >}}#watermark), [partitions]({{< ref "docs/dev/table/sql/create" >}}#partitioned-by), and a [distribution]({{< ref "docs/dev/table/sql/create" >}}#distributed) to an existing table.

 To add a column at the specified position, use `FIRST` or `AFTER col_name`. By default, the column is appended at the end.
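+
+Such statements can also be submitted programmatically. A minimal Table API sketch
+(the table name is illustrative; `MyTable` must already exist in the current catalog,
+and adding a distribution requires a connector that supports bucketing):
+
+```java
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+public class AlterTableExample {
+    public static void main(String[] args) {
+        TableEnvironment tEnv =
+                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        // ALTER TABLE is executed like any other SQL statement.
+        tEnv.executeSql(
+                "ALTER TABLE MyTable ADD DISTRIBUTION BY HASH(uid) INTO 4 BUCKETS");
+    }
+}
+```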
@@ -399,6 +405,18 @@ ALTER TABLE MyTable ADD PARTITION (p1=1,p2='a') with ('k1'='v1');

 -- add two new partitions
 ALTER TABLE MyTable ADD PARTITION (p1=1,p2='a') with ('k1'='v1') PARTITION (p1=1,p2='b') with ('k2'='v2');
+
+-- add a new distribution using a hash on uid into 4 buckets
+ALTER TABLE MyTable ADD DISTRIBUTION BY HASH(uid) INTO 4 BUCKETS;
+
+-- add a new distribution on uid into 4 buckets
+ALTER TABLE MyTable ADD DISTRIBUTION BY (uid) INTO 4 BUCKETS;
+
+-- add a new distribution on uid
+ALTER TABLE MyTable ADD DISTRIBUTION BY (uid);
+
+-- add a new distribution into 4 buckets
+ALTER TABLE MyTable ADD DISTRIBUTION INTO 4 BUCKETS;
 ```

 Note Adding a column as a primary key implicitly changes the column's nullability to false.

@@ -447,6 +465,9 @@ ALTER TABLE MyTable DROP PARTITION (`id` = 1), PARTITION (`id` = 2);

 -- drop a watermark
 ALTER TABLE MyTable DROP WATERMARK;
+
+-- drop the distribution
+ALTER TABLE MyTable DROP DISTRIBUTION;
 ```

 ### RENAME
diff --git a/docs/content/docs/dev/table/sql/create.md b/docs/content/docs/dev/table/sql/create.md
index 136468190e413..c03f67878dede 100644
--- a/docs/content/docs/dev/table/sql/create.md
+++ b/docs/content/docs/dev/table/sql/create.md
@@ -155,6 +155,7 @@ CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
   )
   [COMMENT table_comment]
   [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
+  [ <distribution> ]
   WITH (key1=val1, key2=val2, ...)
   [ LIKE source_table [( <like_options> )] | AS select_query ]
@@ -181,10 +182,16 @@ CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name

 <like_options>:
 {
-   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
+   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | DISTRIBUTION | PARTITIONS }
   | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
 }[, ...]

+<distribution>:
+{
+   DISTRIBUTED BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
+   | DISTRIBUTED INTO n BUCKETS
+}
+
 ```

 The statement above creates a table with the given name. If a table with the same name already exists

@@ -406,6 +413,36 @@ Flink will assume correctness of the primary key by assuming that the columns nu

 Partition the created table by the specified columns. A directory is created for each partition if this table is used as a filesystem sink.

+### `DISTRIBUTED`
+
+Buckets enable load balancing in an external storage system by splitting data into disjoint subsets. They group rows with a potentially "infinite" keyspace into smaller, more manageable chunks that allow for efficient parallel processing.
+
+Bucketing depends heavily on the semantics of the underlying connector. However, a user can influence the bucketing behavior by specifying the number of buckets, the bucketing algorithm, and (if the algorithm allows it) the columns used to compute the target bucket.
+
+All bucketing components (i.e. bucket count, distribution algorithm, bucket key columns) are
+optional from a SQL syntax perspective.
+
+Given the following SQL statements:
+
+```sql
+-- Example 1
+CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY HASH(uid) INTO 4 BUCKETS;
+
+-- Example 2
+CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY (uid) INTO 4 BUCKETS;
+
+-- Example 3
+CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY (uid);
+
+-- Example 4
+CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED INTO 4 BUCKETS;
+```
+
+Example 1 declares a hash function on a fixed number of 4 buckets (i.e. HASH(uid) % 4 = target
+bucket). Example 2 leaves the selection of an algorithm up to the connector. Additionally,
+Example 3 leaves the number of buckets up to the connector.
+In contrast, Example 4 only defines the number of buckets.
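+
+On the connector side, a sink opts into these clauses by implementing the
+`SupportsBucketing` ability interface (see the sources & sinks documentation).
+The sketch below shows the general shape only; the method names and the
+`TableDistribution` type used here are assumptions for illustration, so consult
+the linked interface for the authoritative contract:
+
+```java
+import java.util.EnumSet;
+import java.util.Set;
+
+import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsBucketing;
+
+// Hypothetical sink that accepts DISTRIBUTED BY HASH(...) INTO n BUCKETS.
+public class MyBucketedSink implements DynamicTableSink, SupportsBucketing {
+
+    @Override
+    public Set<TableDistribution.Kind> listAlgorithms() {
+        // This sink only knows how to distribute rows by hashing bucket keys.
+        return EnumSet.of(TableDistribution.Kind.HASH);
+    }
+
+    @Override
+    public boolean requiresBucketCount() {
+        // Users must declare INTO n BUCKETS; the sink has no default.
+        return true;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        // A real implementation would return e.g. a SinkV2Provider here.
+        throw new UnsupportedOperationException("sketch only");
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        return new MyBucketedSink();
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "MyBucketedSink";
+    }
+}
+```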
+
 ### `WITH` Options

 Table properties used to create a table source/sink. The properties are usually used to find and create the underlying connector.

@@ -465,6 +502,7 @@ You can control the merging behavior of:
 * GENERATED - computed columns
 * METADATA - metadata columns
 * OPTIONS - connector options that describe connector and format properties
+* DISTRIBUTION - distribution definition (see the sketch after this list)
 * PARTITIONS - partitions of the table
 * WATERMARKS - watermark declarations
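+
+For example, a derived table can drop the source table's bucketing via the
+`EXCLUDING DISTRIBUTION` option from the `<like_options>` grammar above, letting
+the target system pick its own layout. A sketch reusing the `tEnv` and `MyTable`
+from the earlier examples (whether a schema block is required alongside `LIKE`
+may depend on the Flink version, so treat the exact shape as an assumption):
+
+```java
+// MyTableCopy inherits schema and options from MyTable, but no distribution;
+// all other sections fall back to the default merging behavior.
+tEnv.executeSql(
+        "CREATE TABLE MyTableCopy "
+                + "LIKE MyTable (EXCLUDING DISTRIBUTION)");
+```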