Producer fails to update its metadata when topic id changes #4898

Open
5 of 7 tasks
marcin-krystianc opened this issue Nov 12, 2024 · 2 comments
@marcin-krystianc

Description

The producer correctly recognizes that the topic ID has changed, i.e., that the topic was re-created (deleted and created again with the same name).
Unfortunately, the metadata for the re-created topic is never fully processed and the producer's internal metadata cache is not updated: when a topic is re-created, its leader epoch is always reset to 0, and this logic rejects the update (f47815b#diff-b5723f3211c0413f28859bb339709d86154050db38ce91a29653c36130981f63R2070-R2082).
Thus, when the leader of the re-created topic has changed, the producer keeps trying to produce to the old partition leader and never produces to the new one.
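
The effect of that check can be illustrated with a small toy model. This is only a sketch under assumptions, not librdkafka's code: `PartitionState` and `apply_leader_update` are hypothetical names, and the rule "ignore the update unless its epoch is strictly newer than the cached one" is inferred from the `leader epoch is not newer 0 >= 0` debug message. The IDs, leaders and epochs are the ones reported for `my-topic-1168 [0]` in the logs.

```python
from dataclasses import dataclass


@dataclass
class PartitionState:
    """Hypothetical stand-in for the producer's cached view of one partition."""
    topic_id: str
    leader: int
    leader_epoch: int


def apply_leader_update(cached, topic_id, leader, leader_epoch):
    """Apply a metadata update; return True if the cached leader changed.

    Assumed rule (inferred from the debug message, not copied from librdkafka):
    the update is ignored unless its epoch is strictly newer than the cached
    one. The topic ID is recorded but plays no part in the epoch comparison.
    """
    cached.topic_id = topic_id
    if cached.leader_epoch >= leader_epoch:
        return False  # corresponds to "leader epoch is not newer"
    cached.leader = leader
    cached.leader_epoch = leader_epoch
    return True


# Initial state: topic known locally, nothing learned from the cluster yet.
state = PartitionState(topic_id="AAAAAAAAAAAAAAAAAAAAAA", leader=-1, leader_epoch=-1)

# A broker that still serves the old incarnation of the topic answers first.
first = apply_leader_update(state, "65J5lf+sT0GSlxIMD9BSbw", leader=2, leader_epoch=0)

# The re-created topic arrives with a new ID and a new leader, but epoch 0 again.
second = apply_leader_update(state, "8YrcEGC1SjyeG1LpGlO57w", leader=1, leader_epoch=0)

print(first, second, state.leader)
```

In this model the first update is accepted, the second is rejected, and the cached leader stays at broker 2 even though the topic ID has moved on, which matches the behaviour described above.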

How to reproduce

We use an application written in C# with Confluent.Kafka 2.6.0 (which uses librdkafka internally). In the repro scenario, the app first creates 3000 single-partition topics (or re-creates them if they already exist), then starts a producer that keeps producing to these newly (re-)created topics.
The topics are deleted and created before the producer is even instantiated. However, because the cluster has multiple brokers (3 in our case), some brokers need more time than others to process the deletion and re-creation, so they can keep reporting topics with their old IDs for a while. As a result, the producer can see an old topic ID (with the old leader) first and the new ID (with a new leader) afterward. Because the new ID comes with the leader epoch reset to 0, the producer does not apply it (leader epoch is not newer...).

Repro steps:

  • Create a test cluster consisting of at least 3 brokers
  • Run the test app for the first time (it creates 3000 topics and happily produces to them)
  • Stop the test app and run it a second time (it deletes the existing topics and creates 3000 topics again). This time the app fails to produce to some topics, for the reasons described above.

Parameters:

dotnet run -c Release --project KafkaTool.csproj \
producer \
--config allow.auto.create.topics=false \
--config bootstrap.servers=localhost:40001,localhost:40002,localhost:40003 \
--topics=3000 \
--partitions=1 \
--replication-factor=3 \
--min-isr=2 \
--topic-stem=my-topic \
--messages-per-second=50000 \
--burst-messages-per-second=500000 \
--burst-cycle=0 \
--burst-duration=10000 \
\
--config request.timeout.ms=195000 \
--config message.timeout.ms=195000 \
--config request.required.acks=-1 \
--config enable.idempotence=false \
--config max.in.flight.requests.per.connection=1 \
--config topic.metadata.refresh.interval.ms=30000 \
--config topic.metadata.propagation.max.ms=75000 \
--config debug=all \
\
--config auto.offset.reset=earliest \
--config enable.auto.offset.store=false \
--config enable.auto.commit=false \
\
--config queue.buffering.max.ms=100 \
--config queue.buffering.max.messages=1000000 \
--config queue.buffering.max.kbytes=1000000 \
\
--config statistics.interval.ms=0 \
--statistics-path=my.txt \
\
--reporting-cycle=1000 \
--producers=1 \
--recreate-topics=true \
--recreate-topics-delay=1 \
--recreate-topics-batch-size=1000

Logs:

11:16:19 info: Log[0] Admin log: message=[thrd:main]: localhost:40002/2:   Topic my-topic-1168 with 1 partitions, name=rdkafka#producer-1, facility=METADATA, level=Debug
11:16:24 info: Producer0:[0] kafka-log Facility:TOPIC, Message[thrd:app]: New local topic: my-topic-1168
11:16:24 info: Producer0:[0] kafka-log Facility:TOPPARNEW, Message[thrd:app]: NEW my-topic-1168 [-1] 0x7f04b442a9a0 refcnt 0x7f04b442aa30 (at rd_kafka_topic_new0:488)
11:16:24 info: Producer0:[0] kafka-log Facility:CONF, Message[thrd:app]: Topic "my-topic-1168" configuration (default_topic_conf):
11:16:26 info: Producer0:[0] kafka-log Facility:NOINFO, Message[thrd:main]: Topic my-topic-1168 metadata information unknown
11:16:26 info: Producer0:[0] kafka-log Facility:NOINFO, Message[thrd:main]: Topic my-topic-1168 partition count is zero: should refresh metadata
11:16:30 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:40001/1:   Topic my-topic-1168 with 1 partitions
11:16:30 info: Producer0:[0] kafka-log Facility:STATE, Message[thrd:main]: Topic my-topic-1168 changed state unknown -> exists
11:16:30 info: Producer0:[0] kafka-log Facility:PARTCNT, Message[thrd:main]: Topic my-topic-1168 partition count changed from 0 to 1
11:16:30 info: Producer0:[0] kafka-log Facility:TOPPARNEW, Message[thrd:main]: NEW my-topic-1168 [0] 0x7f04a811ee80 refcnt 0x7f04a811ef10 (at rd_kafka_topic_partition_cnt_update:937)
11:16:30 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1168 changed id from AAAAAAAAAAAAAAAAAAAAAA to 65J5lf+sT0GSlxIMD9BSbw
11:16:30 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1168 [0] Leader 2 Epoch 0
11:16:30 info: Producer0:[0] kafka-log Facility:BROKER, Message[thrd:main]: my-topic-1168 [0]: leader -1 epoch -1 -> leader 2 epoch 0
11:16:30 info: Producer0:[0] kafka-log Facility:BRKDELGT, Message[thrd:main]: my-topic-1168 [0]: delegate to broker localhost:40002/2 (rktp 0x7f04a811ee80, term 0, ref 3)
11:16:30 info: Producer0:[0] kafka-log Facility:BRKDELGT, Message[thrd:main]: my-topic-1168 [0]: delegating to broker localhost:40002/2 for partition with 0 messages (0 bytes) queued
11:16:30 info: Producer0:[0] kafka-log Facility:BRKMIGR, Message[thrd:main]: Migrating topic my-topic-1168 [0] 0x7f04a811ee80 from (none) to localhost:40002/2 (sending PARTITION_JOIN to localhost:40002/2)
11:16:30 info: Producer0:[0] kafka-log Facility:PARTCNT, Message[thrd:main]: Partitioning 1 unassigned messages in topic my-topic-1168 to 1 partitions
11:16:30 info: Producer0:[0] kafka-log Facility:UAS, Message[thrd:main]: 1/1 messages were partitioned in topic my-topic-1168
11:16:30 info: Producer0:[0] kafka-log Facility:TOPBRK, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: Topic my-topic-1168 [0]: joining broker (rktp 0x7f04a811ee80, 1 message(s) queued)
11:16:30 info: Producer0:[0] kafka-log Facility:FETCHADD, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: Added my-topic-1168 [0] to active list (117 entries, opv 0, 1 messages queued): joining
11:16:31 info: Producer0:[0] kafka-log Facility:TOPPAR, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0] 1 message(s) in xmit queue (1 added from partition queue)
11:16:31 info: Producer0:[0] kafka-log Facility:PRODUCE, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0]: Produce MessageSet with 1 message(s) (80 bytes, ApiVersion 10, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
11:16:31 info: Producer0:[0] kafka-log Facility:MSGSET, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0]: MessageSet with 1 message(s) (MsgId 0, BaseSeq -1) encountered error: Broker: Unknown topic or partition (actions Refresh,MsgNotPersisted)
11:16:31 info: Producer0:[0] kafka-log Facility:BROKERUA, Message[thrd:localhost:40002/bootstrap]: my-topic-1168 [0]: broker unavailable: produce: Broker: Unknown topic or partition
11:16:31 info: Producer0:[0] kafka-log Facility:TOPPAR, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0] 1 message(s) in xmit queue (1 added from partition queue)
11:16:31 info: Producer0:[0] kafka-log Facility:TOPPAR, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0] 1 message(s) in xmit queue (0 added from partition queue)
11:16:31 info: Producer0:[0] kafka-log Facility:TOPPAR, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0] 1 message(s) in xmit queue (0 added from partition queue)
11:16:31 info: Producer0:[0] kafka-log Facility:TOPPAR, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0] 1 message(s) in xmit queue (0 added from partition queue)
11:16:31 info: Producer0:[0] kafka-log Facility:TOPPAR, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0] 1 message(s) in xmit queue (0 added from partition queue)
11:16:31 info: Producer0:[0] kafka-log Facility:TOPPAR, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0] 1 message(s) in xmit queue (0 added from partition queue)
11:16:31 info: Producer0:[0] kafka-log Facility:TOPPAR, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0] 1 message(s) in xmit queue (0 added from partition queue)
11:16:31 info: Producer0:[0] kafka-log Facility:TOPPAR, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0] 1 message(s) in xmit queue (0 added from partition queue)
11:16:31 info: Producer0:[0] kafka-log Facility:TOPPAR, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0] 1 message(s) in xmit queue (0 added from partition queue)
11:16:31 info: Producer0:[0] kafka-log Facility:PRODUCE, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0]: Produce MessageSet with 1 message(s) (80 bytes, ApiVersion 10, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
........................................................................
11:17:19 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:40001/1:   Topic my-topic-1168 with 1 partitions
11:17:19 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1168 changed id from 65J5lf+sT0GSlxIMD9BSbw to 8YrcEGC1SjyeG1LpGlO57w
11:17:19 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1168 [0] Leader 1 Epoch 0
11:17:33 info: Producer0:[0] kafka-log Facility:TOPPAR, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0] 115 message(s) in xmit queue (115 added from partition queue)
11:17:33 info: Producer0:[0] kafka-log Facility:PRODUCE, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0]: Produce MessageSet with 115 message(s) (2525 bytes, ApiVersion 10, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
11:17:33 info: Producer0:[0] kafka-log Facility:MSGSET, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0]: MessageSet with 115 message(s) (MsgId 0, BaseSeq -1) encountered error: Broker: Not leader for partition (actions Refresh,MsgNotPersisted)
11:17:33 info: Producer0:[0] kafka-log Facility:BROKERUA, Message[thrd:localhost:40002/bootstrap]: my-topic-1168 [0]: broker unavailable: produce: Broker: Not leader for partition
11:17:36 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:40002/2:   Topic my-topic-1168 with 1 partitions
11:17:36 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1168 [0] Leader 1 Epoch 0
11:17:40 info: Producer0:[0] kafka-log Facility:TOPPAR, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0] 150 message(s) in xmit queue (150 added from partition queue)
11:17:40 info: Producer0:[0] kafka-log Facility:PRODUCE, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0]: Produce MessageSet with 150 message(s) (3295 bytes, ApiVersion 10, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
11:17:40 info: Producer0:[0] kafka-log Facility:MSGSET, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0]: MessageSet with 150 message(s) (MsgId 0, BaseSeq -1) encountered error: Broker: Not leader for partition (actions Refresh,MsgNotPersisted)
11:17:40 info: Producer0:[0] kafka-log Facility:BROKERUA, Message[thrd:localhost:40002/bootstrap]: my-topic-1168 [0]: broker unavailable: produce: Broker: Not leader for partition
11:17:44 info: Producer0:[0] kafka-log Facility:METADATAUPDATE, Message[thrd:main]: Partition my-topic-1168(8YrcEGC1SjyeG1LpGlO57w)[0]: leader epoch is not newer 0 >= 0
11:17:44 info: Producer0:[0] kafka-log Facility:METADATAUPDATE, Message[thrd:main]: Partition my-topic-1168(8YrcEGC1SjyeG1LpGlO57w)[0]: leader epoch is not newer 0 >= 0
11:17:44 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:40001/1:   Topic my-topic-1168 with 1 partitions
11:17:44 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1168 [0] Leader 1 Epoch 0
11:17:44 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:40003/3:   Topic my-topic-1168 with 1 partitions
11:17:44 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1168 [0] Leader 1 Epoch 0
11:17:54 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:40001/1:   Topic my-topic-1168 with 1 partitions
11:17:54 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1168 [0] Leader 1 Epoch 0
11:18:11 info: Producer0:[0] kafka-log Facility:TOPPAR, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0] 455 message(s) in xmit queue (455 added from partition queue)
11:18:11 info: Producer0:[0] kafka-log Facility:PRODUCE, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0]: Produce MessageSet with 455 message(s) (10005 bytes, ApiVersion 10, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
11:18:11 info: Producer0:[0] kafka-log Facility:MSGSET, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0]: MessageSet with 455 message(s) (MsgId 0, BaseSeq -1) encountered error: Broker: Not leader for partition (actions Refresh,MsgNotPersisted)
11:18:11 info: Producer0:[0] kafka-log Facility:BROKERUA, Message[thrd:localhost:40002/bootstrap]: my-topic-1168 [0]: broker unavailable: produce: Broker: Not leader for partition
11:18:11 info: Producer0:[0] kafka-log Facility:METADATAUPDATE, Message[thrd:main]: Partition my-topic-1168(8YrcEGC1SjyeG1LpGlO57w)[0]: leader epoch is not newer 0 >= 0
11:18:11 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:40003/3:   Topic my-topic-1168 with 1 partitions
11:18:11 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1168 [0] Leader 1 Epoch 0
11:18:17 info: Producer0:[0] kafka-log Facility:TOPPAR, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0] 527 message(s) in xmit queue (527 added from partition queue)
11:18:17 info: Producer0:[0] kafka-log Facility:PRODUCE, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0]: Produce MessageSet with 527 message(s) (11589 bytes, ApiVersion 10, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
11:18:17 info: Producer0:[0] kafka-log Facility:MSGSET, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0]: MessageSet with 527 message(s) (MsgId 0, BaseSeq -1) encountered error: Broker: Not leader for partition (actions Refresh,MsgNotPersisted)
11:18:17 info: Producer0:[0] kafka-log Facility:BROKERUA, Message[thrd:localhost:40002/bootstrap]: my-topic-1168 [0]: broker unavailable: produce: Broker: Not leader for partition
11:18:17 info: Producer0:[0] kafka-log Facility:METADATAUPDATE, Message[thrd:main]: Partition my-topic-1168(8YrcEGC1SjyeG1LpGlO57w)[0]: leader epoch is not newer 0 >= 0
11:18:18 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:40003/3:   Topic my-topic-1168 with 1 partitions
11:18:18 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1168 [0] Leader 1 Epoch 0
11:18:18 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:40001/1:   Topic my-topic-1168 with 1 partitions
11:18:18 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1168 [0] Leader 1 Epoch 0
11:18:29 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: localhost:40001/1:   Topic my-topic-1168 with 1 partitions
11:18:29 info: Producer0:[0] kafka-log Facility:METADATA, Message[thrd:main]: Topic my-topic-1168 [0] Leader 1 Epoch 0
11:18:31 info: Producer0:[0] kafka-log Facility:TOPPAR, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0] 632 message(s) in xmit queue (632 added from partition queue)
11:18:31 info: Producer0:[0] kafka-log Facility:PRODUCE, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0]: Produce MessageSet with 632 message(s) (13899 bytes, ApiVersion 10, MsgVersion 2, MsgId 0, BaseSeq -1, PID{Invalid}, uncompressed)
11:18:31 info: Producer0:[0] kafka-log Facility:MSGSET, Message[thrd:localhost:40002/bootstrap]: localhost:40002/2: my-topic-1168 [0]: MessageSet with 632 message(s) (MsgId 0, BaseSeq -1) encountered error: Broker: Not leader for partition (actions Refresh,MsgNotPersisted)
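
With 3000 topics and `debug=all`, finding the affected topics by eye is tedious. The following is a minimal sketch of a log scanner, assuming the message formats shown in the excerpt above (`Topic ... changed id from ... to ...` and `Partition ...(...)[n]: leader epoch is not newer`). The function name `find_stuck_topics` and the sample lines are illustrative only, and treating `AAAAAAAAAAAAAAAAAAAAAA` as the "no ID yet" placeholder is an assumption based on the first ID change in the excerpt.

```python
import re
from collections import defaultdict

# Patterns modelled on the debug lines in the excerpt (assumed stable format).
ID_CHANGE = re.compile(r"Topic (\S+) changed id from (\S+) to (\S+)")
STALE_EPOCH = re.compile(
    r"Partition ([^\s(]+)\(([^)]+)\)\[(\d+)\]: leader epoch is not newer"
)
# Placeholder ID seen on first discovery in the excerpt (assumed "no ID yet").
ZERO_ID = "AAAAAAAAAAAAAAAAAAAAAA"


def find_stuck_topics(lines):
    """Count rejected metadata updates per topic, for topics whose ID
    changed from one real ID to another (i.e. that were re-created)."""
    recreated = {}            # topic -> topic ID seen after re-creation
    stuck = defaultdict(int)  # topic -> number of rejected updates
    for line in lines:
        match = ID_CHANGE.search(line)
        if match:
            topic, old_id, new_id = match.groups()
            if old_id != ZERO_ID:  # skip first discovery of the topic
                recreated[topic] = new_id
            continue
        match = STALE_EPOCH.search(line)
        if match:
            topic, topic_id, _partition = match.groups()
            if recreated.get(topic) == topic_id:
                stuck[topic] += 1
    return dict(stuck)


# Shortened sample lines in the same format as the excerpt.
sample = [
    f"Message[thrd:main]: Topic my-topic-1168 changed id from {ZERO_ID} to 65J5lf+sT0GSlxIMD9BSbw",
    "Message[thrd:main]: Topic my-topic-1168 changed id from 65J5lf+sT0GSlxIMD9BSbw to 8YrcEGC1SjyeG1LpGlO57w",
    "Message[thrd:main]: Partition my-topic-1168(8YrcEGC1SjyeG1LpGlO57w)[0]: leader epoch is not newer 0 >= 0",
    "Message[thrd:main]: Partition my-topic-1168(8YrcEGC1SjyeG1LpGlO57w)[0]: leader epoch is not newer 0 >= 0",
]
print(find_stuck_topics(sample))
```

To scan a real log, the lines of the captured output can be passed in place of `sample`, for example by iterating over an open file.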

Checklist

  • librdkafka version: 2.6.0
  • Apache Kafka version: 3.8.0
  • librdkafka client configuration: (See parameters)
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Operating system: ubuntu:22.04(x64)
  • Provide broker log excerpts
  • Critical issue
@emasab emasab added the bug label Nov 15, 2024
@emasab
Contributor

emasab commented Nov 15, 2024

@marcin-krystianc Thanks for the report. The implementation of KIP-516 (topic IDs) still needs to be completed with regard to re-creating a topic with the same name: many fields need to be reset, among them the leader epoch. For now, clients still need to be restarted rather than relying on automatic handling of topic re-creation.

@marcin-krystianc
Author

> @marcin-krystianc Thanks for the report, the implementation of KIP-516 (topic ids) needs to be completed with regards to topic re-creation with same name, there are many fields to reset, among them there's the leader epoch. At the moment it's still needed to restart the clients and avoid relying on automatic handling of topic re-creation.

I'm happy to contribute a fix, but I would appreciate some guidance.
When I analysed how to approach the problem, I came to the conclusion that after detecting the topic ID change (https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_topic.c#L1340-L1357), we should remove the topic from the cache, mark the topic as UNKNOWN, and run the update procedure again. That would properly initialise the topic with all its fields and set its state to EXISTS again. What do you think?
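
To make the idea concrete, here is a toy sketch of the behaviour I have in mind. It is not librdkafka code: `CachedPartition` and `update_with_reset` are hypothetical names, and the epoch rule is the one suggested by the `leader epoch is not newer` message. The only point is that resetting the cached state on an ID change lets epoch 0 of the new incarnation be accepted.

```python
from dataclasses import dataclass

UNKNOWN = -1  # hypothetical marker for "no leader / no epoch known"


@dataclass
class CachedPartition:
    """Hypothetical cached view of one partition."""
    topic_id: str
    leader: int = UNKNOWN
    leader_epoch: int = UNKNOWN


def update_with_reset(cached, topic_id, leader, leader_epoch):
    """Apply a metadata update; a topic ID change discards cached state first.

    Returns True if the cached leader was updated.
    """
    if cached.topic_id != topic_id:
        # Treat the topic as unknown again, so that epoch 0 of the new
        # incarnation compares as newer than the reset epoch (-1).
        cached.topic_id = topic_id
        cached.leader = UNKNOWN
        cached.leader_epoch = UNKNOWN
    if cached.leader_epoch >= leader_epoch:
        return False  # same incarnation, epoch not newer: ignore
    cached.leader = leader
    cached.leader_epoch = leader_epoch
    return True


# State after the producer learned the old incarnation (leader 2, epoch 0).
cached = CachedPartition("65J5lf+sT0GSlxIMD9BSbw", leader=2, leader_epoch=0)

# New incarnation: new ID, leader 1, epoch 0 again.
accepted = update_with_reset(cached, "8YrcEGC1SjyeG1LpGlO57w", leader=1, leader_epoch=0)

# Repeating the same metadata is ignored, as before.
repeated = update_with_reset(cached, "8YrcEGC1SjyeG1LpGlO57w", leader=1, leader_epoch=0)

print(accepted, repeated, cached.leader)
```

Note that this sketch would also accept a late response that still carries the old ID, so a real fix would probably need some way to avoid flipping back to a stale incarnation.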
