Log topic name when delaying events, add warning about using DelayedFlinkKafkaConsumer (#7011)
piotrp authored Oct 15, 2024
1 parent f0ecff4 commit 150f95c
Showing 2 changed files with 8 additions and 2 deletions.
2 changes: 1 addition & 1 deletion docs/integration/KafkaIntegration.md
@@ -195,7 +195,7 @@ Important thing to remember is that Kafka server addresses/Schema Registry addre
  | kafkaProperties."isolation.level" | High | string | | Controls how to read messages written transactionally. [isolation.level](https://kafka.apache.org/documentation/#consumerconfigs_isolation.level) |
  | kafkaProperties | Medium | map | | Additional configuration of [producers](https://kafka.apache.org/documentation/#producerconfigs) or [consumers](https://kafka.apache.org/documentation/#consumerconfigs) |
  | useStringForKey | Medium | boolean | true | Kafka message keys will be in the string format (not in Avro) |
- | kafkaEspProperties.forceLatestRead | Medium | boolean | false | If scenario is restarted, should offsets of source consumers be reset to latest (can be useful in test enrivonments) |
+ | kafkaEspProperties.forceLatestRead | Medium | boolean | false | If scenario is restarted, should offsets of source consumers be reset to latest (can be useful in test environments) |
  | topicsExistenceValidationConfig.enabled | Low | boolean | true | Determine if existence of topics should be validated. For Kafka topics used in Sinks, topic existence won't be checked if [auto.create.topics.enable](https://kafka.apache.org/documentation/#brokerconfigs_auto.create.topics.enable) is true on Kafka cluster - note, that it may require permissions to access Kafka cluster settings |
  | topicsExistenceValidationConfig.validatorConfig.autoCreateFlagFetchCacheTtl | Low | duration | 5 minutes | TTL for checking Kafka cluster settings |
  | topicsExistenceValidationConfig.validatorConfig.topicsFetchCacheTtl | Low | duration | 30 seconds | TTL for caching list of existing topics |
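The options in the table above are typically set together in one component configuration. A hedged HOCON sketch (the nesting is assumed from the option paths in the table, not taken from a verified deployment config):

```hocon
# Illustrative only: option paths follow the table above.
kafkaProperties {
  "isolation.level": "read_committed"
}
useStringForKey: true
kafkaEspProperties {
  forceLatestRead: false
}
topicsExistenceValidationConfig {
  enabled: true
  validatorConfig {
    autoCreateFlagFetchCacheTtl: 5 minutes
    topicsFetchCacheTtl: 30 seconds
  }
}
```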
@@ -91,6 +91,11 @@ object DelayedFlinkKafkaConsumer {
    type ExtractTimestampForDelay[T] = (KafkaTopicPartitionState[T, TopicPartition], T, Long) => Long
  }

+ /**
+  * Warning: this consumer works correctly only when it's handling a single partition (so the job's parallelism must be
+  * at least equal to the number of topic partitions). Otherwise, a single message will block reading
+  * from multiple partitions, leading to bigger delays than intended.
+  */
  class DelayedFlinkKafkaConsumer[T](
    topics: NonEmptyList[PreparedKafkaTopic[TopicName.ForSource]],
    schema: KafkaDeserializationSchema[T],
@@ -207,7 +212,8 @@ class DelayedKafkaFetcher[T](

    if (sleepTime >= maxSleepTime) {
      val logMessage = s"Sleeping for $sleepTime ms of total $remainingDelay ms for ${records.size()} events. " +
-       s"Max event timestamp is $maxEventTimestamp, fetcher delay is $delay, partition:offset is ${partitionState.getPartition}:$offset"
+       s"Max event timestamp is $maxEventTimestamp, fetcher delay is $delay, topic:partition:offset is " +
+       s"${partitionState.getTopic}:${partitionState.getPartition}:$offset"
      logger.info(logMessage)
    }

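The new scaladoc warning follows from how the fetcher waits: it sleeps the fetching thread in bounded chunks until the configured delay has elapsed for the newest event it has seen, so while it sleeps for one partition it cannot emit records from any other partition it owns. A minimal self-contained sketch of that delay arithmetic (names such as `remainingDelay` and `nextSleep` are hypothetical, not Nussknacker's actual API):

```scala
// Illustrative sketch, not the real DelayedKafkaFetcher implementation.
object DelaySketch {

  // How much longer the newest event must be held back:
  // event may be emitted once now >= maxEventTimestamp + delay.
  def remainingDelay(maxEventTimestamp: Long, delayMs: Long, nowMs: Long): Long =
    math.max(0L, maxEventTimestamp + delayMs - nowMs)

  // Sleep in bounded chunks so the thread can periodically re-check
  // (e.g. for shutdown) instead of blocking for the whole remaining delay.
  def nextSleep(remainingMs: Long, maxSleepTimeMs: Long): Long =
    math.min(remainingMs, maxSleepTimeMs)

  def main(args: Array[String]): Unit = {
    val now = 100000L
    val remaining = remainingDelay(maxEventTimestamp = 95000L, delayMs = 10000L, nowMs = now)
    println(remaining)              // 5000 ms still to wait
    println(nextSleep(remaining, 2000L)) // sleep at most 2000 ms this iteration
  }
}
```

Because every sleep happens on the thread that reads all partitions assigned to that subtask, keeping parallelism at least equal to the partition count ensures each fetcher owns one partition and delays stay independent.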
