bulker: reduce default topic retention to 48 hours.
bulker producer: set batch.size and linger.ms to improve compression efficiency
absorbb committed Aug 15, 2023
1 parent 96e03f2 commit 7e28c32
Showing 4 changed files with 19 additions and 10 deletions.
19 changes: 12 additions & 7 deletions bulkerapp/app/app_config.go
@@ -45,9 +45,10 @@ type Config struct {
//Kafka authorization as JSON object {"mechanism": "SCRAM-SHA-256|PLAIN", "username": "user", "password": "password"}
KafkaSASL string `mapstructure:"KAFKA_SASL"`

- KafkaTopicCompression string `mapstructure:"KAFKA_TOPIC_COMPRESSION" default:"snappy"`
- KafkaTopicRetentionHours int `mapstructure:"KAFKA_TOPIC_RETENTION_HOURS" default:"168"`
- KafkaRetryTopicRetentionHours int `mapstructure:"KAFKA_RETRY_TOPIC_RETENTION_HOURS" default:"168"`
+ KafkaTopicCompression string `mapstructure:"KAFKA_TOPIC_COMPRESSION" default:"snappy"`
+ KafkaTopicRetentionHours int `mapstructure:"KAFKA_TOPIC_RETENTION_HOURS" default:"168"`
+
+ KafkaRetryTopicRetentionHours int `mapstructure:"KAFKA_RETRY_TOPIC_RETENTION_HOURS" default:"48"`
KafkaRetryTopicSegmentBytes int `mapstructure:"KAFKA_RETRY_TOPIC_SEGMENT_BYTES" default:"104857600"`
KafkaDeadTopicRetentionHours int `mapstructure:"KAFKA_DEAD_TOPIC_RETENTION_HOURS" default:"168"`
KafkaTopicReplicationFactor int `mapstructure:"KAFKA_TOPIC_REPLICATION_FACTOR"`
@@ -56,15 +57,19 @@ type Config struct {
//TODO: max.poll.interval.ms

// KafkaDestinationsTopicName destination topic for /ingest endpoint
- KafkaDestinationsTopicName string `mapstructure:"KAFKA_DESTINATIONS_TOPIC_NAME" default:"destination-messages"`
- KafkaDestinationsTopicPartitions int `mapstructure:"KAFKA_DESTINATIONS_TOPIC_PARTITIONS" default:"4"`
+ KafkaDestinationsTopicName string `mapstructure:"KAFKA_DESTINATIONS_TOPIC_NAME" default:"destination-messages"`
+ KafkaDestinationsTopicRetentionHours int `mapstructure:"KAFKA_DESTINATIONS_TOPIC_RETENTION_HOURS" default:"48"`
+
+ KafkaDestinationsTopicPartitions int `mapstructure:"KAFKA_DESTINATIONS_TOPIC_PARTITIONS" default:"4"`

KafkaDestinationsDeadLetterTopicName string `mapstructure:"KAFKA_DESTINATIONS_DEAD_LETTER_TOPIC_NAME" default:"destination-messages-dead-letter"`

// TopicManagerRefreshPeriodSec how often topic manager will check for new topics
TopicManagerRefreshPeriodSec int `mapstructure:"TOPIC_MANAGER_REFRESH_PERIOD_SEC" default:"5"`

// ProducerWaitForDeliveryMs For ProduceSync only is a timeout for producer to wait for delivery report.
+ ProducerBatchSize int `mapstructure:"PRODUCER_BATCH_SIZE" default:"65535"`
+ ProducerLingerMs int `mapstructure:"PRODUCER_LINGER_MS" default:"1000"`
ProducerWaitForDeliveryMs int `mapstructure:"PRODUCER_WAIT_FOR_DELIVERY_MS" default:"1000"`

// # BATCHING
@@ -83,8 +88,8 @@ type Config struct {
// For example, if retry count is 3 and base is 5, then retry delays will be 5, 25, 125 minutes.
// Default: 5
MessagesRetryBackoffBase float64 `mapstructure:"MESSAGES_RETRY_BACKOFF_BASE" default:"5"`
- // MessagesRetryBackoffMaxDelay defines maximum possible retry delay in minutes. Default: 1440 minutes = 24 hours
- MessagesRetryBackoffMaxDelay float64 `mapstructure:"MESSAGES_RETRY_BACKOFF_MAX_DELAY" default:"1440"`
+ // MessagesRetryBackoffMaxDelay defines maximum possible retry delay in minutes. Default: 3180 minutes = 53 hours (5^5 minutes)
+ MessagesRetryBackoffMaxDelay float64 `mapstructure:"MESSAGES_RETRY_BACKOFF_MAX_DELAY" default:"3180"`

// # EVENTS LOGGING

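The backoff comments above describe retry delays of base^attempt minutes, capped by MessagesRetryBackoffMaxDelay. A minimal sketch of that schedule with the new defaults (assuming the cap is a simple minimum; bulker's actual retry code is not part of this diff):

package main

import (
	"fmt"
	"math"
)

// retryDelayMinutes illustrates the documented schedule: the n-th retry waits
// base^n minutes, capped at maxDelay minutes. Illustration only, not bulker's code path.
func retryDelayMinutes(attempt int, base, maxDelay float64) float64 {
	return math.Min(math.Pow(base, float64(attempt)), maxDelay)
}

func main() {
	// Defaults after this commit: MESSAGES_RETRY_BACKOFF_BASE=5, MESSAGES_RETRY_BACKOFF_MAX_DELAY=3180.
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Printf("retry %d: %.0f min\n", attempt, retryDelayMinutes(attempt, 5, 3180))
	}
	// Prints 5, 25, 125, 625, 3125 — all at or below the 3180-minute cap.
}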
3 changes: 2 additions & 1 deletion bulkerapp/app/producer.go
@@ -27,7 +27,8 @@ func NewProducer(config *Config, kafkaConfig *kafka.ConfigMap) (*Producer, error
base := appbase.NewServiceBase("producer")

producerConfig := kafka.ConfigMap(utils.MapPutAll(kafka.ConfigMap{
- //TODO: add producer specific config here
+ "batch.size": config.ProducerBatchSize,
+ "linger.ms": config.ProducerLingerMs,
}, *kafkaConfig))
producer, err := kafka.NewProducer(&producerConfig)
if err != nil {
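batch.size and linger.ms are standard librdkafka producer settings: the producer waits up to linger.ms for up to batch.size bytes to accumulate, so each compressed batch covers more messages. A minimal standalone sketch with the new defaults (the broker address and the confluent-kafka-go import path are assumptions, not taken from this diff; bulker wires the same two values through Config.ProducerBatchSize and Config.ProducerLingerMs as shown above):

package main

import (
	"github.com/confluentinc/confluent-kafka-go/v2/kafka" // import path assumed; the repo may pin a different major version
)

func main() {
	cfg := kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder broker
		"batch.size":        65535,            // PRODUCER_BATCH_SIZE default
		"linger.ms":         1000,             // PRODUCER_LINGER_MS default
	}
	producer, err := kafka.NewProducer(&cfg)
	if err != nil {
		panic(err)
	}
	defer producer.Close()
}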
5 changes: 4 additions & 1 deletion bulkerapp/app/topic_manager.go
@@ -257,7 +257,10 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata) {
}
}
tm.allTopics = allTopics
- err := tm.ensureTopic(tm.config.KafkaDestinationsTopicName, tm.config.KafkaDestinationsTopicPartitions, nil)
+ err := tm.ensureTopic(tm.config.KafkaDestinationsTopicName, tm.config.KafkaDestinationsTopicPartitions,
+ 	map[string]string{
+ 		"retention.ms": fmt.Sprint(tm.config.KafkaDestinationsTopicRetentionHours * 60 * 60 * 1000),
+ 	})
if err != nil {
metrics.TopicManagerError("destination-topic_error").Inc()
tm.SystemErrorf("Failed to create destination topic [%s]: %v", tm.config.KafkaDestinationsTopicName, err)
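ensureTopic now receives a retention.ms setting for the destinations topic; Kafka takes retention in milliseconds, so the hours default is multiplied out. A minimal standalone sketch of that conversion (not the TopicManager code itself):

package main

import "fmt"

func main() {
	retentionHours := 48 // KAFKA_DESTINATIONS_TOPIC_RETENTION_HOURS default from this commit
	topicConfig := map[string]string{
		"retention.ms": fmt.Sprint(retentionHours * 60 * 60 * 1000),
	}
	fmt.Println(topicConfig) // map[retention.ms:172800000]
}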
@@ -72,7 +72,7 @@ func (ch *ClickHouseClusterContainer) Close() error {
execError := ch.Compose.Down(context.Background())
err := execError.Error
if err != nil {
- return fmt.Errorf("could down docker compose: %s", ch.Compose, ch.Identifier)
+ return fmt.Errorf("could down docker compose: %s", ch.Identifier)
}
}

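The last hunk fixes a format-argument mismatch: the old call passed two values to a single %s verb, which go vet flags and which fmt renders with an %!(EXTRA ...) suffix. A tiny standalone illustration (the string arguments are placeholders, not values from the repo):

package main

import "fmt"

func main() {
	// Two arguments for one %s verb: prints "... compose%!(EXTRA string=identifier)" and go vet warns.
	fmt.Println(fmt.Errorf("could down docker compose: %s", "compose", "identifier"))
	// One verb, one argument — the form the commit keeps.
	fmt.Println(fmt.Errorf("could down docker compose: %s", "identifier"))
}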
