From e668dee748e73786c70bc20e01f4e3d5ed3e753a Mon Sep 17 00:00:00 2001 From: Kathryn May <44557882+kathancox@users.noreply.github.com> Date: Mon, 5 Aug 2024 17:12:47 -0400 Subject: [PATCH] Update the resolved message docs for more guidance on the configuring checkpoint options (#18738) --- src/current/v23.1/changefeed-messages.md | 37 ++++++++++++++++++++---- src/current/v23.1/create-changefeed.md | 4 +-- src/current/v23.2/changefeed-messages.md | 37 ++++++++++++++++++++---- src/current/v23.2/create-changefeed.md | 4 +-- src/current/v24.1/changefeed-messages.md | 37 ++++++++++++++++++++---- src/current/v24.1/create-changefeed.md | 4 +-- src/current/v24.2/changefeed-messages.md | 37 ++++++++++++++++++++---- src/current/v24.2/create-changefeed.md | 4 +-- 8 files changed, 132 insertions(+), 32 deletions(-) diff --git a/src/current/v23.1/changefeed-messages.md b/src/current/v23.1/changefeed-messages.md index 026550f3391..74313d81139 100644 --- a/src/current/v23.1/changefeed-messages.md +++ b/src/current/v23.1/changefeed-messages.md @@ -346,19 +346,44 @@ In some unusual situations you may receive a delete message for a row without fi ## Resolved messages -When you create a changefeed with the [`resolved` option]({% link {{ page.version.version }}/create-changefeed.md %}#resolved-option), the changefeed will emit resolved timestamp messages in a format dependent on the connected [sink]({% link {{ page.version.version }}/changefeed-sinks.md %}). The resolved timestamp is the high-water mark that guarantees that no previously unseen rows with an [earlier update timestamp](#ordering-and-delivery-guarantees) will be emitted to the sink. That is, resolved timestamp messages do not emit until all [ranges]({% link {{ page.version.version }}/architecture/overview.md %}#range) in the changefeed have progressed to a specific point in time. +When you create a changefeed with the [`resolved` option]({% link {{ page.version.version }}/create-changefeed.md %}#resolved-option), the changefeed will emit resolved timestamp messages in a format dependent on the connected [sink]({% link {{ page.version.version }}/changefeed-sinks.md %}). The resolved timestamp is the high-water mark that guarantees that no previously unseen rows with an [earlier update timestamp](#ordering-and-delivery-guarantees) will be emitted to the sink. That is, resolved timestamp messages do not emit until the changefeed job has reached a [checkpoint]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}). When you specify the `resolved` option at changefeed creation, the [job's coordinating node]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}) will send the resolved timestamp to each endpoint at the sink. For example, each [Kafka]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka) partition will receive a resolved timestamp message, or a [cloud storage sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#cloud-storage-sink) will receive a resolved timestamp file. There are three different ways to configure resolved timestamp messages: - If you do not specify the `resolved` option at all, then the changefeed coordinator node will not send resolved timestamp messages. -- If you include `WITH resolved` in your changefeed creation statement **without** specifying a value, the coordinator node will emit resolved timestamps as the high-water mark advances. Note that new Kafka partitions may not receive resolved messages right away. -- If you specify a duration like `WITH resolved={duration}`, the changefeed will use it as the minimum duration between `resolved` messages that the changefeed coordinator sends. The changefeed will only emit a resolved timestamp message if the timestamp has advanced and at least the optional duration has elapsed. +- If you include `WITH resolved` in your changefeed creation statement **without** specifying a value, the coordinator node will emit resolved timestamps as the changefeed job checkpoints and the high-water mark advances. Note that new Kafka partitions may not receive resolved messages right away. -{{site.data.alerts.callout_info}} -If you require `resolved` message frequency under `30s`, then you **must** set the [`min_checkpoint_frequency`]({% link {{ page.version.version }}/create-changefeed.md %}#min-checkpoint-frequency) option to at least the desired `resolved` frequency. This is because `resolved` messages will not be emitted more frequently than `min_checkpoint_frequency`, but may be emitted less frequently. -{{site.data.alerts.end}} + {% include_cached copy-clipboard.html %} + ~~~ sql + CREATE CHANGEFEED FOR TABLE ... WITH resolved; + ~~~ + +- If you specify a duration like `WITH resolved={duration}`, the changefeed will use it as the minimum duration between `resolved` messages that the changefeed coordinator sends. The changefeed will only emit a resolved timestamp message if the timestamp has advanced and at least the optional duration has elapsed. For example: + + {% include_cached copy-clipboard.html %} + ~~~ sql + CREATE CHANGEFEED FOR TABLE ... WITH resolved=30s; + ~~~ + +### Resolved timestamp frequency + +The changefeed job's coordinating node will emit resolved timestamp messages once the changefeed has reached a checkpoint. The frequency of the checkpoints determine how often the resolved timestamp messages emit to the sink. To configure how often the changefeed checkpoints, you can set the [`min_checkpoint_frequency`]({% link {{ page.version.version }}/create-changefeed.md %}#min-checkpoint-frequency) option and [flush frequency]({% link {{ page.version.version }}/changefeed-sinks.md %}) (if flushing is configurable for your sink). + +The `min_checkpoint_frequency` option controls how often nodes flush their progress to the coordinating node. If you need resolved timestamp messages to emit from the changefeed more frequently than the `30s` default, then you must set `min_checkpoint_frequency` to at least the desired resolved timestamp frequency. For example: + +{% include_cached copy-clipboard.html %} +~~~ sql +CREATE CHANGEFEED FOR TABLE ... WITH resolved=10s, min_checkpoint_frequency=10s; +~~~ + +When you configure the `min_checkpoint_frequency` and `resolved` options, there can be a tradeoff between changefeed message latency and cluster CPU usage. + +- Lowering these options will cause the changefeed to checkpoint and send resolved timestamp messages more frequently, which can add overhead to CPU usage in the cluster. +- Raising these options will result in the changefeed checkpointing and sending resolved timestamp messages less frequently, which can cause latency in message delivery to the sink. + +For example, you can set `min_checkpoint_frequency` and `resolved` to `0s` so that the changefeed job checkpoints as frequently as possible and messages are sent immediately followed by the resolved timestamp. However, the frequent checkpointing will increase CPU usage in the cluster. If your application can tolerate a longer duration than `0s` between checkpoints, this will help to reduce the overhead on the cluster. ## Duplicate messages diff --git a/src/current/v23.1/create-changefeed.md b/src/current/v23.1/create-changefeed.md index 18a71340a09..b345e05665d 100644 --- a/src/current/v23.1/create-changefeed.md +++ b/src/current/v23.1/create-changefeed.md @@ -174,12 +174,12 @@ Option | Value | Description New in v23.1: `key_column` | `'column'` | Override the key used in [message metadata]({% link {{ page.version.version }}/changefeed-messages.md %}). This changes the key hashed to determine downstream partitions. In sinks that support partitioning by message, CockroachDB uses the [32-bit FNV-1a](https://wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function) hashing algorithm to determine which partition to send to.

**Note:** `key_column` does not preserve ordering of messages from CockroachDB to the downstream sink, therefore you must also include the [`unordered`](#unordered) option in your changefeed creation statement. It does not affect per-key [ordering guarantees]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) or the output of [`key_in_value`](#key-in-value).

See the [Define a key to determine the changefeed sink partition](#define-a-key-to-determine-the-changefeed-sink-partition) example. `key_in_value` | N/A | Add a primary key array to the emitted message. This makes the [primary key]({% link {{ page.version.version }}/primary-key.md %}) of a deleted row recoverable in sinks where each message has a value but not a key (most have a key and value in each message). `key_in_value` is automatically used for [cloud storage sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#cloud-storage-sink), [webhook sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#webhook-sink), and [GC Pub/Sub sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#google-cloud-pub-sub). `metrics_label` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Define a metrics label to which the metrics for one or multiple changefeeds increment. All changefeeds also have their metrics aggregated.

The maximum length of a label is 128 bytes. There is a limit of 1024 unique labels.

`WITH metrics_label=label_name`

For more detail on usage and considerations, see [Using changefeed metrics labels]({% link {{ page.version.version }}/monitor-and-debug-changefeeds.md %}#using-changefeed-metrics-labels). -`min_checkpoint_frequency` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Controls how often nodes flush their progress to the [coordinating changefeed node]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}). Changefeeds will wait for at least the specified duration before a flush to the sink. This can help you control the flush frequency of higher latency sinks to achieve better throughput. If this is set to `0s`, a node will flush as long as the high-water mark has increased for the ranges that particular node is processing. If a changefeed is resumed, then `min_checkpoint_frequency` is the amount of time that changefeed will need to catch up. That is, it could emit duplicate messages during this time.

**Note:** [`resolved`](#resolved-option) messages will not be emitted more frequently than the configured `min_checkpoint_frequency` (but may be emitted less frequently). Since `min_checkpoint_frequency` defaults to `30s`, you **must** configure `min_checkpoint_frequency` to at least the desired `resolved` message frequency if you require `resolved` messages more frequently than `30s`.

**Default:** `30s` +`min_checkpoint_frequency` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Controls how often nodes flush their progress to the [coordinating changefeed node]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}). Changefeeds will wait for at least the specified duration before a flush to the sink. This can help you control the flush frequency of higher latency sinks to achieve better throughput. However, more frequent checkpointing can increase CPU usage. If this is set to `0s`, a node will flush messages as long as the high-water mark has increased for the ranges that particular node is processing. If a changefeed is resumed, then `min_checkpoint_frequency` is the amount of time that changefeed will need to catch up. That is, it could emit [duplicate messages]({% link {{ page.version.version }}/changefeed-messages.md %}#duplicate-messages) during this time.

**Note:** [`resolved`](#resolved-option) messages will not be emitted more frequently than the configured `min_checkpoint_frequency` (but may be emitted less frequently). If you require `resolved` messages more frequently than `30s`, you must configure `min_checkpoint_frequency` to at least the desired `resolved` message frequency. For more details, refer to [Resolved message frequency]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-timestamp-frequency).

**Default:** `30s` `mvcc_timestamp` | N/A | Include the [MVCC]({% link {{ page.version.version }}/architecture/storage-layer.md %}#mvcc) timestamp for each emitted row in a changefeed. With the `mvcc_timestamp` option, each emitted row will always contain its MVCC timestamp, even during the changefeed's initial backfill. `on_error` | `pause` / `fail` | Use `on_error=pause` to pause the changefeed when encountering **non**-retryable errors. `on_error=pause` will pause the changefeed instead of sending it into a terminal failure state. **Note:** Retryable errors will continue to be retried with this option specified.

Use with [`protect_data_from_gc_on_pause`](#protect-pause) to protect changes from [garbage collection]({% link {{ page.version.version }}/configure-replication-zones.md %}#gc-ttlseconds).

If a changefeed with `on_error=pause` is running when a watched table is [truncated]({% link {{ page.version.version }}/truncate.md %}), the changefeed will pause but will not be able to resume reads from that table. Using [`ALTER CHANGEFEED`]({% link {{ page.version.version }}/alter-changefeed.md %}) to drop the table from the changefeed and then [resuming the job]({% link {{ page.version.version }}/resume-job.md %}) will work, but you cannot add the same table to the changefeed again. Instead, you will need to [create a new changefeed](#start-a-new-changefeed-where-another-ended) for that table.

Default: `on_error=fail` `protect_data_from_gc_on_pause` | N/A | When a [changefeed is paused]({% link {{ page.version.version }}/pause-job.md %}), ensure that the data needed to [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}) is not garbage collected. If `protect_data_from_gc_on_pause` is **unset**, pausing the changefeed will release the existing protected timestamp records. It is also important to note that pausing and adding `protect_data_from_gc_on_pause` to a changefeed will not protect data if the [garbage collection]({% link {{ page.version.version }}/configure-replication-zones.md %}#gc-ttlseconds) window has already passed.

Use with [`on_error=pause`](#on-error) to protect changes from garbage collection when encountering non-retryable errors.

See [Garbage collection and changefeeds]({% link {{ page.version.version }}/changefeed-messages.md %}#garbage-collection-and-changefeeds) for more detail on protecting changefeed data.

**Note:** If you use this option, changefeeds that are left paused for long periods of time can prevent garbage collection. Use with the [`gc_protect_expires_after`](#gc-protect-expire) option to set a limit for protected data and for how long a changefeed will remain paused. New in v23.1: `pubsub_sink_config` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Set fields to configure sink batching and retries. The schema is as follows:

`{ "Flush": { "Messages": ..., "Bytes": ..., "Frequency": ..., }, "Retry": {"Max": ..., "Backoff": ..., } }`.

**Note** that if either `Messages` or `Bytes` are nonzero, then a non-zero value for `Frequency` must be provided.

Refer to [Pub/Sub sink configuration]({% link {{ page.version.version }}/changefeed-sinks.md %}#pub-sub-sink-configuration) for more details on using this option. -`resolved` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Emits [resolved timestamp]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) events per changefeed in a format dependent on the connected sink. Resolved timestamp events do not emit until all ranges in the changefeed have progressed to a specific point in time.

Set an optional minimal duration between emitting resolved timestamps. Example: `resolved='10s'`. This option will **only** emit a resolved timestamp event if the timestamp has advanced and at least the optional duration has elapsed. If a duration is unspecified, all resolved timestamps are emitted as the high-water mark advances.

**Note:** If you set `resolved` lower than `30s`, then you **must** also set [`min_checkpoint_frequency`](#min-checkpoint-frequency) to at minimum the same value as `resolved`, because `resolved` messages may be emitted less frequently than `min_checkpoint_frequency`, but cannot be emitted more frequently.

Refer to [Resolved messages]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) for more detail. +`resolved` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Emits [resolved timestamp]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) events per changefeed in a format dependent on the connected sink. Resolved timestamp events do not emit until the changefeed job has reached a checkpoint.

Set an optional minimal duration between emitting resolved timestamps. Example: `resolved='10s'`. This option will **only** emit a resolved timestamp event if the timestamp has advanced and at least the optional duration has elapsed. If a duration is unspecified, all resolved timestamps are emitted as the high-water mark advances.

**Note:** If you set `resolved` lower than `30s`, then you **must** also set [`min_checkpoint_frequency`](#min-checkpoint-frequency) to at minimum the same value as `resolved`, because `resolved` messages may be emitted less frequently than `min_checkpoint_frequency`, but cannot be emitted more frequently.

Refer to [Resolved messages]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) for more detail. `schema_change_events` | `default` / `column_changes` | The type of schema change event that triggers the behavior specified by the `schema_change_policy` option:
Default: `schema_change_events=default` `schema_change_policy` | `backfill` / `nobackfill` / `stop` | The behavior to take when an event specified by the `schema_change_events` option occurs:
Default: `schema_change_policy=backfill` `split_column_families` | N/A | Use this option to create a changefeed on a table with multiple [column families]({% link {{ page.version.version }}/column-families.md %}). The changefeed will emit messages for each of the table's column families. See [Changefeeds on tables with column families]({% link {{ page.version.version }}/changefeeds-on-tables-with-column-families.md %}) for more usage detail. diff --git a/src/current/v23.2/changefeed-messages.md b/src/current/v23.2/changefeed-messages.md index cf2e199a9c2..2e935e964eb 100644 --- a/src/current/v23.2/changefeed-messages.md +++ b/src/current/v23.2/changefeed-messages.md @@ -344,19 +344,44 @@ In some unusual situations you may receive a delete message for a row without fi ## Resolved messages -When you create a changefeed with the [`resolved` option]({% link {{ page.version.version }}/create-changefeed.md %}#resolved-option), the changefeed will emit resolved timestamp messages in a format dependent on the connected [sink]({% link {{ page.version.version }}/changefeed-sinks.md %}). The resolved timestamp is the high-water mark that guarantees that no previously unseen rows with an [earlier update timestamp](#ordering-and-delivery-guarantees) will be emitted to the sink. That is, resolved timestamp messages do not emit until all [ranges]({% link {{ page.version.version }}/architecture/overview.md %}#range) in the changefeed have progressed to a specific point in time. +When you create a changefeed with the [`resolved` option]({% link {{ page.version.version }}/create-changefeed.md %}#resolved-option), the changefeed will emit resolved timestamp messages in a format dependent on the connected [sink]({% link {{ page.version.version }}/changefeed-sinks.md %}). The resolved timestamp is the high-water mark that guarantees that no previously unseen rows with an [earlier update timestamp](#ordering-and-delivery-guarantees) will be emitted to the sink. That is, resolved timestamp messages do not emit until the changefeed job has reached a [checkpoint]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}). When you specify the `resolved` option at changefeed creation, the [job's coordinating node]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}) will send the resolved timestamp to each endpoint at the sink. For example, each [Kafka]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka) partition will receive a resolved timestamp message, or a [cloud storage sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#cloud-storage-sink) will receive a resolved timestamp file. There are three different ways to configure resolved timestamp messages: - If you do not specify the `resolved` option at all, then the changefeed coordinator node will not send resolved timestamp messages. -- If you include `WITH resolved` in your changefeed creation statement **without** specifying a value, the coordinator node will emit resolved timestamps as the high-water mark advances. Note that new Kafka partitions may not receive resolved messages right away. -- If you specify a duration like `WITH resolved={duration}`, the changefeed will use it as the minimum duration between `resolved` messages that the changefeed coordinator sends. The changefeed will only emit a resolved timestamp message if the timestamp has advanced and at least the optional duration has elapsed. +- If you include `WITH resolved` in your changefeed creation statement **without** specifying a value, the coordinator node will emit resolved timestamps as the changefeed job checkpoints and the high-water mark advances. Note that new Kafka partitions may not receive resolved messages right away. -{{site.data.alerts.callout_info}} -If you require `resolved` message frequency under `30s`, then you **must** set the [`min_checkpoint_frequency`]({% link {{ page.version.version }}/create-changefeed.md %}#min-checkpoint-frequency) option to at least the desired `resolved` frequency. This is because `resolved` messages will not be emitted more frequently than `min_checkpoint_frequency`, but may be emitted less frequently. -{{site.data.alerts.end}} + {% include_cached copy-clipboard.html %} + ~~~ sql + CREATE CHANGEFEED FOR TABLE ... WITH resolved; + ~~~ + +- If you specify a duration like `WITH resolved={duration}`, the changefeed will use it as the minimum duration between `resolved` messages that the changefeed coordinator sends. The changefeed will only emit a resolved timestamp message if the timestamp has advanced and at least the optional duration has elapsed. For example: + + {% include_cached copy-clipboard.html %} + ~~~ sql + CREATE CHANGEFEED FOR TABLE ... WITH resolved=30s; + ~~~ + +### Resolved timestamp frequency + +The changefeed job's coordinating node will emit resolved timestamp messages once the changefeed has reached a checkpoint. The frequency of the checkpoints determine how often the resolved timestamp messages emit to the sink. To configure how often the changefeed checkpoints, you can set the [`min_checkpoint_frequency`]({% link {{ page.version.version }}/create-changefeed.md %}#min-checkpoint-frequency) option and [flush frequency]({% link {{ page.version.version }}/changefeed-sinks.md %}) (if flushing is configurable for your sink). + +The `min_checkpoint_frequency` option controls how often nodes flush their progress to the coordinating node. If you need resolved timestamp messages to emit from the changefeed more frequently than the `30s` default, then you must set `min_checkpoint_frequency` to at least the desired resolved timestamp frequency. For example: + +{% include_cached copy-clipboard.html %} +~~~ sql +CREATE CHANGEFEED FOR TABLE ... WITH resolved=10s, min_checkpoint_frequency=10s; +~~~ + +When you configure the `min_checkpoint_frequency` and `resolved` options, there can be a tradeoff between changefeed message latency and cluster CPU usage. + +- Lowering these options will cause the changefeed to checkpoint and send resolved timestamp messages more frequently, which can add overhead to CPU usage in the cluster. +- Raising these options will result in the changefeed checkpointing and sending resolved timestamp messages less frequently, which can cause latency in message delivery to the sink. + +For example, you can set `min_checkpoint_frequency` and `resolved` to `0s` so that the changefeed job checkpoints as frequently as possible and messages are sent immediately followed by the resolved timestamp. However, the frequent checkpointing will increase CPU usage in the cluster. If your application can tolerate a longer duration than `0s` between checkpoints, this will help to reduce the overhead on the cluster. ## Duplicate messages diff --git a/src/current/v23.2/create-changefeed.md b/src/current/v23.2/create-changefeed.md index 4be886227eb..60dd556a568 100644 --- a/src/current/v23.2/create-changefeed.md +++ b/src/current/v23.2/create-changefeed.md @@ -180,12 +180,12 @@ Option | Value | Description New in v23.2: `lagging_ranges_threshold` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Set a duration from the present that determines the length of time a range is considered to be lagging behind, which will then track in the [`lagging_ranges`]({% link {{ page.version.version }}/monitor-and-debug-changefeeds.md %}#lagging-ranges-metric) metric. Note that ranges undergoing an [initial scan](#initial-scan) for longer than the threshold duration are considered to be lagging. Starting a changefeed with an initial scan on a large table will likely increment the metric for each range in the table. As ranges complete the initial scan, the number of ranges lagging behind will decrease.

**Default:** `3m` New in v23.2: `lagging_ranges_polling_interval` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Set the interval rate for when lagging ranges are checked and the `lagging_ranges` metric is updated. Polling adds latency to the `lagging_ranges` metric being updated. For example, if a range falls behind by 3 minutes, the metric may not update until an additional minute afterward.

**Default:** `1m` `metrics_label` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Define a metrics label to which the metrics for one or multiple changefeeds increment. All changefeeds also have their metrics aggregated.

The maximum length of a label is 128 bytes. There is a limit of 1024 unique labels.

`WITH metrics_label=label_name`

For more detail on usage and considerations, see [Using changefeed metrics labels]({% link {{ page.version.version }}/monitor-and-debug-changefeeds.md %}#using-changefeed-metrics-labels). -`min_checkpoint_frequency` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Controls how often nodes flush their progress to the [coordinating changefeed node]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}). Changefeeds will wait for at least the specified duration before a flush to the sink. This can help you control the flush frequency of higher latency sinks to achieve better throughput. If this is set to `0s`, a node will flush as long as the high-water mark has increased for the ranges that particular node is processing. If a changefeed is resumed, then `min_checkpoint_frequency` is the amount of time that changefeed will need to catch up. That is, it could emit duplicate messages during this time.

**Note:** [`resolved`](#resolved-option) messages will not be emitted more frequently than the configured `min_checkpoint_frequency` (but may be emitted less frequently). Since `min_checkpoint_frequency` defaults to `30s`, you **must** configure `min_checkpoint_frequency` to at least the desired `resolved` message frequency if you require `resolved` messages more frequently than `30s`.

**Default:** `30s` +`min_checkpoint_frequency` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Controls how often nodes flush their progress to the [coordinating changefeed node]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}). Changefeeds will wait for at least the specified duration before a flush to the sink. This can help you control the flush frequency of higher latency sinks to achieve better throughput. However, more frequent checkpointing can increase CPU usage. If this is set to `0s`, a node will flush messages as long as the high-water mark has increased for the ranges that particular node is processing. If a changefeed is resumed, then `min_checkpoint_frequency` is the amount of time that changefeed will need to catch up. That is, it could emit [duplicate messages]({% link {{ page.version.version }}/changefeed-messages.md %}#duplicate-messages) during this time.

**Note:** [`resolved`](#resolved-option) messages will not be emitted more frequently than the configured `min_checkpoint_frequency` (but may be emitted less frequently). If you require `resolved` messages more frequently than `30s`, you must configure `min_checkpoint_frequency` to at least the desired `resolved` message frequency. For more details, refer to [Resolved message frequency]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-timestamp-frequency).

**Default:** `30s` `mvcc_timestamp` | N/A | Include the [MVCC]({% link {{ page.version.version }}/architecture/storage-layer.md %}#mvcc) timestamp for each emitted row in a changefeed. With the `mvcc_timestamp` option, each emitted row will always contain its MVCC timestamp, even during the changefeed's initial backfill. `on_error` | `pause` / `fail` | Use `on_error=pause` to pause the changefeed when encountering **non**-retryable errors. `on_error=pause` will pause the changefeed instead of sending it into a terminal failure state. **Note:** Retryable errors will continue to be retried with this option specified.

Use with [`protect_data_from_gc_on_pause`](#protect-pause) to protect changes from [garbage collection]({% link {{ page.version.version }}/configure-replication-zones.md %}#gc-ttlseconds).

If a changefeed with `on_error=pause` is running when a watched table is [truncated]({% link {{ page.version.version }}/truncate.md %}), the changefeed will pause but will not be able to resume reads from that table. Using [`ALTER CHANGEFEED`]({% link {{ page.version.version }}/alter-changefeed.md %}) to drop the table from the changefeed and then [resuming the job]({% link {{ page.version.version }}/resume-job.md %}) will work, but you cannot add the same table to the changefeed again. Instead, you will need to [create a new changefeed](#start-a-new-changefeed-where-another-ended) for that table.

Default: `on_error=fail` `protect_data_from_gc_on_pause` | N/A | This option is deprecated as of v23.2 and will be removed in a future release.

When a [changefeed is paused]({% link {{ page.version.version }}/pause-job.md %}), ensure that the data needed to [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}) is not garbage collected. If `protect_data_from_gc_on_pause` is **unset**, pausing the changefeed will release the existing protected timestamp records. It is also important to note that pausing and adding `protect_data_from_gc_on_pause` to a changefeed will not protect data if the [garbage collection]({% link {{ page.version.version }}/configure-replication-zones.md %}#gc-ttlseconds) window has already passed.

Use with [`on_error=pause`](#on-error) to protect changes from garbage collection when encountering non-retryable errors.

Refer to [Protect Changefeed Data from Garbage Collection]({% link {{ page.version.version }}/protect-changefeed-data.md %}) for more detail on protecting changefeed data.

**Note:** If you use this option, changefeeds that are left paused for long periods of time can prevent garbage collection. Use with the [`gc_protect_expires_after`](#gc-protect-expire) option to set a limit for protected data and for how long a changefeed will remain paused. `pubsub_sink_config` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Set fields to configure sink batching and retries. The schema is as follows:

`{ "Flush": { "Messages": ..., "Bytes": ..., "Frequency": ..., }, "Retry": {"Max": ..., "Backoff": ..., } }`.

**Note** that if either `Messages` or `Bytes` are nonzero, then a non-zero value for `Frequency` must be provided.

Refer to [Pub/Sub sink configuration]({% link {{ page.version.version }}/changefeed-sinks.md %}#pub-sub-sink-configuration) for more details on using this option. -`resolved` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Emits [resolved timestamp]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) events per changefeed in a format dependent on the connected sink. Resolved timestamp events do not emit until all ranges in the changefeed have progressed to a specific point in time.

Set an optional minimal duration between emitting resolved timestamps. Example: `resolved='10s'`. This option will **only** emit a resolved timestamp event if the timestamp has advanced and at least the optional duration has elapsed. If a duration is unspecified, all resolved timestamps are emitted as the high-water mark advances.

**Note:** If you set `resolved` lower than `30s`, then you **must** also set [`min_checkpoint_frequency`](#min-checkpoint-frequency) to at minimum the same value as `resolved`, because `resolved` messages may be emitted less frequently than `min_checkpoint_frequency`, but cannot be emitted more frequently.

Refer to [Resolved messages]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) for more detail. +`resolved` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Emits [resolved timestamp]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) events per changefeed in a format dependent on the connected sink. Resolved timestamp events do not emit until the changefeed job has reached a checkpoint.

Set an optional minimal duration between emitting resolved timestamps. Example: `resolved='10s'`. This option will **only** emit a resolved timestamp event if the timestamp has advanced and at least the optional duration has elapsed. If a duration is unspecified, all resolved timestamps are emitted as the high-water mark advances.

**Note:** If you set `resolved` lower than `30s`, then you **must** also set [`min_checkpoint_frequency`](#min-checkpoint-frequency) to at minimum the same value as `resolved`, because `resolved` messages may be emitted less frequently than `min_checkpoint_frequency`, but cannot be emitted more frequently.

Refer to [Resolved messages]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) for more detail. `schema_change_events` | `default` / `column_changes` | The type of schema change event that triggers the behavior specified by the `schema_change_policy` option:
Default: `schema_change_events=default` `schema_change_policy` | `backfill` / `nobackfill` / `stop` | The behavior to take when an event specified by the `schema_change_events` option occurs:
Default: `schema_change_policy=backfill` `split_column_families` | N/A | Use this option to create a changefeed on a table with multiple [column families]({% link {{ page.version.version }}/column-families.md %}). The changefeed will emit messages for each of the table's column families. See [Changefeeds on tables with column families]({% link {{ page.version.version }}/changefeeds-on-tables-with-column-families.md %}) for more usage detail. diff --git a/src/current/v24.1/changefeed-messages.md b/src/current/v24.1/changefeed-messages.md index 0df7faf4cfc..531f350a954 100644 --- a/src/current/v24.1/changefeed-messages.md +++ b/src/current/v24.1/changefeed-messages.md @@ -344,19 +344,44 @@ In some unusual situations you may receive a delete message for a row without fi ## Resolved messages -When you create a changefeed with the [`resolved` option]({% link {{ page.version.version }}/create-changefeed.md %}#resolved), the changefeed will emit resolved timestamp messages in a format dependent on the connected [sink]({% link {{ page.version.version }}/changefeed-sinks.md %}). The resolved timestamp is the high-water mark that guarantees that no previously unseen rows with an [earlier update timestamp](#ordering-and-delivery-guarantees) will be emitted to the sink. That is, resolved timestamp messages do not emit until all [ranges]({% link {{ page.version.version }}/architecture/overview.md %}#range) in the changefeed have progressed to a specific point in time. +When you create a changefeed with the [`resolved` option]({% link {{ page.version.version }}/create-changefeed.md %}#resolved), the changefeed will emit resolved timestamp messages in a format dependent on the connected [sink]({% link {{ page.version.version }}/changefeed-sinks.md %}). The resolved timestamp is the high-water mark that guarantees that no previously unseen rows with an [earlier update timestamp](#ordering-and-delivery-guarantees) will be emitted to the sink. That is, resolved timestamp messages do not emit until the changefeed job has reached a [checkpoint]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}). When you specify the `resolved` option at changefeed creation, the [job's coordinating node]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}) will send the resolved timestamp to each endpoint at the sink. For example, each [Kafka]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka) partition will receive a resolved timestamp message, or a [cloud storage sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#cloud-storage-sink) will receive a resolved timestamp file. There are three different ways to configure resolved timestamp messages: - If you do not specify the `resolved` option at all, then the changefeed coordinator node will not send resolved timestamp messages. -- If you include `WITH resolved` in your changefeed creation statement **without** specifying a value, the coordinator node will emit resolved timestamps as the high-water mark advances. Note that new Kafka partitions may not receive resolved messages right away. -- If you specify a duration like `WITH resolved={duration}`, the changefeed will use it as the minimum duration between `resolved` messages that the changefeed coordinator sends. The changefeed will only emit a resolved timestamp message if the timestamp has advanced and at least the optional duration has elapsed. +- If you include `WITH resolved` in your changefeed creation statement **without** specifying a value, the coordinator node will emit resolved timestamps as the changefeed job checkpoints and the high-water mark advances. Note that new Kafka partitions may not receive resolved messages right away. -{{site.data.alerts.callout_info}} -If you require `resolved` message frequency under `30s`, then you **must** set the [`min_checkpoint_frequency`]({% link {{ page.version.version }}/create-changefeed.md %}#min-checkpoint-frequency) option to at least the desired `resolved` frequency. This is because `resolved` messages will not be emitted more frequently than `min_checkpoint_frequency`, but may be emitted less frequently. -{{site.data.alerts.end}} + {% include_cached copy-clipboard.html %} + ~~~ sql + CREATE CHANGEFEED FOR TABLE ... WITH resolved; + ~~~ + +- If you specify a duration like `WITH resolved={duration}`, the changefeed will use it as the minimum duration between `resolved` messages that the changefeed coordinator sends. The changefeed will only emit a resolved timestamp message if the timestamp has advanced and at least the optional duration has elapsed. For example: + + {% include_cached copy-clipboard.html %} + ~~~ sql + CREATE CHANGEFEED FOR TABLE ... WITH resolved=30s; + ~~~ + +### Resolved timestamp frequency + +The changefeed job's coordinating node will emit resolved timestamp messages once the changefeed has reached a checkpoint. The frequency of the checkpoints determine how often the resolved timestamp messages emit to the sink. To configure how often the changefeed checkpoints, you can set the [`min_checkpoint_frequency`]({% link {{ page.version.version }}/create-changefeed.md %}#min-checkpoint-frequency) option and [flush frequency]({% link {{ page.version.version }}/changefeed-sinks.md %}) (if flushing is configurable for your sink). + +The `min_checkpoint_frequency` option controls how often nodes flush their progress to the coordinating node. If you need resolved timestamp messages to emit from the changefeed more frequently than the `30s` default, then you must set `min_checkpoint_frequency` to at least the desired resolved timestamp frequency. For example: + +{% include_cached copy-clipboard.html %} +~~~ sql +CREATE CHANGEFEED FOR TABLE ... WITH resolved=10s, min_checkpoint_frequency=10s; +~~~ + +When you configure the `min_checkpoint_frequency` and `resolved` options, there can be a tradeoff between changefeed message latency and cluster CPU usage. + +- Lowering these options will cause the changefeed to checkpoint and send resolved timestamp messages more frequently, which can add overhead to CPU usage in the cluster. +- Raising these options will result in the changefeed checkpointing and sending resolved timestamp messages less frequently, which can cause latency in message delivery to the sink. + +For example, you can set `min_checkpoint_frequency` and `resolved` to `0s` so that the changefeed job checkpoints as frequently as possible and messages are sent immediately followed by the resolved timestamp. However, the frequent checkpointing will increase CPU usage in the cluster. If your application can tolerate a longer duration than `0s` between checkpoints, this will help to reduce the overhead on the cluster. ## Duplicate messages diff --git a/src/current/v24.1/create-changefeed.md b/src/current/v24.1/create-changefeed.md index 79ac076abad..41df691fe90 100644 --- a/src/current/v24.1/create-changefeed.md +++ b/src/current/v24.1/create-changefeed.md @@ -198,12 +198,12 @@ Option | Value | Description `lagging_ranges_threshold` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Set a duration from the present that determines the length of time a range is considered to be lagging behind, which will then track in the [`lagging_ranges`]({% link {{ page.version.version }}/monitor-and-debug-changefeeds.md %}#lagging-ranges-metric) metric. Note that ranges undergoing an [initial scan](#initial-scan) for longer than the threshold duration are considered to be lagging. Starting a changefeed with an initial scan on a large table will likely increment the metric for each range in the table. As ranges complete the initial scan, the number of ranges lagging behind will decrease.

**Default:** `3m` `lagging_ranges_polling_interval` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Set the interval rate for when lagging ranges are checked and the `lagging_ranges` metric is updated. Polling adds latency to the `lagging_ranges` metric being updated. For example, if a range falls behind by 3 minutes, the metric may not update until an additional minute afterward.

**Default:** `1m` `metrics_label` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Define a metrics label to which the metrics for one or multiple changefeeds increment. All changefeeds also have their metrics aggregated.

The maximum length of a label is 128 bytes. There is a limit of 1024 unique labels.

`WITH metrics_label=label_name`

For more detail on usage and considerations, see [Using changefeed metrics labels]({% link {{ page.version.version }}/monitor-and-debug-changefeeds.md %}#using-changefeed-metrics-labels). -`min_checkpoint_frequency` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Controls how often nodes flush their progress to the [coordinating changefeed node]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}). Changefeeds will wait for at least the specified duration before a flush to the sink. This can help you control the flush frequency of higher latency sinks to achieve better throughput. If this is set to `0s`, a node will flush as long as the high-water mark has increased for the ranges that particular node is processing. If a changefeed is resumed, then `min_checkpoint_frequency` is the amount of time that changefeed will need to catch up. That is, it could emit duplicate messages during this time.

**Note:** [`resolved`](#resolved) messages will not be emitted more frequently than the configured `min_checkpoint_frequency` (but may be emitted less frequently). Since `min_checkpoint_frequency` defaults to `30s`, you **must** configure `min_checkpoint_frequency` to at least the desired `resolved` message frequency if you require `resolved` messages more frequently than `30s`.

**Default:** `30s` +`min_checkpoint_frequency` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Controls how often nodes flush their progress to the [coordinating changefeed node]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}). Changefeeds will wait for at least the specified duration before a flush to the sink. This can help you control the flush frequency of higher latency sinks to achieve better throughput. However, more frequent checkpointing can increase CPU usage. If this is set to `0s`, a node will flush messages as long as the high-water mark has increased for the ranges that particular node is processing. If a changefeed is resumed, then `min_checkpoint_frequency` is the amount of time that changefeed will need to catch up. That is, it could emit [duplicate messages]({% link {{ page.version.version }}/changefeed-messages.md %}#duplicate-messages) during this time.

**Note:** [`resolved`](#resolved) messages will not be emitted more frequently than the configured `min_checkpoint_frequency` (but may be emitted less frequently). If you require `resolved` messages more frequently than `30s`, you must configure `min_checkpoint_frequency` to at least the desired `resolved` message frequency. For more details, refer to [Resolved message frequency]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-timestamp-frequency).

**Default:** `30s` `mvcc_timestamp` | N/A | Include the [MVCC]({% link {{ page.version.version }}/architecture/storage-layer.md %}#mvcc) timestamp for each emitted row in a changefeed. With the `mvcc_timestamp` option, each emitted row will always contain its MVCC timestamp, even during the changefeed's initial backfill. `on_error` | `pause` / `fail` | Use `on_error=pause` to pause the changefeed when encountering **non**-retryable errors. `on_error=pause` will pause the changefeed instead of sending it into a terminal failure state. **Note:** Retryable errors will continue to be retried with this option specified.

Use with [`protect_data_from_gc_on_pause`](#protect-data-from-gc-on-pause) to protect changes from [garbage collection]({% link {{ page.version.version }}/configure-replication-zones.md %}#gc-ttlseconds).

If a changefeed with `on_error=pause` is running when a watched table is [truncated]({% link {{ page.version.version }}/truncate.md %}), the changefeed will pause but will not be able to resume reads from that table. Using [`ALTER CHANGEFEED`]({% link {{ page.version.version }}/alter-changefeed.md %}) to drop the table from the changefeed and then [resuming the job]({% link {{ page.version.version }}/resume-job.md %}) will work, but you cannot add the same table to the changefeed again. Instead, you will need to [create a new changefeed](#start-a-new-changefeed-where-another-ended) for that table.

Default: `on_error=fail` `protect_data_from_gc_on_pause` | N/A | This option is deprecated as of v23.2 and will be removed in a future release.

When a [changefeed is paused]({% link {{ page.version.version }}/pause-job.md %}), ensure that the data needed to [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}) is not garbage collected. If `protect_data_from_gc_on_pause` is **unset**, pausing the changefeed will release the existing protected timestamp records. It is also important to note that pausing and adding `protect_data_from_gc_on_pause` to a changefeed will not protect data if the [garbage collection]({% link {{ page.version.version }}/configure-replication-zones.md %}#gc-ttlseconds) window has already passed.

Use with [`on_error=pause`](#on-error) to protect changes from garbage collection when encountering non-retryable errors.

Refer to [Protect Changefeed Data from Garbage Collection]({% link {{ page.version.version }}/protect-changefeed-data.md %}) for more detail on protecting changefeed data.

**Note:** If you use this option, changefeeds that are left paused for long periods of time can prevent garbage collection. Use with the [`gc_protect_expires_after`](#gc-protect-expires-after) option to set a limit for protected data and for how long a changefeed will remain paused. `pubsub_sink_config` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Set fields to configure sink batching and retries. The schema is as follows:

`{ "Flush": { "Messages": ..., "Bytes": ..., "Frequency": ..., }, "Retry": {"Max": ..., "Backoff": ..., } }`.

**Note** that if either `Messages` or `Bytes` are nonzero, then a non-zero value for `Frequency` must be provided.

Refer to [Pub/Sub sink configuration]({% link {{ page.version.version }}/changefeed-sinks.md %}#pub-sub-sink-configuration) for more details on using this option. -`resolved` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Emits [resolved timestamp]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) events per changefeed in a format dependent on the connected sink. Resolved timestamp events do not emit until all ranges in the changefeed have progressed to a specific point in time.

Set an optional minimal duration between emitting resolved timestamps. Example: `resolved='10s'`. This option will **only** emit a resolved timestamp event if the timestamp has advanced and at least the optional duration has elapsed. If a duration is unspecified, all resolved timestamps are emitted as the high-water mark advances.

**Note:** If you set `resolved` lower than `30s`, then you **must** also set [`min_checkpoint_frequency`](#min-checkpoint-frequency) to at minimum the same value as `resolved`, because `resolved` messages may be emitted less frequently than `min_checkpoint_frequency`, but cannot be emitted more frequently.

Refer to [Resolved messages]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) for more detail. +`resolved` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Emits [resolved timestamp]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) events per changefeed in a format dependent on the connected sink. Resolved timestamp events do not emit until the changefeed job has reached a checkpoint.

Set an optional minimal duration between emitting resolved timestamps. Example: `resolved='10s'`. This option will **only** emit a resolved timestamp event if the timestamp has advanced and at least the optional duration has elapsed. If a duration is unspecified, all resolved timestamps are emitted as the high-water mark advances.

**Note:** If you set `resolved` lower than `30s`, then you **must** also set [`min_checkpoint_frequency`](#min-checkpoint-frequency) to at minimum the same value as `resolved`, because `resolved` messages may be emitted less frequently than `min_checkpoint_frequency`, but cannot be emitted more frequently.

Refer to [Resolved messages]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) for more detail. `schema_change_events` | `default` / `column_changes` | The type of schema change event that triggers the behavior specified by the `schema_change_policy` option:
Default: `schema_change_events=default` `schema_change_policy` | `backfill` / `nobackfill` / `stop` | The behavior to take when an event specified by the `schema_change_events` option occurs:
Default: `schema_change_policy=backfill` `split_column_families` | N/A | Use this option to create a changefeed on a table with multiple [column families]({% link {{ page.version.version }}/column-families.md %}). The changefeed will emit messages for each of the table's column families. See [Changefeeds on tables with column families]({% link {{ page.version.version }}/changefeeds-on-tables-with-column-families.md %}) for more usage detail. diff --git a/src/current/v24.2/changefeed-messages.md b/src/current/v24.2/changefeed-messages.md index 2cf1057d22a..a1a851b6c1e 100644 --- a/src/current/v24.2/changefeed-messages.md +++ b/src/current/v24.2/changefeed-messages.md @@ -344,19 +344,44 @@ In some unusual situations you may receive a delete message for a row without fi ## Resolved messages -When you create a changefeed with the [`resolved` option]({% link {{ page.version.version }}/create-changefeed.md %}#resolved), the changefeed will emit resolved timestamp messages in a format dependent on the connected [sink]({% link {{ page.version.version }}/changefeed-sinks.md %}). The resolved timestamp is the high-water mark that guarantees that no previously unseen rows with an [earlier update timestamp](#ordering-and-delivery-guarantees) will be emitted to the sink. That is, resolved timestamp messages do not emit until all [ranges]({% link {{ page.version.version }}/architecture/overview.md %}#range) in the changefeed have progressed to a specific point in time. +When you create a changefeed with the [`resolved` option]({% link {{ page.version.version }}/create-changefeed.md %}#resolved), the changefeed will emit resolved timestamp messages in a format dependent on the connected [sink]({% link {{ page.version.version }}/changefeed-sinks.md %}). The resolved timestamp is the high-water mark that guarantees that no previously unseen rows with an [earlier update timestamp](#ordering-and-delivery-guarantees) will be emitted to the sink. That is, resolved timestamp messages do not emit until the changefeed job has reached a [checkpoint]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}). When you specify the `resolved` option at changefeed creation, the [job's coordinating node]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}) will send the resolved timestamp to each endpoint at the sink. For example, each [Kafka]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka) partition will receive a resolved timestamp message, or a [cloud storage sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#cloud-storage-sink) will receive a resolved timestamp file. There are three different ways to configure resolved timestamp messages: - If you do not specify the `resolved` option at all, then the changefeed coordinator node will not send resolved timestamp messages. -- If you include `WITH resolved` in your changefeed creation statement **without** specifying a value, the coordinator node will emit resolved timestamps as the high-water mark advances. Note that new Kafka partitions may not receive resolved messages right away. -- If you specify a duration like `WITH resolved={duration}`, the changefeed will use it as the minimum duration between `resolved` messages that the changefeed coordinator sends. The changefeed will only emit a resolved timestamp message if the timestamp has advanced and at least the optional duration has elapsed. +- If you include `WITH resolved` in your changefeed creation statement **without** specifying a value, the coordinator node will emit resolved timestamps as the changefeed job checkpoints and the high-water mark advances. Note that new Kafka partitions may not receive resolved messages right away. -{{site.data.alerts.callout_info}} -If you require `resolved` message frequency under `30s`, then you **must** set the [`min_checkpoint_frequency`]({% link {{ page.version.version }}/create-changefeed.md %}#min-checkpoint-frequency) option to at least the desired `resolved` frequency. This is because `resolved` messages will not be emitted more frequently than `min_checkpoint_frequency`, but may be emitted less frequently. -{{site.data.alerts.end}} + {% include_cached copy-clipboard.html %} + ~~~ sql + CREATE CHANGEFEED FOR TABLE ... WITH resolved; + ~~~ + +- If you specify a duration like `WITH resolved={duration}`, the changefeed will use it as the minimum duration between `resolved` messages that the changefeed coordinator sends. The changefeed will only emit a resolved timestamp message if the timestamp has advanced and at least the optional duration has elapsed. For example: + + {% include_cached copy-clipboard.html %} + ~~~ sql + CREATE CHANGEFEED FOR TABLE ... WITH resolved=30s; + ~~~ + +### Resolved timestamp frequency + +The changefeed job's coordinating node will emit resolved timestamp messages once the changefeed has reached a checkpoint. The frequency of the checkpoints determine how often the resolved timestamp messages emit to the sink. To configure how often the changefeed checkpoints, you can set the [`min_checkpoint_frequency`]({% link {{ page.version.version }}/create-changefeed.md %}#min-checkpoint-frequency) option and [flush frequency]({% link {{ page.version.version }}/changefeed-sinks.md %}) (if flushing is configurable for your sink). + +The `min_checkpoint_frequency` option controls how often nodes flush their progress to the coordinating node. If you need resolved timestamp messages to emit from the changefeed more frequently than the `30s` default, then you must set `min_checkpoint_frequency` to at least the desired resolved timestamp frequency. For example: + +{% include_cached copy-clipboard.html %} +~~~ sql +CREATE CHANGEFEED FOR TABLE ... WITH resolved=10s, min_checkpoint_frequency=10s; +~~~ + +When you configure the `min_checkpoint_frequency` and `resolved` options, there can be a tradeoff between changefeed message latency and cluster CPU usage. + +- Lowering these options will cause the changefeed to checkpoint and send resolved timestamp messages more frequently, which can add overhead to CPU usage in the cluster. +- Raising these options will result in the changefeed checkpointing and sending resolved timestamp messages less frequently, which can cause latency in message delivery to the sink. + +For example, you can set `min_checkpoint_frequency` and `resolved` to `0s` so that the changefeed job checkpoints as frequently as possible and messages are sent immediately followed by the resolved timestamp. However, the frequent checkpointing will increase CPU usage in the cluster. If your application can tolerate a longer duration than `0s` between checkpoints, this will help to reduce the overhead on the cluster. ## Duplicate messages diff --git a/src/current/v24.2/create-changefeed.md b/src/current/v24.2/create-changefeed.md index 80ed80f391c..8700bc37ed2 100644 --- a/src/current/v24.2/create-changefeed.md +++ b/src/current/v24.2/create-changefeed.md @@ -199,12 +199,12 @@ Option | Value | Description `lagging_ranges_threshold` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Set a duration from the present that determines the length of time a range is considered to be lagging behind, which will then track in the [`lagging_ranges`]({% link {{ page.version.version }}/monitor-and-debug-changefeeds.md %}#lagging-ranges-metric) metric. Note that ranges undergoing an [initial scan](#initial-scan) for longer than the threshold duration are considered to be lagging. Starting a changefeed with an initial scan on a large table will likely increment the metric for each range in the table. As ranges complete the initial scan, the number of ranges lagging behind will decrease.

**Default:** `3m` `lagging_ranges_polling_interval` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Set the interval rate for when lagging ranges are checked and the `lagging_ranges` metric is updated. Polling adds latency to the `lagging_ranges` metric being updated. For example, if a range falls behind by 3 minutes, the metric may not update until an additional minute afterward.

**Default:** `1m` `metrics_label` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Define a metrics label to which the metrics for one or multiple changefeeds increment. All changefeeds also have their metrics aggregated.

The maximum length of a label is 128 bytes. There is a limit of 1024 unique labels.

`WITH metrics_label=label_name`

For more detail on usage and considerations, see [Using changefeed metrics labels]({% link {{ page.version.version }}/monitor-and-debug-changefeeds.md %}#using-changefeed-metrics-labels). -`min_checkpoint_frequency` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Controls how often nodes flush their progress to the [coordinating changefeed node]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}). Changefeeds will wait for at least the specified duration before a flush to the sink. This can help you control the flush frequency of higher latency sinks to achieve better throughput. If this is set to `0s`, a node will flush as long as the high-water mark has increased for the ranges that particular node is processing. If a changefeed is resumed, then `min_checkpoint_frequency` is the amount of time that changefeed will need to catch up. That is, it could emit duplicate messages during this time.

**Note:** [`resolved`](#resolved) messages will not be emitted more frequently than the configured `min_checkpoint_frequency` (but may be emitted less frequently). Since `min_checkpoint_frequency` defaults to `30s`, you **must** configure `min_checkpoint_frequency` to at least the desired `resolved` message frequency if you require `resolved` messages more frequently than `30s`.

**Default:** `30s` +`min_checkpoint_frequency` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Controls how often nodes flush their progress to the [coordinating changefeed node]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}). Changefeeds will wait for at least the specified duration before a flush to the sink. This can help you control the flush frequency of higher latency sinks to achieve better throughput. However, more frequent checkpointing can increase CPU usage. If this is set to `0s`, a node will flush messages as long as the high-water mark has increased for the ranges that particular node is processing. If a changefeed is resumed, then `min_checkpoint_frequency` is the amount of time that changefeed will need to catch up. That is, it could emit [duplicate messages]({% link {{ page.version.version }}/changefeed-messages.md %}#duplicate-messages) during this time.

**Note:** [`resolved`](#resolved) messages will not be emitted more frequently than the configured `min_checkpoint_frequency` (but may be emitted less frequently). If you require `resolved` messages more frequently than `30s`, you must configure `min_checkpoint_frequency` to at least the desired `resolved` message frequency. For more details, refer to [Resolved message frequency]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-timestamp-frequency).

**Default:** `30s` `mvcc_timestamp` | N/A | Include the [MVCC]({% link {{ page.version.version }}/architecture/storage-layer.md %}#mvcc) timestamp for each emitted row in a changefeed. With the `mvcc_timestamp` option, each emitted row will always contain its MVCC timestamp, even during the changefeed's initial backfill. `on_error` | `pause` / `fail` | Use `on_error=pause` to pause the changefeed when encountering **non**-retryable errors. `on_error=pause` will pause the changefeed instead of sending it into a terminal failure state. **Note:** Retryable errors will continue to be retried with this option specified.

Use with [`protect_data_from_gc_on_pause`](#protect-data-from-gc-on-pause) to protect changes from [garbage collection]({% link {{ page.version.version }}/configure-replication-zones.md %}#gc-ttlseconds).

If a changefeed with `on_error=pause` is running when a watched table is [truncated]({% link {{ page.version.version }}/truncate.md %}), the changefeed will pause but will not be able to resume reads from that table. Using [`ALTER CHANGEFEED`]({% link {{ page.version.version }}/alter-changefeed.md %}) to drop the table from the changefeed and then [resuming the job]({% link {{ page.version.version }}/resume-job.md %}) will work, but you cannot add the same table to the changefeed again. Instead, you will need to [create a new changefeed](#start-a-new-changefeed-where-another-ended) for that table.

Default: `on_error=fail` `protect_data_from_gc_on_pause` | N/A | This option is deprecated as of v23.2 and will be removed in a future release.

When a [changefeed is paused]({% link {{ page.version.version }}/pause-job.md %}), ensure that the data needed to [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}) is not garbage collected. If `protect_data_from_gc_on_pause` is **unset**, pausing the changefeed will release the existing protected timestamp records. It is also important to note that pausing and adding `protect_data_from_gc_on_pause` to a changefeed will not protect data if the [garbage collection]({% link {{ page.version.version }}/configure-replication-zones.md %}#gc-ttlseconds) window has already passed.

Use with [`on_error=pause`](#on-error) to protect changes from garbage collection when encountering non-retryable errors.

Refer to [Protect Changefeed Data from Garbage Collection]({% link {{ page.version.version }}/protect-changefeed-data.md %}) for more detail on protecting changefeed data.

**Note:** If you use this option, changefeeds that are left paused for long periods of time can prevent garbage collection. Use with the [`gc_protect_expires_after`](#gc-protect-expires-after) option to set a limit for protected data and for how long a changefeed will remain paused. `pubsub_sink_config` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Set fields to configure sink batching and retries. The schema is as follows:

`{ "Flush": { "Messages": ..., "Bytes": ..., "Frequency": ..., }, "Retry": {"Max": ..., "Backoff": ..., } }`.

**Note** that if either `Messages` or `Bytes` are nonzero, then a non-zero value for `Frequency` must be provided.

Refer to [Pub/Sub sink configuration]({% link {{ page.version.version }}/changefeed-sinks.md %}#pub-sub-sink-configuration) for more details on using this option. -`resolved` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Emits [resolved timestamp]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) events per changefeed in a format dependent on the connected sink. Resolved timestamp events do not emit until all ranges in the changefeed have progressed to a specific point in time.

Set an optional minimal duration between emitting resolved timestamps. Example: `resolved='10s'`. This option will **only** emit a resolved timestamp event if the timestamp has advanced and at least the optional duration has elapsed. If a duration is unspecified, all resolved timestamps are emitted as the high-water mark advances.

**Note:** If you set `resolved` lower than `30s`, then you **must** also set [`min_checkpoint_frequency`](#min-checkpoint-frequency) to at minimum the same value as `resolved`, because `resolved` messages may be emitted less frequently than `min_checkpoint_frequency`, but cannot be emitted more frequently.

Refer to [Resolved messages]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) for more detail. +`resolved` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Emits [resolved timestamp]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) events per changefeed in a format dependent on the connected sink. Resolved timestamp events do not emit until the changefeed job has reached a checkpoint.

Set an optional minimal duration between emitting resolved timestamps. Example: `resolved='10s'`. This option will **only** emit a resolved timestamp event if the timestamp has advanced and at least the optional duration has elapsed. If a duration is unspecified, all resolved timestamps are emitted as the high-water mark advances.

**Note:** If you set `resolved` lower than `30s`, then you **must** also set [`min_checkpoint_frequency`](#min-checkpoint-frequency) to at minimum the same value as `resolved`, because `resolved` messages may be emitted less frequently than `min_checkpoint_frequency`, but cannot be emitted more frequently.

Refer to [Resolved messages]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) for more detail. `schema_change_events` | `default` / `column_changes` | The type of schema change event that triggers the behavior specified by the `schema_change_policy` option:
Default: `schema_change_events=default` `schema_change_policy` | `backfill` / `nobackfill` / `stop` | The behavior to take when an event specified by the `schema_change_events` option occurs:
Default: `schema_change_policy=backfill` `split_column_families` | N/A | Use this option to create a changefeed on a table with multiple [column families]({% link {{ page.version.version }}/column-families.md %}). The changefeed will emit messages for each of the table's column families. See [Changefeeds on tables with column families]({% link {{ page.version.version }}/changefeeds-on-tables-with-column-families.md %}) for more usage detail.