diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java index 0e1e37e6..cc2db43d 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java @@ -17,9 +17,9 @@ package org.apache.rocketmq.spring.autoconfigure; +import com.fasterxml.jackson.databind.ObjectMapper; import java.util.Map; import java.util.Objects; - import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.exception.MQClientException; @@ -27,6 +27,7 @@ import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQMessageConverter; +import org.apache.rocketmq.spring.support.RocketMQUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.framework.AopProxyUtils; @@ -41,8 +42,6 @@ import org.springframework.core.env.StandardEnvironment; import org.springframework.util.StringUtils; -import com.fasterxml.jackson.databind.ObjectMapper; - @Configuration public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton { private final static Logger log = LoggerFactory.getLogger(ExtProducerResetConfiguration.class); @@ -152,7 +151,7 @@ private void validate(ExtRocketMQTemplateConfiguration annotation, } if (rocketMQProperties.getNameServer() == null || - rocketMQProperties.getNameServer().equals(environment.resolvePlaceholders(annotation.nameServer()))) { + RocketMQUtil.getNameServerString(rocketMQProperties.getNameServer()).equals(environment.resolvePlaceholders(annotation.nameServer()))) { throw new BeanDefinitionValidationException( "Bad annotation definition in @ExtRocketMQTemplateConfiguration, nameServer property is same with " + "global property, please use the default RocketMQTemplate!"); diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java index 64626047..91fc594f 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java @@ -17,11 +17,11 @@ package org.apache.rocketmq.spring.autoconfigure; +import com.fasterxml.jackson.databind.ObjectMapper; import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; - import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; @@ -29,6 +29,7 @@ import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer; import org.apache.rocketmq.spring.support.RocketMQMessageConverter; +import org.apache.rocketmq.spring.support.RocketMQUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.aop.framework.AopProxyUtils; @@ -43,8 +44,6 @@ import org.springframework.core.env.StandardEnvironment; import org.springframework.util.StringUtils; -import com.fasterxml.jackson.databind.ObjectMapper; - @Configuration public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton { private final static Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class); @@ -134,7 +133,7 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String container.setRocketMQMessageListener(annotation); String nameServer = environment.resolvePlaceholders(annotation.nameServer()); - nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer; + nameServer = StringUtils.isEmpty(nameServer) ? RocketMQUtil.getNameServerString(rocketMQProperties.getNameServer()) : nameServer; String accessChannel = environment.resolvePlaceholders(annotation.accessChannel()); container.setNameServer(nameServer); if (!StringUtils.isEmpty(accessChannel)) { diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java index dbe697b7..899c40be 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java @@ -17,8 +17,9 @@ package org.apache.rocketmq.spring.autoconfigure; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.List; import javax.annotation.PostConstruct; - import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; @@ -29,6 +30,7 @@ import org.apache.rocketmq.spring.config.TransactionHandlerRegistry; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQMessageConverter; +import org.apache.rocketmq.spring.support.RocketMQUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -48,8 +50,6 @@ import org.springframework.util.Assert; import org.springframework.util.StringUtils; -import com.fasterxml.jackson.databind.ObjectMapper; - @Configuration @EnableConfigurationProperties(RocketMQProperties.class) @ConditionalOnClass({MQAdmin.class, ObjectMapper.class}) @@ -73,12 +73,12 @@ public void checkProperties() { @Bean @ConditionalOnMissingBean(DefaultMQProducer.class) - @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"}) + @ConditionalOnProperty(prefix = "rocketmq", value = {"producer.group"}) public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) { RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer(); - String nameServer = rocketMQProperties.getNameServer(); + List nameServer = rocketMQProperties.getNameServer(); String groupName = producerConfig.getGroup(); - Assert.hasText(nameServer, "[rocketmq.name-server] must not be null"); + Assert.notEmpty(nameServer, "[rocketmq.name-server] must not be null"); Assert.hasText(groupName, "[rocketmq.producer.group] must not be null"); String accessChannel = rocketMQProperties.getAccessChannel(); @@ -95,8 +95,7 @@ public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties producer = new DefaultMQProducer(groupName, rocketMQProperties.getProducer().isEnableMsgTrace(), rocketMQProperties.getProducer().getCustomizedTraceTopic()); } - - producer.setNamesrvAddr(nameServer); + producer.setNamesrvAddr(RocketMQUtil.getNameServerString(nameServer)); if (!StringUtils.isEmpty(accessChannel)) { producer.setAccessChannel(AccessChannel.valueOf(accessChannel)); } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java index 832d0219..7412fb27 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java @@ -17,20 +17,20 @@ package org.apache.rocketmq.spring.autoconfigure; -import org.apache.rocketmq.common.MixAll; -import org.springframework.boot.context.properties.ConfigurationProperties; - import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.rocketmq.common.MixAll; +import org.springframework.boot.context.properties.ConfigurationProperties; @SuppressWarnings("WeakerAccess") @ConfigurationProperties(prefix = "rocketmq") public class RocketMQProperties { /** - * The name server for rocketMQ, formats: `host:port;host:port`. + * The name servers for rocketMQ, formats: `host:port,host:port`. */ - private String nameServer; + private List nameServer; /** * Enum type for accesChannel, values: LOCAL, CLOUD @@ -49,11 +49,11 @@ public class RocketMQProperties { */ private Consumer consumer = new Consumer(); - public String getNameServer() { + public List getNameServer() { return nameServer; } - public void setNameServer(String nameServer) { + public void setNameServer(List nameServer) { this.nameServer = nameServer; } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java index 4c731f24..afd94f92 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java @@ -17,6 +17,10 @@ package org.apache.rocketmq.spring.support; import com.fasterxml.jackson.databind.ObjectMapper; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; +import java.util.Objects; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.exception.MQClientException; @@ -39,10 +43,6 @@ import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; -import java.nio.charset.Charset; -import java.util.Map; -import java.util.Objects; - public class RocketMQUtil { private final static Logger log = LoggerFactory.getLogger(RocketMQUtil.class); @@ -254,4 +254,20 @@ public static String getInstanceName(RPCHook rpcHook, String identify) { .append(separator).append(UtilAll.getPid()); return instanceName.toString(); } + + public static String getNameServerString(List nameServer) { + if (nameServer == null || nameServer.isEmpty()) { + throw new IllegalArgumentException("property nameServer is empty"); + } + String nameServerToString; + if (nameServer.size() > 1) { + nameServerToString = String.join(";", nameServer); + } else { + nameServerToString = nameServer.get(0); + if (nameServerToString.contains(";")) { + log.warn("name-server format `host:port;host:port` is deprecated."); + } + } + return nameServerToString; + } } diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java index 3a0bc41b..58324624 100644 --- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java +++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQUtilTest.java @@ -17,13 +17,16 @@ package org.apache.rocketmq.spring.support; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import org.junit.Test; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; -import java.util.Arrays; - -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class RocketMQUtilTest { @@ -92,4 +95,12 @@ public void testHeaderConvertToSpringMsg() { assertEquals("tags", springMsg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TAGS)); } + @Test + public void testGetNameServerString() { + List nameServer = new ArrayList<>(); + nameServer.add("localhost:9876"); + nameServer.add("10.0.0.1:9876"); + assertEquals(RocketMQUtil.getNameServerString(nameServer), "localhost:9876;10.0.0.1:9876"); + } + } \ No newline at end of file