Hi; we have a use-case for Snowpipe Streaming in which we need the ability to commit a new offset token to a channel, even if we have no new rows to insert.
In brief, this comes about because a minority of messages from a Kafka topic have to be ingested into a secondary Snowflake account, while the majority of messages go to a primary account. That secondary account might pass over millions of Kafka messages before it finds a message it needs to insert. Without some way to commit its progress sifting through those messages, the service ends up having to repeat all that work the next time it restarts, which is causing a few problems.
I was hoping that calling SnowflakeStreamingIngestChannel.insertRows(List.of(), offsetToken) would commit the offset token the next time blobs were registered. But I tested that, and that doesn't appear to be the behavior.
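For concreteness, here's roughly what that attempt looks like; the channel is assumed to be already open against the target table, and the offset value is just illustrative:

```java
import java.util.List;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;

public class ZeroRowCommitAttempt {
    // Sketch of the zero-row commit we attempted; "latestKafkaOffset" is whatever
    // offset we last sifted past without finding a row to insert.
    static void commitOffsetOnly(SnowflakeStreamingIngestChannel channel, long latestKafkaOffset) {
        // Empty row list plus a new offset token: we hoped this would advance the
        // committed offset on the next blob registration, but the token isn't persisted.
        InsertValidationResponse response =
                channel.insertRows(List.of(), String.valueOf(latestKafkaOffset));
        // No validation errors are reported; the offset token just never gets committed.
    }
}
```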
Is that the expected behavior, and would it be possible to change it so that zero-row offset updates can be done this way?
I've considered a few alternatives:
Call SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN periodically to make zero-row offset updates (see the JDBC sketch after this list). My main concern is that this would not be protected by the usual channel invalidation mechanism if another client opens the same channel. In (unlikely) race conditions between an old and a new client for the same channel, it would be possible to accidentally rewind the offset and duplicate some messages.
Insert these zero-row offset tokens into a table, then on startup always take the maximum of the latest value in that table and the offset token the channel reports. This will probably be our approach if we can't just use SnowflakeStreamingIngestChannel.insertRows to make zero-row offset updates.
Use a Kafka Streams app to split the topic into two separate topics, so a channel is no longer skipping over messages in the topic. Due to our scale in terms of number of topics and messages, this is less desirable than option (2).
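For reference, option (1) would look roughly like the following over a regular JDBC connection. The database, schema, table, and channel names are placeholders, and the argument order should be double-checked against the Snowflake docs:

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

public class ChannelOffsetUpdater {
    // Hypothetical sketch of option (1): periodically calling the system function
    // over a Snowflake JDBC connection. All object names below are placeholders.
    static void updateChannelOffset(Connection conn, String offsetToken) throws Exception {
        String sql = "SELECT SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN("
                + "'MY_DB', 'MY_SCHEMA', 'MY_TABLE', 'MY_CHANNEL', '" + offsetToken + "')";
        try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
            // Note: this bypasses the SDK's channel invalidation, which is exactly the
            // race-condition concern described in option (1) above.
            if (rs.next()) {
                System.out.println("Offset update result: " + rs.getString(1));
            }
        }
    }
}
```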
Oh interesting, I didn't know that's what setOffsetToken did – I didn't see any documentation on that parameter. It seems awkward for this use case – if we wanted to checkpoint our progress through messages that yielded no rows every, say, 30 seconds, we'd have to close and reopen the channel every 30 seconds. And that would probably cause the last blob of data from the previous channel to be rejected, since the previous channel is invalidated by the new one, so we'd have to rewind our position in the Kafka topic to match, etc.
For now, given some deadlines we had to meet, we've gone ahead and implemented workaround (2) from my original post. So we have our own implementation of the channel interface which delegates to the Snowpipe channel, but if it sees calls to insertRows with no rows, it remembers those offsets and every minute sends a checkpoint to a separate channel for a SNOWPIPE_CHECKPOINTS table we created in Snowflake. Then our initial offset logic at startup takes the max offset across the real committed channel offset and the channel's checkpoint in that table.
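Roughly, that wrapper looks like this; the class, column, and table names are ours, and the one-minute scheduling is elided:

```java
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;

// Sketch of workaround (2): delegate to the real channel, but remember offsets from
// zero-row insertRows calls and periodically write them to a SNOWPIPE_CHECKPOINTS channel.
public class CheckpointingChannel {
    private final SnowflakeStreamingIngestChannel delegate;    // real data channel
    private final SnowflakeStreamingIngestChannel checkpoints; // channel on SNOWPIPE_CHECKPOINTS
    private volatile String pendingZeroRowOffset;

    public CheckpointingChannel(SnowflakeStreamingIngestChannel delegate,
                                SnowflakeStreamingIngestChannel checkpoints) {
        this.delegate = delegate;
        this.checkpoints = checkpoints;
    }

    public InsertValidationResponse insertRows(List<Map<String, Object>> rows, String offsetToken) {
        if (rows.isEmpty()) {
            // The SDK won't persist a zero-row offset, so remember it ourselves;
            // a scheduled task flushes it once a minute via flushCheckpoint().
            pendingZeroRowOffset = offsetToken;
        }
        return delegate.insertRows(rows, offsetToken);
    }

    // Runs on a one-minute schedule (scheduler not shown).
    public void flushCheckpoint() {
        String offset = pendingZeroRowOffset;
        if (offset != null) {
            checkpoints.insertRow(
                    Map.<String, Object>of(
                            "CHANNEL_NAME", delegate.getFullyQualifiedName(),
                            "OFFSET_TOKEN", offset),
                    offset);
        }
    }
}
```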
It would still be nice if Snowpipe itself took care of updating offsets without writing rows, to avoid this extra complexity, but since we have this workaround in place it's lower on our list of asks. Things that are harder to work around ourselves, mainly issues #649 and #549, would be higher priority for us.