diff --git a/bulkerapp/app/app_config.go b/bulkerapp/app/app_config.go index 40c6cc3..b7a4255 100644 --- a/bulkerapp/app/app_config.go +++ b/bulkerapp/app/app_config.go @@ -45,9 +45,10 @@ type Config struct { //Kafka authorization as JSON object {"mechanism": "SCRAM-SHA-256|PLAIN", "username": "user", "password": "password"} KafkaSASL string `mapstructure:"KAFKA_SASL"` - KafkaTopicCompression string `mapstructure:"KAFKA_TOPIC_COMPRESSION" default:"snappy"` - KafkaTopicRetentionHours int `mapstructure:"KAFKA_TOPIC_RETENTION_HOURS" default:"168"` - KafkaRetryTopicRetentionHours int `mapstructure:"KAFKA_RETRY_TOPIC_RETENTION_HOURS" default:"168"` + KafkaTopicCompression string `mapstructure:"KAFKA_TOPIC_COMPRESSION" default:"snappy"` + KafkaTopicRetentionHours int `mapstructure:"KAFKA_TOPIC_RETENTION_HOURS" default:"168"` + + KafkaRetryTopicRetentionHours int `mapstructure:"KAFKA_RETRY_TOPIC_RETENTION_HOURS" default:"48"` KafkaRetryTopicSegmentBytes int `mapstructure:"KAFKA_RETRY_TOPIC_SEGMENT_BYTES" default:"104857600"` KafkaDeadTopicRetentionHours int `mapstructure:"KAFKA_DEAD_TOPIC_RETENTION_HOURS" default:"168"` KafkaTopicReplicationFactor int `mapstructure:"KAFKA_TOPIC_REPLICATION_FACTOR"` @@ -56,8 +57,10 @@ type Config struct { //TODO: max.poll.interval.ms // KafkaDestinationsTopicName destination topic for /ingest endpoint - KafkaDestinationsTopicName string `mapstructure:"KAFKA_DESTINATIONS_TOPIC_NAME" default:"destination-messages"` - KafkaDestinationsTopicPartitions int `mapstructure:"KAFKA_DESTINATIONS_TOPIC_PARTITIONS" default:"4"` + KafkaDestinationsTopicName string `mapstructure:"KAFKA_DESTINATIONS_TOPIC_NAME" default:"destination-messages"` + KafkaDestinationsTopicRetentionHours int `mapstructure:"KAFKA_DESTINATIONS_TOPIC_RETENTION_HOURS" default:"48"` + + KafkaDestinationsTopicPartitions int `mapstructure:"KAFKA_DESTINATIONS_TOPIC_PARTITIONS" default:"4"` KafkaDestinationsDeadLetterTopicName string `mapstructure:"KAFKA_DESTINATIONS_DEAD_LETTER_TOPIC_NAME" default:"destination-messages-dead-letter"` @@ -65,6 +68,8 @@ type Config struct { TopicManagerRefreshPeriodSec int `mapstructure:"TOPIC_MANAGER_REFRESH_PERIOD_SEC" default:"5"` // ProducerWaitForDeliveryMs For ProduceSync only is a timeout for producer to wait for delivery report. + ProducerBatchSize int `mapstructure:"PRODUCER_BATCH_SIZE" default:"65535"` + ProducerLingerMs int `mapstructure:"PRODUCER_LINGER_MS" default:"1000"` ProducerWaitForDeliveryMs int `mapstructure:"PRODUCER_WAIT_FOR_DELIVERY_MS" default:"1000"` // # BATCHING @@ -83,8 +88,8 @@ type Config struct { // For example, if retry count is 3 and base is 5, then retry delays will be 5, 25, 125 minutes. // Default: 5 MessagesRetryBackoffBase float64 `mapstructure:"MESSAGES_RETRY_BACKOFF_BASE" default:"5"` - // MessagesRetryBackoffMaxDelay defines maximum possible retry delay in minutes. Default: 1440 minutes = 24 hours - MessagesRetryBackoffMaxDelay float64 `mapstructure:"MESSAGES_RETRY_BACKOFF_MAX_DELAY" default:"1440"` + // MessagesRetryBackoffMaxDelay defines maximum possible retry delay in minutes. Default: 3180 minutes = 53 hours (5^5 minutes) + MessagesRetryBackoffMaxDelay float64 `mapstructure:"MESSAGES_RETRY_BACKOFF_MAX_DELAY" default:"3180"` // # EVENTS LOGGING diff --git a/bulkerapp/app/producer.go b/bulkerapp/app/producer.go index caee44b..0e7a20c 100644 --- a/bulkerapp/app/producer.go +++ b/bulkerapp/app/producer.go @@ -27,7 +27,8 @@ func NewProducer(config *Config, kafkaConfig *kafka.ConfigMap) (*Producer, error base := appbase.NewServiceBase("producer") producerConfig := kafka.ConfigMap(utils.MapPutAll(kafka.ConfigMap{ - //TODO: add producer specific config here + "batch.size": config.ProducerBatchSize, + "linger.ms": config.ProducerLingerMs, }, *kafkaConfig)) producer, err := kafka.NewProducer(&producerConfig) if err != nil { diff --git a/bulkerapp/app/topic_manager.go b/bulkerapp/app/topic_manager.go index b8af534..981f527 100644 --- a/bulkerapp/app/topic_manager.go +++ b/bulkerapp/app/topic_manager.go @@ -257,7 +257,10 @@ func (tm *TopicManager) processMetadata(metadata *kafka.Metadata) { } } tm.allTopics = allTopics - err := tm.ensureTopic(tm.config.KafkaDestinationsTopicName, tm.config.KafkaDestinationsTopicPartitions, nil) + err := tm.ensureTopic(tm.config.KafkaDestinationsTopicName, tm.config.KafkaDestinationsTopicPartitions, + map[string]string{ + "retention.ms": fmt.Sprint(tm.config.KafkaDestinationsTopicRetentionHours * 60 * 60 * 1000), + }) if err != nil { metrics.TopicManagerError("destination-topic_error").Inc() tm.SystemErrorf("Failed to create destination topic [%s]: %v", tm.config.KafkaDestinationsTopicName, err) diff --git a/bulkerlib/implementations/sql/testcontainers/clickhouse/ch_cluster_container.go b/bulkerlib/implementations/sql/testcontainers/clickhouse/ch_cluster_container.go index c62c1cd..12c7e87 100644 --- a/bulkerlib/implementations/sql/testcontainers/clickhouse/ch_cluster_container.go +++ b/bulkerlib/implementations/sql/testcontainers/clickhouse/ch_cluster_container.go @@ -72,7 +72,7 @@ func (ch *ClickHouseClusterContainer) Close() error { execError := ch.Compose.Down(context.Background()) err := execError.Error if err != nil { - return fmt.Errorf("could down docker compose: %s", ch.Compose, ch.Identifier) + return fmt.Errorf("could down docker compose: %s", ch.Identifier) } }