[FLINK-35585] Add documentation for DISTRIBUTED BY (apache#24929)
jnh5y authored Jun 24, 2024
1 parent 67d23fc commit f6f8813
Showing 6 changed files with 136 additions and 10 deletions.
4 changes: 4 additions & 0 deletions docs/content.zh/docs/dev/table/sourcesSinks.md
@@ -270,6 +270,10 @@ Flink checks the factory classes one by one to ensure that the "identifier" is globally unique
<td>{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsPartitioning.java" name="SupportsPartitioning" >}}</td>
<td>Enables a <code>DynamicTableSink</code> to write partitioned data.</td>
</tr>
<tr>
<td>{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsBucketing.java" name="SupportsBucketing" >}}</td>
<td>Enables bucketing for a <code>DynamicTableSink</code>.</td>
</tr>
<tr>
<td>{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java" name="SupportsWritingMetadata" >}}</td>
<td>Enables a <code>DynamicTableSink</code> to write metadata columns. When consuming rows, the sink accepts the corresponding metadata at the end of each row and persists it; this includes the format information of the metadata.</td>
29 changes: 25 additions & 4 deletions docs/content.zh/docs/dev/table/sql/alter.md
@@ -333,9 +333,9 @@ Flink SQL> DESC CATALOG EXTENDED cat2;
The currently supported ALTER TABLE syntax is as follows:
```text
ALTER TABLE [IF EXISTS] table_name {
-ADD { <schema_component> | (<schema_component> [, ...]) | [IF NOT EXISTS] <partition_component> [<partition_component> ...]}
-| MODIFY { <schema_component> | (<schema_component> [, ...]) }
-| DROP {column_name | (column_name, column_name, ....) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK | [IF EXISTS] <partition_component> [, ...]}
+ADD { <schema_component> | (<schema_component> [, ...]) | [IF NOT EXISTS] <partition_component> [<partition_component> ...] | <distribution> }
+| MODIFY { <schema_component> | (<schema_component> [, ...]) | <distribution> }
+| DROP {column_name | (column_name, column_name, ....) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK | [IF EXISTS] <partition_component> [, ...] | DISTRIBUTION }
| RENAME old_column_name TO new_column_name
| RENAME TO new_table_name
| SET (key1=val1, ...)
@@ -368,14 +368,20 @@ ALTER TABLE [IF EXISTS] table_name {
<partition_component>:
PARTITION (key1=val1, key2=val2, ...) [WITH (key1=val1, key2=val2, ...)]
<distribution>:
{
DISTRIBUTION BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
| DISTRIBUTION INTO n BUCKETS
}
```

**IF EXISTS**

If the table does not exist, nothing happens.

### ADD
-Use the `ADD` clause to add [columns]({{< ref "docs/dev/table/sql/create" >}}#columns), [constraints]({{< ref "docs/dev/table/sql/create" >}}#primary-key), a [watermark]({{< ref "docs/dev/table/sql/create" >}}#watermark), or [partitions]({{< ref "docs/dev/table/sql/create" >}}#partitioned-by) to an existing table.
+Use the `ADD` clause to add [columns]({{< ref "docs/dev/table/sql/create" >}}#columns), [constraints]({{< ref "docs/dev/table/sql/create" >}}#primary-key), a [watermark]({{< ref "docs/dev/table/sql/create" >}}#watermark), [partitions]({{< ref "docs/dev/table/sql/create" >}}#partitioned-by), and a [distribution]({{< ref "docs/dev/table/sql/create" >}}#distributed) to an existing table.

To add a column at a specific position, use `FIRST` or `AFTER col_name`; if no position is specified, the column is appended at the end.

@@ -398,6 +404,18 @@ ALTER TABLE MyTable ADD PARTITION (p1=1,p2='a') with ('k1'='v1');

-- add two new partitions
ALTER TABLE MyTable ADD PARTITION (p1=1,p2='a') with ('k1'='v1') PARTITION (p1=1,p2='b') with ('k2'='v2');

-- add a new distribution using a hash on uid into 4 buckets
ALTER TABLE MyTable ADD DISTRIBUTION BY HASH(uid) INTO 4 BUCKETS;

-- add a new distribution on uid into 4 buckets
ALTER TABLE MyTable ADD DISTRIBUTION BY (uid) INTO 4 BUCKETS;

-- add a new distribution on uid
ALTER TABLE MyTable ADD DISTRIBUTION BY (uid);

-- add a new distribution into 4 buckets
ALTER TABLE MyTable ADD DISTRIBUTION INTO 4 BUCKETS;
```
<span class="label label-danger">Note</span> Specifying a column as a primary key implicitly changes the column's nullability to false.
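
The grammar above also allows `MODIFY <distribution>`; a minimal sketch for replacing an existing distribution (assuming `MyTable` already defines one):

```sql
-- replace the existing distribution with a hash on uid into 8 buckets
ALTER TABLE MyTable MODIFY DISTRIBUTION BY HASH(uid) INTO 8 BUCKETS;
```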

@@ -445,6 +463,9 @@ ALTER TABLE MyTable DROP PARTITION (`id` = 1), PARTITION (`id` = 2);

-- drop the watermark
ALTER TABLE MyTable DROP WATERMARK;

-- drop distribution
ALTER TABLE MyTable DROP DISTRIBUTION;
```

### RENAME
40 changes: 39 additions & 1 deletion docs/content.zh/docs/dev/table/sql/create.md
@@ -157,6 +157,7 @@ CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
[ <distribution> ]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( <like_options> )] | AS select_query ]
@@ -183,10 +184,16 @@ CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
<like_options>:
{
-{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
+{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | DISTRIBUTION | PARTITIONS }
| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]
<distribution>:
{
DISTRIBUTED BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
| DISTRIBUTED INTO n BUCKETS
}
```

Creates a table with the given name. If a table with the same name already exists in the catalog, the table cannot be registered.
@@ -405,6 +412,36 @@ Flink assumes that columns declared as primary key never contain null values; the connector must ensure semantic correctness when processing the data.

Partitions the created table by the specified columns. A directory is created for each partition if this table is used as a filesystem sink.
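
As a minimal sketch of this behavior (path and options are placeholders for the example):

```sql
-- one directory per dt value is created under the configured path,
-- e.g. /tmp/orders/dt=2024-06-24/
CREATE TABLE PartitionedOrders (
  order_id BIGINT,
  price    DECIMAL(10, 2),
  dt       STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/orders',
  'format' = 'csv'
);
```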

### `DISTRIBUTED`

Buckets enable load balancing in an external storage system by splitting data into disjoint subsets. These subsets group rows over a potentially "infinite" keyspace into smaller, more manageable chunks that allow for efficient parallel processing.

Bucketing depends heavily on the semantics of the underlying connector. However, a user can influence the bucketing behavior by specifying the number of buckets, the bucketing algorithm, and (if the algorithm allows it) the columns which are used for target bucket calculation.

All bucketing components (i.e. bucket number, distribution algorithm, bucket key columns) are
optional from a SQL syntax perspective.

Given the following SQL statements:

```sql
-- Example 1
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY HASH(uid) INTO 4 BUCKETS;

-- Example 2
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY (uid) INTO 4 BUCKETS;

-- Example 3
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY (uid);

-- Example 4
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED INTO 4 BUCKETS;
```

Example 1 declares a hash function on a fixed number of 4 buckets (i.e. HASH(uid) % 4 = target bucket). Example 2 leaves the selection of the algorithm up to the connector. Example 3 additionally leaves the number of buckets up to the connector. In contrast, Example 4 only defines the number of buckets, leaving the algorithm and the key columns up to the connector.
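
As shown in the full grammar above, the clause sits between `PARTITIONED BY` and `WITH`; a combined sketch (the connector is a placeholder and must support bucketing):

```sql
CREATE TABLE MyBucketedTable (
  uid  BIGINT,
  name STRING,
  dt   STRING
)
PARTITIONED BY (dt)
DISTRIBUTED BY HASH(uid) INTO 4 BUCKETS
WITH (
  'connector' = '...'  -- placeholder: the chosen connector must support bucketing
);
```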

### `WITH` Options

Table properties are used to create a table source/sink; they are usually used to find and create the underlying connector.
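
For example, a minimal sketch using the Kafka connector (topic and server address are placeholders):

```sql
CREATE TABLE KafkaOrders (
  order_id BIGINT,
  price    DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',                             -- which connector to use
  'topic' = 'orders',                                -- placeholder topic name
  'properties.bootstrap.servers' = 'localhost:9092', -- placeholder address
  'format' = 'json'                                  -- how rows are (de)serialized
);
```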
@@ -464,6 +501,7 @@ CREATE TABLE Orders_with_watermark (
* CONSTRAINTS - primary and unique key constraints
* GENERATED - computed columns
* OPTIONS - connector options that describe connector and format properties
* DISTRIBUTION - the distribution definition (see the sketch after this list)
* PARTITIONS - table partition information
* WATERMARKS - watermark definitions
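
A minimal sketch of controlling the distribution merge, reusing `MyTable` from the `DISTRIBUTED` section (the connector option is a placeholder):

```sql
-- derive a copy of MyTable that keeps its distribution
-- but replaces the connector options
CREATE TABLE MyTableCopy (
  ts TIMESTAMP(3)  -- one additional physical column
) WITH (
  'connector' = '...'  -- placeholder
) LIKE MyTable (
  EXCLUDING OPTIONS
  INCLUDING DISTRIBUTION
);
```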

4 changes: 4 additions & 0 deletions docs/content/docs/dev/table/sourcesSinks.md
@@ -334,6 +334,10 @@ the `ParallelismProvider` interface.
<td>{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsPartitioning.java" name="SupportsPartitioning" >}}</td>
<td>Enables writing partitioned data in a <code>DynamicTableSink</code>.</td>
</tr>
<tr>
<td>{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsBucketing.java" name="SupportsBucketing" >}}</td>
<td>Enables bucketing for a <code>DynamicTableSink</code>.</td>
</tr>
<tr>
<td>{{< gh_link file="flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java" name="SupportsWritingMetadata" >}}</td>
<td>Enables writing metadata columns into a <code>DynamicTableSink</code>. A table sink is
29 changes: 25 additions & 4 deletions docs/content/docs/dev/table/sql/alter.md
@@ -334,9 +334,9 @@ Flink SQL> DESC CATALOG EXTENDED cat2;
The following grammar gives an overview of the available syntax:
```text
ALTER TABLE [IF EXISTS] table_name {
-ADD { <schema_component> | (<schema_component> [, ...]) | [IF NOT EXISTS] <partition_component> [<partition_component> ...]}
-| MODIFY { <schema_component> | (<schema_component> [, ...]) }
-| DROP {column_name | (column_name, column_name, ....) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK | [IF EXISTS] <partition_component> [, ...]}
+ADD { <schema_component> | (<schema_component> [, ...]) | [IF NOT EXISTS] <partition_component> [<partition_component> ...] | <distribution> }
+| MODIFY { <schema_component> | (<schema_component> [, ...]) | <distribution> }
+| DROP {column_name | (column_name, column_name, ....) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK | [IF EXISTS] <partition_component> [, ...] | DISTRIBUTION }
| RENAME old_column_name TO new_column_name
| RENAME TO new_table_name
| SET (key1=val1, ...)
@@ -369,14 +369,20 @@ ALTER TABLE [IF EXISTS] table_name {
<partition_component>:
PARTITION (key1=val1, key2=val2, ...) [WITH (key1=val1, key2=val2, ...)]
<distribution>:
{
DISTRIBUTION BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
| DISTRIBUTION INTO n BUCKETS
}
```

**IF EXISTS**

If the table does not exist, nothing happens.

### ADD
-Use `ADD` clause to add [columns]({{< ref "docs/dev/table/sql/create" >}}#columns), [constraints]({{< ref "docs/dev/table/sql/create" >}}#primary-key), [watermark]({{< ref "docs/dev/table/sql/create" >}}#watermark) and [partitions]({{< ref "docs/dev/table/sql/create" >}}#partitioned-by) to an existing table.
+Use `ADD` clause to add [columns]({{< ref "docs/dev/table/sql/create" >}}#columns), [constraints]({{< ref "docs/dev/table/sql/create" >}}#primary-key), a [watermark]({{< ref "docs/dev/table/sql/create" >}}#watermark), [partitions]({{< ref "docs/dev/table/sql/create" >}}#partitioned-by), and a [distribution]({{< ref "docs/dev/table/sql/create" >}}#distributed) to an existing table.

To add a column at the specified position, use `FIRST` or `AFTER col_name`. By default, the column is appended at the end.

@@ -399,6 +405,18 @@ ALTER TABLE MyTable ADD PARTITION (p1=1,p2='a') with ('k1'='v1');

-- add two new partitions
ALTER TABLE MyTable ADD PARTITION (p1=1,p2='a') with ('k1'='v1') PARTITION (p1=1,p2='b') with ('k2'='v2');

-- add a new distribution using a hash on uid into 4 buckets
ALTER TABLE MyTable ADD DISTRIBUTION BY HASH(uid) INTO 4 BUCKETS;

-- add a new distribution on uid into 4 buckets
ALTER TABLE MyTable ADD DISTRIBUTION BY (uid) INTO 4 BUCKETS;

-- add a new distribution on uid
ALTER TABLE MyTable ADD DISTRIBUTION BY (uid);

-- add a new distribution into 4 buckets
ALTER TABLE MyTable ADD DISTRIBUTION INTO 4 BUCKETS;
```
<span class="label label-danger">Note</span> Adding a column as a primary key will implicitly change the column's nullability to false.
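
The grammar above also allows `MODIFY <distribution>`; a minimal sketch for replacing an existing distribution (assuming `MyTable` already defines one):

```sql
-- replace the existing distribution with a hash on uid into 8 buckets
ALTER TABLE MyTable MODIFY DISTRIBUTION BY HASH(uid) INTO 8 BUCKETS;
```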

@@ -447,6 +465,9 @@ ALTER TABLE MyTable DROP PARTITION (`id` = 1), PARTITION (`id` = 2);

-- drop a watermark
ALTER TABLE MyTable DROP WATERMARK;

-- drop distribution
ALTER TABLE MyTable DROP DISTRIBUTION;
```

### RENAME
40 changes: 39 additions & 1 deletion docs/content/docs/dev/table/sql/create.md
@@ -155,6 +157,7 @@ CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
[ <distribution> ]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( <like_options> )] | AS select_query ]
@@ -181,10 +182,16 @@ CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
<like_options>:
{
-{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
+{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | DISTRIBUTION | PARTITIONS }
| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]
<distribution>:
{
DISTRIBUTED BY [ { HASH | RANGE } ] (bucket_column_name1, bucket_column_name2, ...) [INTO n BUCKETS]
| DISTRIBUTED INTO n BUCKETS
}
```

The statement above creates a table with the given name. If a table with the same name already exists
@@ -406,6 +413,36 @@ Flink will assume correctness of the primary key by assuming that the columns' nullability is aligned with the columns in the primary key.

Partition the created table by the specified columns. A directory is created for each partition if this table is used as a filesystem sink.
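
As a minimal sketch of this behavior (path and options are placeholders for the example):

```sql
-- one directory per dt value is created under the configured path,
-- e.g. /tmp/orders/dt=2024-06-24/
CREATE TABLE PartitionedOrders (
  order_id BIGINT,
  price    DECIMAL(10, 2),
  dt       STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/orders',
  'format' = 'csv'
);
```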

### `DISTRIBUTED`

Buckets enable load balancing in an external storage system by splitting data into disjoint subsets. These subsets group rows over a potentially "infinite" keyspace into smaller, more manageable chunks that allow for efficient parallel processing.

Bucketing depends heavily on the semantics of the underlying connector. However, a user can influence the bucketing behavior by specifying the number of buckets, the bucketing algorithm, and (if the algorithm allows it) the columns which are used for target bucket calculation.

All bucketing components (i.e. bucket number, distribution algorithm, bucket key columns) are
optional from a SQL syntax perspective.

Given the following SQL statements:

```sql
-- Example 1
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY HASH(uid) INTO 4 BUCKETS;

-- Example 2
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY (uid) INTO 4 BUCKETS;

-- Example 3
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED BY (uid);

-- Example 4
CREATE TABLE MyTable (uid BIGINT, name STRING) DISTRIBUTED INTO 4 BUCKETS;
```

Example 1 declares a hash function on a fixed number of 4 buckets (i.e. HASH(uid) % 4 = target bucket). Example 2 leaves the selection of the algorithm up to the connector. Example 3 additionally leaves the number of buckets up to the connector. In contrast, Example 4 only defines the number of buckets, leaving the algorithm and the key columns up to the connector.
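
As shown in the full grammar above, the clause sits between `PARTITIONED BY` and `WITH`; a combined sketch (the connector is a placeholder and must support bucketing):

```sql
CREATE TABLE MyBucketedTable (
  uid  BIGINT,
  name STRING,
  dt   STRING
)
PARTITIONED BY (dt)
DISTRIBUTED BY HASH(uid) INTO 4 BUCKETS
WITH (
  'connector' = '...'  -- placeholder: the chosen connector must support bucketing
);
```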

### `WITH` Options

Table properties used to create a table source/sink. The properties are usually used to find and create the underlying connector.
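
For example, a minimal sketch using the Kafka connector (topic and server address are placeholders):

```sql
CREATE TABLE KafkaOrders (
  order_id BIGINT,
  price    DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',                             -- which connector to use
  'topic' = 'orders',                                -- placeholder topic name
  'properties.bootstrap.servers' = 'localhost:9092', -- placeholder address
  'format' = 'json'                                  -- how rows are (de)serialized
);
```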
@@ -465,6 +502,7 @@ You can control the merging behavior of:
* GENERATED - computed columns
* METADATA - metadata columns
* OPTIONS - connector options that describe connector and format properties
* DISTRIBUTION - the distribution definition (see the sketch after this list)
* PARTITIONS - partition of the tables
* WATERMARKS - watermark declarations
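
A minimal sketch of controlling the distribution merge, reusing `MyTable` from the `DISTRIBUTED` section (the connector option is a placeholder):

```sql
-- derive a copy of MyTable that keeps its distribution
-- but replaces the connector options
CREATE TABLE MyTableCopy (
  ts TIMESTAMP(3)  -- one additional physical column
) WITH (
  'connector' = '...'  -- placeholder
) LIKE MyTable (
  EXCLUDING OPTIONS
  INCLUDING DISTRIBUTION
);
```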

