Skip to content

Commit

Permalink
Update the resolved message docs for more guidance on the configuring…
Browse files Browse the repository at this point in the history
… checkpoint options (#18738)
  • Loading branch information
kathancox authored Aug 5, 2024
1 parent 9cf620d commit e668dee
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 32 deletions.
37 changes: 31 additions & 6 deletions src/current/v23.1/changefeed-messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -346,19 +346,44 @@ In some unusual situations you may receive a delete message for a row without fi
## Resolved messages
When you create a changefeed with the [`resolved` option]({% link {{ page.version.version }}/create-changefeed.md %}#resolved-option), the changefeed will emit resolved timestamp messages in a format dependent on the connected [sink]({% link {{ page.version.version }}/changefeed-sinks.md %}). The resolved timestamp is the high-water mark that guarantees that no previously unseen rows with an [earlier update timestamp](#ordering-and-delivery-guarantees) will be emitted to the sink. That is, resolved timestamp messages do not emit until all [ranges]({% link {{ page.version.version }}/architecture/overview.md %}#range) in the changefeed have progressed to a specific point in time.
When you create a changefeed with the [`resolved` option]({% link {{ page.version.version }}/create-changefeed.md %}#resolved-option), the changefeed will emit resolved timestamp messages in a format dependent on the connected [sink]({% link {{ page.version.version }}/changefeed-sinks.md %}). The resolved timestamp is the high-water mark that guarantees that no previously unseen rows with an [earlier update timestamp](#ordering-and-delivery-guarantees) will be emitted to the sink. That is, resolved timestamp messages do not emit until the changefeed job has reached a [checkpoint]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}).
When you specify the `resolved` option at changefeed creation, the [job's coordinating node]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}) will send the resolved timestamp to each endpoint at the sink. For example, each [Kafka]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka) partition will receive a resolved timestamp message, or a [cloud storage sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#cloud-storage-sink) will receive a resolved timestamp file.

There are three different ways to configure resolved timestamp messages:

- If you do not specify the `resolved` option at all, then the changefeed coordinator node will not send resolved timestamp messages.
- If you include `WITH resolved` in your changefeed creation statement **without** specifying a value, the coordinator node will emit resolved timestamps as the high-water mark advances. Note that new Kafka partitions may not receive resolved messages right away.
- If you specify a duration like `WITH resolved={duration}`, the changefeed will use it as the minimum duration between `resolved` messages that the changefeed coordinator sends. The changefeed will only emit a resolved timestamp message if the timestamp has advanced and at least the optional duration has elapsed.
- If you include `WITH resolved` in your changefeed creation statement **without** specifying a value, the coordinator node will emit resolved timestamps as the changefeed job checkpoints and the high-water mark advances. Note that new Kafka partitions may not receive resolved messages right away.

{{site.data.alerts.callout_info}}
If you require `resolved` message frequency under `30s`, then you **must** set the [`min_checkpoint_frequency`]({% link {{ page.version.version }}/create-changefeed.md %}#min-checkpoint-frequency) option to at least the desired `resolved` frequency. This is because `resolved` messages will not be emitted more frequently than `min_checkpoint_frequency`, but may be emitted less frequently.
{{site.data.alerts.end}}
{% include_cached copy-clipboard.html %}
~~~ sql
CREATE CHANGEFEED FOR TABLE ... WITH resolved;
~~~

- If you specify a duration like `WITH resolved={duration}`, the changefeed will use it as the minimum duration between `resolved` messages that the changefeed coordinator sends. The changefeed will only emit a resolved timestamp message if the timestamp has advanced and at least the optional duration has elapsed. For example:

{% include_cached copy-clipboard.html %}
~~~ sql
CREATE CHANGEFEED FOR TABLE ... WITH resolved=30s;
~~~

### Resolved timestamp frequency

The changefeed job's coordinating node will emit resolved timestamp messages once the changefeed has reached a checkpoint. The frequency of the checkpoints determine how often the resolved timestamp messages emit to the sink. To configure how often the changefeed checkpoints, you can set the [`min_checkpoint_frequency`]({% link {{ page.version.version }}/create-changefeed.md %}#min-checkpoint-frequency) option and [flush frequency]({% link {{ page.version.version }}/changefeed-sinks.md %}) (if flushing is configurable for your sink).
The `min_checkpoint_frequency` option controls how often nodes flush their progress to the coordinating node. If you need resolved timestamp messages to emit from the changefeed more frequently than the `30s` default, then you must set `min_checkpoint_frequency` to at least the desired resolved timestamp frequency. For example:
{% include_cached copy-clipboard.html %}
~~~ sql
CREATE CHANGEFEED FOR TABLE ... WITH resolved=10s, min_checkpoint_frequency=10s;
~~~
When you configure the `min_checkpoint_frequency` and `resolved` options, there can be a tradeoff between changefeed message latency and cluster CPU usage.
- Lowering these options will cause the changefeed to checkpoint and send resolved timestamp messages more frequently, which can add overhead to CPU usage in the cluster.
- Raising these options will result in the changefeed checkpointing and sending resolved timestamp messages less frequently, which can cause latency in message delivery to the sink.
For example, you can set `min_checkpoint_frequency` and `resolved` to `0s` so that the changefeed job checkpoints as frequently as possible and messages are sent immediately followed by the resolved timestamp. However, the frequent checkpointing will increase CPU usage in the cluster. If your application can tolerate a longer duration than `0s` between checkpoints, this will help to reduce the overhead on the cluster.
## Duplicate messages
Expand Down
4 changes: 2 additions & 2 deletions src/current/v23.1/create-changefeed.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,12 @@ Option | Value | Description
<span class="version-tag">New in v23.1:</span> <a name="key-column"></a>`key_column` | `'column'` | Override the key used in [message metadata]({% link {{ page.version.version }}/changefeed-messages.md %}). This changes the key hashed to determine downstream partitions. In sinks that support partitioning by message, CockroachDB uses the [32-bit FNV-1a](https://wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function) hashing algorithm to determine which partition to send to.<br><br>**Note:** `key_column` does not preserve ordering of messages from CockroachDB to the downstream sink, therefore you must also include the [`unordered`](#unordered) option in your changefeed creation statement. It does not affect per-key [ordering guarantees]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) or the output of [`key_in_value`](#key-in-value).<br><br>See the [Define a key to determine the changefeed sink partition](#define-a-key-to-determine-the-changefeed-sink-partition) example.
<a name="key-in-value"></a>`key_in_value` | N/A | Add a primary key array to the emitted message. This makes the [primary key]({% link {{ page.version.version }}/primary-key.md %}) of a deleted row recoverable in sinks where each message has a value but not a key (most have a key and value in each message). `key_in_value` is automatically used for [cloud storage sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#cloud-storage-sink), [webhook sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#webhook-sink), and [GC Pub/Sub sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#google-cloud-pub-sub).
`metrics_label` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Define a metrics label to which the metrics for one or multiple changefeeds increment. All changefeeds also have their metrics aggregated.<br><br>The maximum length of a label is 128 bytes. There is a limit of 1024 unique labels.<br><br>`WITH metrics_label=label_name` <br><br>For more detail on usage and considerations, see [Using changefeed metrics labels]({% link {{ page.version.version }}/monitor-and-debug-changefeeds.md %}#using-changefeed-metrics-labels).
<a name="min-checkpoint-frequency"></a>`min_checkpoint_frequency` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Controls how often nodes flush their progress to the [coordinating changefeed node]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}). Changefeeds will wait for at least the specified duration before a flush to the sink. This can help you control the flush frequency of higher latency sinks to achieve better throughput. If this is set to `0s`, a node will flush as long as the high-water mark has increased for the ranges that particular node is processing. If a changefeed is resumed, then `min_checkpoint_frequency` is the amount of time that changefeed will need to catch up. That is, it could emit duplicate messages during this time. <br><br>**Note:** [`resolved`](#resolved-option) messages will not be emitted more frequently than the configured `min_checkpoint_frequency` (but may be emitted less frequently). Since `min_checkpoint_frequency` defaults to `30s`, you **must** configure `min_checkpoint_frequency` to at least the desired `resolved` message frequency if you require `resolved` messages more frequently than `30s`.<br><br>**Default:** `30s`
<a name="min-checkpoint-frequency"></a>`min_checkpoint_frequency` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Controls how often nodes flush their progress to the [coordinating changefeed node]({% link {{ page.version.version }}/how-does-an-enterprise-changefeed-work.md %}). Changefeeds will wait for at least the specified duration before a flush to the sink. This can help you control the flush frequency of higher latency sinks to achieve better throughput. However, more frequent checkpointing can increase CPU usage. If this is set to `0s`, a node will flush messages as long as the high-water mark has increased for the ranges that particular node is processing. If a changefeed is resumed, then `min_checkpoint_frequency` is the amount of time that changefeed will need to catch up. That is, it could emit [duplicate messages]({% link {{ page.version.version }}/changefeed-messages.md %}#duplicate-messages) during this time. <br><br>**Note:** [`resolved`](#resolved-option) messages will not be emitted more frequently than the configured `min_checkpoint_frequency` (but may be emitted less frequently). If you require `resolved` messages more frequently than `30s`, you must configure `min_checkpoint_frequency` to at least the desired `resolved` message frequency. For more details, refer to [Resolved message frequency]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-timestamp-frequency).<br><br>**Default:** `30s`
`mvcc_timestamp` | N/A | Include the [MVCC]({% link {{ page.version.version }}/architecture/storage-layer.md %}#mvcc) timestamp for each emitted row in a changefeed. With the `mvcc_timestamp` option, each emitted row will always contain its MVCC timestamp, even during the changefeed's initial backfill.
<a name="on-error"></a>`on_error` | `pause` / `fail` | Use `on_error=pause` to pause the changefeed when encountering **non**-retryable errors. `on_error=pause` will pause the changefeed instead of sending it into a terminal failure state. **Note:** Retryable errors will continue to be retried with this option specified. <br><br>Use with [`protect_data_from_gc_on_pause`](#protect-pause) to protect changes from [garbage collection]({% link {{ page.version.version }}/configure-replication-zones.md %}#gc-ttlseconds).<br><br>If a changefeed with `on_error=pause` is running when a watched table is [truncated]({% link {{ page.version.version }}/truncate.md %}), the changefeed will pause but will not be able to resume reads from that table. Using [`ALTER CHANGEFEED`]({% link {{ page.version.version }}/alter-changefeed.md %}) to drop the table from the changefeed and then [resuming the job]({% link {{ page.version.version }}/resume-job.md %}) will work, but you cannot add the same table to the changefeed again. Instead, you will need to [create a new changefeed](#start-a-new-changefeed-where-another-ended) for that table.<br><br>Default: `on_error=fail`
<a name="protect-pause"></a>`protect_data_from_gc_on_pause` | N/A | When a [changefeed is paused]({% link {{ page.version.version }}/pause-job.md %}), ensure that the data needed to [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}) is not garbage collected. If `protect_data_from_gc_on_pause` is **unset**, pausing the changefeed will release the existing protected timestamp records. It is also important to note that pausing and adding `protect_data_from_gc_on_pause` to a changefeed will not protect data if the [garbage collection]({% link {{ page.version.version }}/configure-replication-zones.md %}#gc-ttlseconds) window has already passed. <br><br>Use with [`on_error=pause`](#on-error) to protect changes from garbage collection when encountering non-retryable errors. <br><br>See [Garbage collection and changefeeds]({% link {{ page.version.version }}/changefeed-messages.md %}#garbage-collection-and-changefeeds) for more detail on protecting changefeed data.<br><br>**Note:** If you use this option, changefeeds that are left paused for long periods of time can prevent garbage collection. Use with the [`gc_protect_expires_after`](#gc-protect-expire) option to set a limit for protected data and for how long a changefeed will remain paused.
<span class="version-tag">New in v23.1:</span> `pubsub_sink_config` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Set fields to configure sink batching and retries. The schema is as follows:<br><br> `{ "Flush": { "Messages": ..., "Bytes": ..., "Frequency": ..., }, "Retry": {"Max": ..., "Backoff": ..., } }`. <br><br>**Note** that if either `Messages` or `Bytes` are nonzero, then a non-zero value for `Frequency` must be provided. <br><br>Refer to [Pub/Sub sink configuration]({% link {{ page.version.version }}/changefeed-sinks.md %}#pub-sub-sink-configuration) for more details on using this option.
<a name="resolved-option"></a>`resolved` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Emits [resolved timestamp]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) events per changefeed in a format dependent on the connected sink. Resolved timestamp events do not emit until all ranges in the changefeed have progressed to a specific point in time. <br><br>Set an optional minimal duration between emitting resolved timestamps. Example: `resolved='10s'`. This option will **only** emit a resolved timestamp event if the timestamp has advanced and at least the optional duration has elapsed. If a duration is unspecified, all resolved timestamps are emitted as the high-water mark advances.<br><br>**Note:** If you set `resolved` lower than `30s`, then you **must** also set [`min_checkpoint_frequency`](#min-checkpoint-frequency) to at minimum the same value as `resolved`, because `resolved` messages may be emitted less frequently than `min_checkpoint_frequency`, but cannot be emitted more frequently.<br><br>Refer to [Resolved messages]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) for more detail.
<a name="resolved-option"></a>`resolved` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Emits [resolved timestamp]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) events per changefeed in a format dependent on the connected sink. Resolved timestamp events do not emit until the changefeed job has reached a checkpoint. <br><br>Set an optional minimal duration between emitting resolved timestamps. Example: `resolved='10s'`. This option will **only** emit a resolved timestamp event if the timestamp has advanced and at least the optional duration has elapsed. If a duration is unspecified, all resolved timestamps are emitted as the high-water mark advances.<br><br>**Note:** If you set `resolved` lower than `30s`, then you **must** also set [`min_checkpoint_frequency`](#min-checkpoint-frequency) to at minimum the same value as `resolved`, because `resolved` messages may be emitted less frequently than `min_checkpoint_frequency`, but cannot be emitted more frequently.<br><br>Refer to [Resolved messages]({% link {{ page.version.version }}/changefeed-messages.md %}#resolved-messages) for more detail.
<a name="schema-events"></a>`schema_change_events` | `default` / `column_changes` | The type of schema change event that triggers the behavior specified by the `schema_change_policy` option:<ul><li>`default`: Include all [`ADD COLUMN`]({% link {{ page.version.version }}/alter-table.md %}#add-column) events for columns that have a non-`NULL` [`DEFAULT` value]({% link {{ page.version.version }}/default-value.md %}) or are [computed]({% link {{ page.version.version }}/computed-columns.md %}), and all [`DROP COLUMN`]({% link {{ page.version.version }}/alter-table.md %}#drop-column) events.</li><li>`column_changes`: Include all schema change events that add or remove any column.</li></ul><br>Default: `schema_change_events=default`
<a name="schema-policy"></a>`schema_change_policy` | `backfill` / `nobackfill` / `stop` | The behavior to take when an event specified by the `schema_change_events` option occurs:<ul><li>`backfill`: When [schema changes with column backfill]({% link {{ page.version.version }}/changefeed-messages.md %}#schema-changes-with-column-backfill) are finished, output all watched rows using the new schema.</li><li>`nobackfill`: For [schema changes with column backfill]({% link {{ page.version.version }}/changefeed-messages.md %}#schema-changes-with-column-backfill), perform no logical backfills. The changefeed will not emit any messages about the schema change. </li><li>`stop`: For [schema changes with column backfill]({% link {{ page.version.version }}/changefeed-messages.md %}#schema-changes-with-column-backfill), wait for all data preceding the schema change to be resolved before exiting with an error indicating the timestamp at which the schema change occurred. An `error: schema change occurred at <timestamp>` will display in the `cockroach.log` file.</li></ul><br>Default: `schema_change_policy=backfill`
<a name="split-column-families"></a>`split_column_families` | N/A | Use this option to create a changefeed on a table with multiple [column families]({% link {{ page.version.version }}/column-families.md %}). The changefeed will emit messages for each of the table's column families. See [Changefeeds on tables with column families]({% link {{ page.version.version }}/changefeeds-on-tables-with-column-families.md %}) for more usage detail.
Expand Down
Loading

0 comments on commit e668dee

Please sign in to comment.