diff --git a/buession-canal-parent/pom.xml b/buession-canal-parent/pom.xml
index 1b667ee..8a2abcb 100644
--- a/buession-canal-parent/pom.xml
+++ b/buession-canal-parent/pom.xml
@@ -70,7 +70,7 @@
com.alibaba.otter
- canal.client
+ canal.protocol
${alibaba.canal.version}
@@ -89,7 +89,7 @@
com.alibaba.otter
- canal.protocol
+ canal.client
${alibaba.canal.version}
diff --git a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java
index 5c545d5..3ab6a1f 100644
--- a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java
+++ b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java
@@ -26,10 +26,7 @@
import com.buession.canal.annotation.CanalBinding;
import com.buession.core.validator.Validate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.aop.scope.ScopedProxyFactoryBean;
-import org.springframework.aop.scope.ScopedProxyUtils;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.beans.factory.config.BeanDefinition;
@@ -47,7 +44,9 @@
import org.springframework.lang.NonNull;
import org.springframework.util.ClassUtils;
+import java.util.HashSet;
import java.util.Optional;
+import java.util.Set;
/**
* {@link CanalBinding} 扫描器
@@ -62,7 +61,7 @@ class CanalBindingClassPathMapperScanner extends ClassPathBeanDefinitionScanner
*/
private boolean lazyInitialization;
- private final static Logger logger = LoggerFactory.getLogger(CanalBindingClassPathMapperScanner.class);
+ private final Set destinations = new HashSet<>();
/**
* 构造函数
@@ -97,20 +96,6 @@ protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
return annotationMetadata.isIndependent();
}
- @Override
- protected boolean checkCandidate(@NonNull String beanName, @NonNull BeanDefinition beanDefinition) {
- if(super.checkCandidate(beanName, beanDefinition)){
- return true;
- }else{
- if(logger.isDebugEnabled()){
- logger.warn(
- "Skipping CanalBindingFactoryBean with name '{}' and '{}' bindingType. Bean already defined with the same name!",
- beanName, beanDefinition.getBeanClassName());
- }
- return false;
- }
- }
-
@Override
protected void registerBeanDefinition(@NonNull BeanDefinitionHolder definitionHolder,
@NonNull BeanDefinitionRegistry registry) {
@@ -118,26 +103,15 @@ protected void registerBeanDefinition(@NonNull BeanDefinitionHolder definitionHo
AbstractBeanDefinition beanDefinition = (AbstractBeanDefinition) definitionHolder.getBeanDefinition();
- boolean scopedProxy = false;
-
if(ScopedProxyFactoryBean.class.getName().equals(beanDefinition.getBeanClassName())){
beanDefinition = (AbstractBeanDefinition) Optional
.ofNullable(((RootBeanDefinition) beanDefinition).getDecoratedDefinition())
.map(BeanDefinitionHolder::getBeanDefinition).orElseThrow(()->new IllegalStateException(
"The target bean definition of scoped proxy bean not found. Root bean definition[" +
definitionHolder + ']'));
- scopedProxy = true;
}
processBeanDefinition(beanDefinition);
-
- if(scopedProxy){
- return;
- }
-
- if(beanDefinition.isSingleton() == false){
- registerProxyBeanDefinitionHolder(definitionHolder, registry);
- }
}
private void processBeanDefinition(final AbstractBeanDefinition beanDefinition) {
@@ -145,28 +119,27 @@ private void processBeanDefinition(final AbstractBeanDefinition beanDefinition)
CanalBinding canalBinding = AnnotationUtils.findAnnotation(ClassUtils.resolveClassName(beanClassName, null),
CanalBinding.class);
+ if(canalBinding == null){
+ return;
+ }
if(Validate.isBlank(canalBinding.destination())){
throw new IllegalStateException(
"Either 'destination' must be required in @CanalBinding for: " + beanClassName);
}
+ if(destinations.contains(canalBinding.destination())){
+ throw new IllegalStateException(
+ "The destination: " + canalBinding.destination() + " already exists in @CanalBinding for: " +
+ beanClassName);
+ }
+
beanDefinition.setLazyInit(lazyInitialization);
beanDefinition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
beanDefinition.setAttribute(FactoryBean.OBJECT_TYPE_ATTRIBUTE, beanClassName);
- }
- private void registerProxyBeanDefinitionHolder(final BeanDefinitionHolder definitionHolder,
- final BeanDefinitionRegistry registry) {
- final BeanDefinitionHolder proxyBeanDefinitionHolder = ScopedProxyUtils.createScopedProxy(definitionHolder,
- registry, true);
-
- if(registry.containsBeanDefinition(proxyBeanDefinitionHolder.getBeanName())){
- registry.removeBeanDefinition(proxyBeanDefinitionHolder.getBeanName());
- }
- registry.registerBeanDefinition(proxyBeanDefinitionHolder.getBeanName(),
- proxyBeanDefinitionHolder.getBeanDefinition());
+ destinations.add(canalBinding.destination());
}
}
diff --git a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/CanalBindingBeanPostProcessor.java b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/CanalBindingBeanPostProcessor.java
index a84ced3..e638bb7 100644
--- a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/CanalBindingBeanPostProcessor.java
+++ b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/CanalBindingBeanPostProcessor.java
@@ -56,7 +56,7 @@ public void setBeanFactory(@NonNull BeanFactory beanFactory) throws BeansExcepti
@Override
public Object postProcessAfterInitialization(@NonNull Object bean, @NonNull String beanName) throws BeansException {
CanalBinding canalBinding = AnnotationUtils.findAnnotation(bean.getClass(), CanalBinding.class);
-
+
if(canalBinding != null){
Dispatcher dispatcher = beanFactory.getBean(Dispatcher.class);
detectBindingMethods(bean, bean.getClass(), (AbstractDispatcher) dispatcher, canalBinding.destination());
@@ -76,8 +76,8 @@ protected void detectBindingMethods(final Object bean, final Class> beanType,
return;
}
- String listenerName = EventListenerUtils.buildEventListenerName(destination, canalEventListener.schema(),
- canalEventListener.table(), canalEventListener.eventType());
+ String listenerName = EventListenerUtils.buildEventListenerName(destination,
+ canalEventListener.schema(), canalEventListener.table(), canalEventListener.eventType());
dispatcher.getEventListenerRegistry().register(listenerName, bean, invocableMethod);
});
}