
Update offset token without writing new rows #805

Open
wesleyhillyext opened this issue Aug 5, 2024 · 2 comments

Comments

@wesleyhillyext

Hi; we have a use-case for Snowpipe Streaming in which we need the ability to commit a new offset token to a channel, even if we have no new rows to insert.

In brief, this comes about because a minority of messages from a Kafka topic have to be ingested into a secondary Snowflake account, while the majority of messages go to a primary account. That secondary account might pass over millions of Kafka messages before it finds a message it needs to insert. Without some way to commit its progress sifting through those messages, the next time that service restarts it ends up repeating all that work, which is causing a few problems for us.

I was hoping that calling SnowflakeStreamingIngestChannel.insertRows(List.of(), offsetToken) would commit the offset token the next time blobs were registered. But I tested that, and that doesn't appear to be the behavior.

Is that the expected behavior, and would it be possible to change it so that zero-row offset updates can be done this way?

I've considered a few alternatives:

  1. Call SYSTEM$SNOWPIPE_STREAMING_UPDATE_CHANNEL_OFFSET_TOKEN periodically to make zero-row offset updates. My main concern is that this would not be protected by the usual channel invalidation mechanism in the case of another client opening the same channel. In (unlikely) race conditions between an old and a new client for the same channel, it would be possible to accidentally rewind the offset and duplicate some messages.

  2. Insert these zero-row offset tokens into a table, then on startup always take the maximum across the latest value in that table, and what the channel says the offset token is. This will probably be our approach if we can't just use SnowflakeStreamingIngestChannel.insertRows to make zero-row offset updates.

  3. Use a Kafka Streams app to split the topic into two separate topics, so a channel is no longer skipping over messages in the topic. Due to our scale in terms of number of topics and messages, this is less desirable than option (2).
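The startup logic for option (2) can be sketched as follows. This is a hypothetical helper, not part of the Snowpipe SDK; it assumes offset tokens encode numeric Kafka offsets as strings, and that either source may be null on a first run:

```java
// Hypothetical sketch of option (2)'s startup logic: resolve the starting
// position as the max of the channel's committed offset token and the
// latest checkpoint stored in a side table.
public final class StartOffsetResolver {

    /** Returns the offset to resume from, or -1 if neither source has one. */
    public static long resolveStartOffset(String channelOffsetToken, String checkpointToken) {
        long channelOffset = channelOffsetToken == null ? -1L : Long.parseLong(channelOffsetToken);
        long checkpointOffset = checkpointToken == null ? -1L : Long.parseLong(checkpointToken);
        // Resume from the greater of the two, so zero-row progress recorded
        // in the checkpoint table is not lost on restart.
        return Math.max(channelOffset, checkpointOffset);
    }
}
```

Taking the max (rather than always trusting the checkpoint table) guards against the checkpoint lagging behind a real committed write on the channel.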

@sfc-gh-tzhang
Contributor

We actually support this as part of the open channel request; see setOffsetToken in OpenChannelRequestBuilder. Would that satisfy your requirement?

@wesleyhillyext
Author

Oh interesting, I didn't know that's what setOffsetToken did – I didn't see any documentation on that parameter. It seems awkward for this use case: if we wanted to checkpoint our progress through messages which yielded no rows every, say, 30 seconds, then we'd have to close and reopen the channel every 30 seconds. And that would probably cause the last blob of data from the previous channel to be rejected as the previous channel is invalidated by the new one, so we'd have to rewind our position in the Kafka topic to match, etc.

For now, given some deadlines we had to meet, we've gone ahead and implemented workaround (2) from my original post. So we have our own implementation of the channel interface which delegates to the Snowpipe channel, but if it sees calls to insertRows with no rows, it remembers those offsets and every minute sends a checkpoint to a separate channel for a SNOWPIPE_CHECKPOINTS table we created in Snowflake. Then our initial offset logic at startup takes the max offset across the real committed channel offset and the channel's checkpoint in that table.
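The wrapper described above can be sketched roughly like this. IngestChannel here is a stand-in for the real SnowflakeStreamingIngestChannel interface, and the names are illustrative, not the SDK's actual API; the periodic flush to the SNOWPIPE_CHECKPOINTS channel is omitted:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for the real SnowflakeStreamingIngestChannel interface.
interface IngestChannel {
    void insertRows(List<Map<String, Object>> rows, String offsetToken);
}

// Delegating wrapper: zero-row calls are remembered instead of forwarded,
// and a background task (not shown) periodically writes the remembered
// offset to a separate checkpoint-table channel.
final class CheckpointingChannel implements IngestChannel {
    private final IngestChannel delegate;
    // Latest offset seen on a zero-row call, awaiting the next checkpoint flush.
    final AtomicReference<String> pendingCheckpoint = new AtomicReference<>();

    CheckpointingChannel(IngestChannel delegate) {
        this.delegate = delegate;
    }

    @Override
    public void insertRows(List<Map<String, Object>> rows, String offsetToken) {
        if (rows.isEmpty()) {
            // No data to write: just remember the offset for the next checkpoint.
            pendingCheckpoint.set(offsetToken);
            return;
        }
        // A real write commits this offset on the channel itself, which also
        // covers any zero-row progress seen since the last write.
        pendingCheckpoint.set(null);
        delegate.insertRows(rows, offsetToken);
    }
}
```

A real write clears the pending checkpoint because the channel's own committed offset then supersedes it, matching the max-of-both resolution at startup.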

It could still be nice if Snowpipe itself took care of updating offsets without writing rows, to avoid this extra complexity, but since we have this workaround in place it would be lower on our list of asks. Things which are harder to work around ourselves, mainly issues #649 and #549, would be higher priority for us.
