
Purge topics using Kafkactl #121

Open
nil-malh opened this issue Apr 26, 2024 · 5 comments
Labels
enhancement This issue or pull request improves a feature

Comments


Problem

There's no simple way to purge a topic using kafkactl or kafka-admin. The goal is to be able to easily purge a topic of all the records it contains!

Suggestion

The user should be able to run:

kafkactl purge topic <topicName> [--force] [--retention-ms=<durationMS>]

Alternatives Considered

There are two ways that could be used to purge a topic of its content, one being easier to execute than the other:

  • Store the topic.yaml in memory, delete the topic, then re-apply the stored topic.yaml (this avoids a lot of the overhead of the other method)

  • Temporarily set the topic's retention.ms to a low value like 10 ms to delete all records older than the retention.ms value; but this implies that the user needs to reset the offsets of all the consumer groups subscribed to the topics that will be purged

While the second way requires a bit more setup, it could offer more than purging a whole topic: we could imagine using it to purge only old messages from a topic while keeping the more recent records unscathed!
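The second alternative can be sketched with Kafka's stock kafka-configs.sh tool. This is a dry-run sketch: `kafka_configs` below is a stub that only echoes the real invocation, so the sequence can be executed without a cluster (the flags mirror the real tool, but double-check them against your Kafka version):

```shell
# Dry-run sketch of alternative 2: temporarily lower retention.ms, wait for
# the log cleaner, then restore the previous retention. `kafka_configs` is a
# stub that echoes what the real kafka-configs.sh call would be.
kafka_configs() { echo "kafka-configs.sh $*"; }

TOPIC=myTopic
BOOTSTRAP=localhost:9092

# 1. Make every record older than 10 ms eligible for deletion
kafka_configs --bootstrap-server "$BOOTSTRAP" --alter \
  --entity-type topics --entity-name "$TOPIC" \
  --add-config retention.ms=10

# 2. ...wait for the broker's log retention to delete the old segments...

# 3. Remove the override so the topic falls back to its previous retention
kafka_configs --bootstrap-server "$BOOTSTRAP" --alter \
  --entity-type topics --entity-name "$TOPIC" \
  --delete-config retention.ms
```

As noted above, consumers of the purged topic would still need their offsets reset afterwards.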

What are your thoughts on this? I'll contribute if the issue is deemed useful :)

nil-malh added the enhancement label on Apr 26, 2024
loicgreffier (Collaborator) commented Apr 26, 2024

@nil-malh https://github.com/michelin/kafkactl?tab=readme-ov-file#delete-records ? 😄

Maybe the delete-records command can be enhanced with options such as the retention.ms. Currently, all records are deleted.

nil-malh (Author) commented Apr 26, 2024

@nil-malh https://github.com/michelin/kafkactl?tab=readme-ov-file#delete-records ? 😄

I was already aware of this; however, it only works for topics with a cleanup-policy set to delete. For those where the cleanup-policy is compact, you can't delete records (see here in the ns4kafka repo).

Maybe the delete-records command can be enhanced with options such as the retention.ms. Currently, all records are deleted.

We could add this, but it still limits the scope to topics with a delete cleanup-policy.

This is why I suggested another command like purge, or another name like recreate, that would recreate the topic from the yaml file and bypass the delete cleanup-policy limitation! 😄

loicgreffier (Collaborator) commented Apr 27, 2024

Records deletion has been deactivated on purpose for compacted topics. The Admin Client throws java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.PolicyViolationException: Request parameters do not satisfy the configured policy. when running records deletion on a compacted topic.

The records deletion for compacted topics should take place in the delete-records cmd. No need to create a whole dedicated cmd IMHO.

The only option that comes to mind is actually deleting the topic on the broker (if it's a compacted one) and letting the Ns4Kafka sync recreate it (Ns4Kafka actually stores every resource).

But we need to warn the user that connected Kafka clients are going to be disrupted while waiting for the topic to be recreated. I'm thinking about 2 options:

  • Check that there is no active consumer group on the topic before running the deletion. But how do we deal with producers?
  • Warn the user and ask them to run the command with an --execute option, just like we do for the reset-password cmd (https://github.com/michelin/kafkactl?tab=readme-ov-file#reset-password), to be sure they understand the consequences of deleting records in a compacted topic
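The second option's guard could be sketched like this (a minimal sketch: confirm_or_warn is a hypothetical helper, not actual kafkactl code):

```shell
# Hypothetical --execute guard, mirroring the reset-password behavior:
# without --execute, only print a warning and refuse to run the deletion.
confirm_or_warn() {
  case " $* " in
    *" --execute "*)
      echo "Deleting records from compacted topic..."
      return 0 ;;
    *)
      echo "WARNING: this permanently deletes records from a compacted topic." >&2
      echo "Rerun with --execute to confirm." >&2
      return 1 ;;
  esac
}
```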

nil-malh (Author) commented

The records deletion for compacted topics should take place in the delete-records cmd. No need to create a whole dedicated cmd IMHO.

Fair enough. The goal of creating a new command was to avoid having two different logics behind delete-records, but this makes sense as well!

The only option that comes to mind is actually deleting the topic on the broker (if it's a compacted one) and letting the Ns4Kafka sync recreate it (Ns4Kafka actually stores every resource).

I was thinking almost the same. Currently, when I want to purge a topic that doesn't have a delete cleanup-policy, I do the following:

kafkactl get topic <topicName> -o yaml # store the output in a variable or file
kafkactl delete topic <topicName>
kafkactl apply -f <topic.yaml>
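The same three steps as a runnable dry-run (kafkactl is stubbed to echo its arguments, so the flow can be tried without a cluster; swap in the real binary to use it for real):

```shell
# Dry-run of the snapshot / delete / re-apply workaround. The kafkactl
# function below is a stub; remove it to run against a real cluster.
kafkactl() { echo "kafkactl $*"; }

TOPIC=myTopic
SNAPSHOT=$(mktemp)

kafkactl get topic "$TOPIC" -o yaml > "$SNAPSHOT"  # 1. snapshot the topic spec
kafkactl delete topic "$TOPIC"                     # 2. drop the topic and its records
kafkactl apply -f "$SNAPSHOT"                      # 3. recreate it from the snapshot
```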

I was actually unaware that Ns4Kafka was storing resources, thanks! 😄

But this approach does not use the Ns4Kafka ability you've mentioned! Will it be faster than waiting for Ns4Kafka to spot that a resource is missing and then recreate it?

Check that there is no active consumer group on the topic before running the deletion. But how do we deal with producers?

  • Regarding consumer groups, we can add a --force flag: if there are active consumer groups, warn the user that there are active consumer groups on the topic they want to delete and that deleting it will lead to a disruption of their application(s).

  • For producers it's a tad trickier, as it highly depends on how the user's app is coded. A producer that tries to produce to a deleted topic will get a TopicAuthorizationException, because the broker says the topic does not exist and therefore the producer does not have the right to write to it (if my Kafka knowledge serves me right! 😉)
    So if the user has implemented a retry mechanism in their app, this should not be an issue! However, I don't know what happens to the record if no retry mechanism has been implemented; I'll try this tonight and report back on the behavior.

I was thinking about it as well; this is the best bet IMO to avoid unexpected deletions of records 😄

Let me know what you think and what we should do moving forward; I am open to any suggestions you might have.

loicgreffier (Collaborator) commented

@nil-malh

But this approach does not use the Ns4Kafka ability you've mentioned

You do use it. That's done under the hood.

  • kafkactl delete deletes the resource from both the cluster and the Ns4Kafka internal topics (see here)
  • kafkactl apply only pushes the resource to the Ns4Kafka internal topic (see here); the resource is then picked up by the relevant executor and deployed asynchronously (e.g., TopicAsyncExecutor for topics)

So deleting the compact topic from the cluster and letting the next synchronization recreate it sticks to the overall Ns4Kafka design, and is the same thing as running kafkactl delete/kafkactl apply, except that we do not perform an unnecessary deletion from Ns4Kafka.

we can add a --force if there are active consumer groups but warn the user that there are active consumer groups on the topic

Topic deletion should just be declined as long as any consumer group is active on that topic, as done for reset-offsets. --force sounds too much like "No matter who's consuming my topic, I don't care ☢️" and breaks any client that consumes topics you're the owner of.

If no consumer group is active anymore, then the warning and the --execute option take place.
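That policy could be sketched as follows (hypothetical helpers: list_active_groups stands in for the broker query kafkactl would actually perform):

```shell
# Decline topic deletion while any consumer group is active on the topic.
# `list_active_groups` is a stub standing in for the real broker query.
list_active_groups() { echo "group-a group-b"; }

purge_topic() {
  topic=$1
  groups=$(list_active_groups "$topic")
  if [ -n "$groups" ]; then
    echo "Error: active consumer group(s) on $topic: $groups" >&2
    return 1
  fi
  echo "Topic $topic purged"
}
```

With the stub returning two groups, `purge_topic myTopic` is declined; once no group is active, the warning/--execute step described above would take over.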
