From 4c2215df90731d53296b7216bee29904838db43d Mon Sep 17 00:00:00 2001 From: "yong.teng" Date: Sun, 12 Nov 2023 21:39:46 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90spring=E3=80=91=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../CanalBindingClassPathMapperScanner.java | 86 +++++++------------ .../CanalBindingScannerRegistrar.java | 52 ++++++++--- .../CanalBindingBeanPostProcessor.java | 62 ++++++++++++- .../annotation/factory/package-info.java | 2 +- .../client/factory/CanalClientFactory.java | 50 ++++++++--- .../factory/CanalClientFactoryBean.java | 16 ++-- 6 files changed, 172 insertions(+), 96 deletions(-) diff --git a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java index 20ec1f6..3102aab 100644 --- a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java +++ b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java @@ -25,7 +25,6 @@ package com.buession.canal.spring.annotation; import com.buession.canal.annotation.CanalBinding; -import com.buession.canal.spring.binding.factory.CanalBindingFactoryBean; import com.buession.core.validator.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +32,7 @@ import org.springframework.aop.scope.ScopedProxyUtils; import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition; +import org.springframework.beans.factory.config.AutowireCapableBeanFactory; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.BeanDefinitionHolder; import org.springframework.beans.factory.support.AbstractBeanDefinition; @@ -48,9 +48,7 @@ import org.springframework.lang.NonNull; import org.springframework.util.ClassUtils; -import java.util.Arrays; import java.util.Optional; -import java.util.Set; /** * {@link CanalBinding} 扫描器 @@ -65,6 +63,8 @@ class CanalBindingClassPathMapperScanner extends ClassPathBeanDefinitionScanner */ private boolean lazyInitialization; + private final AutowireCapableBeanFactory beanFactory; + private final static Logger logger = LoggerFactory.getLogger(CanalBindingClassPathMapperScanner.class); /** @@ -76,10 +76,13 @@ class CanalBindingClassPathMapperScanner extends ClassPathBeanDefinitionScanner * {@link Environment} * @param resourceLoader * {@link ResourceLoader} + * @param beanFactory + * {@link AutowireCapableBeanFactory} */ public CanalBindingClassPathMapperScanner(BeanDefinitionRegistry registry, Environment environment, - ResourceLoader resourceLoader) { + ResourceLoader resourceLoader, AutowireCapableBeanFactory beanFactory) { super(registry, false, environment, resourceLoader); + this.beanFactory = beanFactory; addIncludeFilter(new AnnotationTypeFilter(CanalBinding.class)); setBeanNameGenerator(FullyQualifiedAnnotationBeanNameGenerator.INSTANCE); } @@ -106,41 +109,19 @@ protected boolean checkCandidate(@NonNull String beanName, @NonNull BeanDefiniti return true; }else{ if(logger.isDebugEnabled()){ - logger.warn("Skipping CanalBindingFactoryBean with name '{}' and '{}' bindingType" + - ". Bean already defined with the same name!", beanName, beanDefinition.getBeanClassName()); + logger.warn( + "Skipping CanalBindingFactoryBean with name '{}' and '{}' bindingType. Bean already defined with the same name!", + beanName, beanDefinition.getBeanClassName()); } return false; } } @Override - @NonNull - protected Set doScan(@NonNull String... basePackages) { - Set beanDefinitions = super.doScan(basePackages); - - if(beanDefinitions.isEmpty()){ - if(logger.isDebugEnabled()){ - logger.debug("No CanalBinding was found in '{}' package. Please check your configuration.", - Arrays.toString(basePackages)); - } - }else{ - processBeanDefinitions(beanDefinitions); - } - - return beanDefinitions; - } + protected void registerBeanDefinition(BeanDefinitionHolder definitionHolder, BeanDefinitionRegistry registry) { + super.registerBeanDefinition(definitionHolder, registry); - private void processBeanDefinitions(final Set beanDefinitions) { - BeanDefinitionRegistry beanDefinitionRegistry = getRegistry(); - - for(BeanDefinitionHolder beanDefinitionHolder : beanDefinitions){ - processBeanDefinition(beanDefinitionHolder, beanDefinitionRegistry); - } - } - - private void processBeanDefinition(final BeanDefinitionHolder beanDefinitionHolder, - final BeanDefinitionRegistry beanDefinitionRegistry) { - AbstractBeanDefinition beanDefinition = (AbstractBeanDefinition) beanDefinitionHolder.getBeanDefinition(); + AbstractBeanDefinition beanDefinition = (AbstractBeanDefinition) definitionHolder.getBeanDefinition(); boolean scopedProxy = false; @@ -149,58 +130,49 @@ private void processBeanDefinition(final BeanDefinitionHolder beanDefinitionHold .ofNullable(((RootBeanDefinition) beanDefinition).getDecoratedDefinition()) .map(BeanDefinitionHolder::getBeanDefinition).orElseThrow(()->new IllegalStateException( "The target bean definition of scoped proxy bean not found. Root bean definition[" + - beanDefinitionHolder + "]")); + definitionHolder + ']')); scopedProxy = true; } - processBeanDefinition(beanDefinitionHolder, beanDefinition); + processBeanDefinition(beanDefinition); if(scopedProxy){ return; } if(beanDefinition.isSingleton() == false){ - registerProxyBeanDefinitionHolder(beanDefinitionHolder, beanDefinitionRegistry); + registerProxyBeanDefinitionHolder(definitionHolder, registry); } } - private void processBeanDefinition(final BeanDefinitionHolder beanDefinitionHolder, - final AbstractBeanDefinition beanDefinition) { + private void processBeanDefinition(final AbstractBeanDefinition beanDefinition) { String beanClassName = beanDefinition.getBeanClassName(); - if(logger.isDebugEnabled()){ - logger.debug("Creating CanalBindingFactoryBean with name '{}' and '{}' bindingType", - beanDefinitionHolder.getBeanName(), beanClassName); - } - Class bindingClazz = ClassUtils.resolveClassName(beanClassName, null); - CanalBinding canalBinding = AnnotationUtils.findAnnotation(bindingClazz, CanalBinding.class); + CanalBinding canalBinding = AnnotationUtils.findAnnotation(ClassUtils.resolveClassName(beanClassName, null), + CanalBinding.class); if(Validate.isBlank(canalBinding.destination())){ throw new IllegalStateException( - "Either 'destination' must be provided in @CanalBinding for: " + bindingClazz.getName()); + "Either 'destination' must be required in @CanalBinding for: " + beanClassName); } - beanDefinition.getPropertyValues().add("destination", canalBinding.destination()); - beanDefinition.getPropertyValues().add("bindingType", bindingClazz); - - beanDefinition.setBeanClass(CanalBindingFactoryBean.class); - beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(beanClassName); - beanDefinition.setLazyInit(lazyInitialization); beanDefinition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE); beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); beanDefinition.setAttribute(FactoryBean.OBJECT_TYPE_ATTRIBUTE, beanClassName); + + //CanalEventListenerAnnotationUtils.registryEventListenerMethod(canalBinding.destination(), ); } - private void registerProxyBeanDefinitionHolder(final BeanDefinitionHolder beanDefinitionHolder, - final BeanDefinitionRegistry beanDefinitionRegistry) { - final BeanDefinitionHolder proxyBeanDefinitionHolder = ScopedProxyUtils.createScopedProxy(beanDefinitionHolder, - beanDefinitionRegistry, true); + private void registerProxyBeanDefinitionHolder(final BeanDefinitionHolder definitionHolder, + final BeanDefinitionRegistry registry) { + final BeanDefinitionHolder proxyBeanDefinitionHolder = ScopedProxyUtils.createScopedProxy(definitionHolder, + registry, true); - if(beanDefinitionRegistry.containsBeanDefinition(proxyBeanDefinitionHolder.getBeanName())){ - beanDefinitionRegistry.removeBeanDefinition(proxyBeanDefinitionHolder.getBeanName()); + if(registry.containsBeanDefinition(proxyBeanDefinitionHolder.getBeanName())){ + registry.removeBeanDefinition(proxyBeanDefinitionHolder.getBeanName()); } - beanDefinitionRegistry.registerBeanDefinition(proxyBeanDefinitionHolder.getBeanName(), + registry.registerBeanDefinition(proxyBeanDefinitionHolder.getBeanName(), proxyBeanDefinitionHolder.getBeanDefinition()); } diff --git a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingScannerRegistrar.java b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingScannerRegistrar.java index 8ff2db9..d6ee911 100644 --- a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingScannerRegistrar.java +++ b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingScannerRegistrar.java @@ -25,8 +25,15 @@ package com.buession.canal.spring.annotation; import com.buession.canal.annotation.CanalBinding; +import com.buession.canal.client.dispatcher.DefaultDispatcher; +import com.buession.canal.spring.annotation.factory.CanalBindingBeanPostProcessor; import com.buession.core.validator.Validate; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.config.AutowireCapableBeanFactory; import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionRegistry; import org.springframework.context.EnvironmentAware; import org.springframework.context.ResourceLoaderAware; @@ -49,37 +56,46 @@ * @see BeanDefinition * @since 0.0.1 */ -class CanalBindingScannerRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware { +class CanalBindingScannerRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware, ResourceLoaderAware, + BeanFactoryAware { private Environment environment; private ResourceLoader resourceLoader; - public Environment getEnvironment() { - return environment; - } + private AutowireCapableBeanFactory beanFactory; @Override public void setEnvironment(@NonNull Environment environment) { this.environment = environment; } - public ResourceLoader getResourceLoader() { - return resourceLoader; - } - @Override public void setResourceLoader(@NonNull ResourceLoader resourceLoader) { this.resourceLoader = resourceLoader; } + @Override + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + if(beanFactory instanceof AutowireCapableBeanFactory){ + this.beanFactory = (AutowireCapableBeanFactory) beanFactory; + } + } + @Override public void registerBeanDefinitions(@NonNull AnnotationMetadata metadata, @NonNull BeanDefinitionRegistry registry) { + registerCanalBindingBeanDefinitions(metadata, registry); + registerDispatcherBeanDefinition(metadata, registry); + registerCanalBindingBeanPostProcessor(metadata, registry); + } + + private void registerCanalBindingBeanDefinitions(final AnnotationMetadata metadata, + final BeanDefinitionRegistry registry) { final AnnotationAttributes annotationAttributes = AnnotationAttributes.fromMap( metadata.getAnnotationAttributes(EnableCanal.class.getName())); final CanalBindingClassPathMapperScanner scanner = new CanalBindingClassPathMapperScanner(registry, - getEnvironment(), getResourceLoader()); + environment, resourceLoader, beanFactory); final Set basePackages = getBasePackages(annotationAttributes); String lazyInitialization = annotationAttributes.getString("lazyInitialization"); @@ -91,7 +107,23 @@ public void registerBeanDefinitions(@NonNull AnnotationMetadata metadata, basePackages.add(getDefaultBasePackage(metadata)); } - scanner.doScan(basePackages.toArray(new String[]{})); + scanner.scan(basePackages.toArray(new String[]{})); + } + + private void registerDispatcherBeanDefinition(final AnnotationMetadata metadata, + final BeanDefinitionRegistry registry) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(DefaultDispatcher.class); + + registry.registerBeanDefinition("default.Canal.Dispatcher", builder.getBeanDefinition()); + } + + private void registerCanalBindingBeanPostProcessor(final AnnotationMetadata metadata, + final BeanDefinitionRegistry registry) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition( + CanalBindingBeanPostProcessor.class); + builder.addPropertyValue("beanFactory", beanFactory); + + registry.registerBeanDefinition(CanalBindingBeanPostProcessor.class.getName(), builder.getBeanDefinition()); } private static Set getBasePackages(final AnnotationAttributes annotationAttributes) { diff --git a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/CanalBindingBeanPostProcessor.java b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/CanalBindingBeanPostProcessor.java index a93805b..a161b6a 100644 --- a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/CanalBindingBeanPostProcessor.java +++ b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/CanalBindingBeanPostProcessor.java @@ -21,10 +21,64 @@ * | Author: Yong.Teng | * | Copyright @ 2013-2023 Buession.com Inc. | * +-------------------------------------------------------------------------------------------------------+ - */package com.buession.canal.spring.annotation.factory;/** - * - * + */ +package com.buession.canal.spring.annotation.factory; + +import com.buession.canal.annotation.CanalBinding; +import com.buession.canal.annotation.CanalEventListener; +import com.buession.canal.client.dispatcher.AbstractDispatcher; +import com.buession.canal.client.dispatcher.Dispatcher; +import com.buession.canal.core.listener.utils.EventListenerUtils; +import org.springframework.aop.support.AopUtils; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.core.annotation.AnnotationUtils; +import org.springframework.lang.NonNull; +import org.springframework.util.ReflectionUtils; + +import java.lang.reflect.Method; + +/** * @author Yong.Teng * @since 0.0.1 - */public class CanalBindingBeanPostProcessor { + */ +public class CanalBindingBeanPostProcessor implements BeanPostProcessor, BeanFactoryAware { + + private BeanFactory beanFactory; + + @Override + public void setBeanFactory(@NonNull BeanFactory beanFactory) throws BeansException { + this.beanFactory = beanFactory; + } + + @Override + public Object postProcessAfterInitialization(@NonNull Object bean, @NonNull String beanName) throws BeansException { + CanalBinding canalBinding = AnnotationUtils.findAnnotation(bean.getClass(), CanalBinding.class); + if(canalBinding != null){ + Dispatcher dispatcher = beanFactory.getBean(Dispatcher.class); + detectBindingMethods(bean, bean.getClass(), (AbstractDispatcher) dispatcher, canalBinding.destination()); + } + + return bean; + } + + protected void detectBindingMethods(final Object bean, final Class beanType, final AbstractDispatcher dispatcher, + final String destination) { + ReflectionUtils.doWithMethods(beanType, (method)->{ + Method invocableMethod = AopUtils.selectInvocableMethod(method, beanType); + CanalEventListener canalEventListener = AnnotationUtils.findAnnotation(invocableMethod, + CanalEventListener.class); + + if(canalEventListener == null){ + return; + } + + String listenerName = EventListenerUtils.buildEventListenerName(destination, canalEventListener.schema(), + canalEventListener.table(), canalEventListener.eventType()); + dispatcher.getEventListenerRegistry().register(listenerName, bean, invocableMethod); + }); + } + } diff --git a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/package-info.java b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/package-info.java index 92ab8f3..19e6990 100644 --- a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/package-info.java +++ b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/package-info.java @@ -26,4 +26,4 @@ * @author Yong.Teng * @since 0.0.1 */ -package com.buession.canal.spring.annotation; \ No newline at end of file +package com.buession.canal.spring.annotation.factory; \ No newline at end of file diff --git a/buession-canal-spring/src/main/java/com/buession/canal/spring/client/factory/CanalClientFactory.java b/buession-canal-spring/src/main/java/com/buession/canal/spring/client/factory/CanalClientFactory.java index 3b5e6dc..0d1941e 100644 --- a/buession-canal-spring/src/main/java/com/buession/canal/spring/client/factory/CanalClientFactory.java +++ b/buession-canal-spring/src/main/java/com/buession/canal/spring/client/factory/CanalClientFactory.java @@ -24,10 +24,10 @@ */ package com.buession.canal.spring.client.factory; -import com.buession.canal.client.Binder; import com.buession.canal.client.CanalClient; +import com.buession.canal.client.CanalContext; +import com.buession.canal.client.dispatcher.Dispatcher; -import java.util.List; import java.util.concurrent.ExecutorService; /** @@ -39,9 +39,14 @@ public class CanalClientFactory { /** - * {@link Binder} 列表 + * {@link CanalContext} */ - private List binders; + private CanalContext context; + + /** + * 分发器 + */ + private Dispatcher dispatcher; /** * {@link ExecutorService} @@ -49,22 +54,41 @@ public class CanalClientFactory { private ExecutorService executor; /** - * 返回 {@link Binder} 列表 + * 返回 {@link CanalContext} + * + * @return {@link CanalContext} + */ + public CanalContext getContext() { + return context; + } + + /** + * 设置 {@link CanalContext} + * + * @param context + * {@link CanalContext} + */ + public void setContext(CanalContext context) { + this.context = context; + } + + /** + * 返回分发器 * - * @return {@link Binder} 列表 + * @return 分发器 */ - public List getBinders() { - return binders; + public Dispatcher getDispatcher() { + return dispatcher; } /** - * 设置 {@link Binder} 列表 + * 设置分发器 * - * @param binders - * {@link Binder} 列表 + * @param dispatcher + * 分发器 */ - public void setBinders(List binders) { - this.binders = binders; + public void setDispatcher(Dispatcher dispatcher) { + this.dispatcher = dispatcher; } /** diff --git a/buession-canal-spring/src/main/java/com/buession/canal/spring/client/factory/CanalClientFactoryBean.java b/buession-canal-spring/src/main/java/com/buession/canal/spring/client/factory/CanalClientFactoryBean.java index 56b1344..d00398b 100644 --- a/buession-canal-spring/src/main/java/com/buession/canal/spring/client/factory/CanalClientFactoryBean.java +++ b/buession-canal-spring/src/main/java/com/buession/canal/spring/client/factory/CanalClientFactoryBean.java @@ -30,8 +30,6 @@ import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.InitializingBean; -import org.springframework.context.ApplicationListener; -import org.springframework.context.event.ContextStartedEvent; /** * {@link CanalClient} 工厂 bean @@ -39,8 +37,8 @@ * @author Yong.Teng * @since 0.0.1 */ -public class CanalClientFactoryBean extends CanalClientFactory implements ApplicationListener, - FactoryBean, InitializingBean, DisposableBean, AutoCloseable { +public class CanalClientFactoryBean extends CanalClientFactory implements FactoryBean, + InitializingBean, DisposableBean, AutoCloseable { private CanalClient canalClient; @@ -54,16 +52,12 @@ public Class getObjectType() { return canalClient.getClass(); } - @Override - public void onApplicationEvent(ContextStartedEvent event) { - - } - @Override public void afterPropertiesSet() throws Exception { - Assert.isNull(getBinders(), "Property 'binders' is required"); + Assert.isNull(getContext(), "Property 'context' is required"); + Assert.isNull(getDispatcher(), "Property 'dispatcher' is required"); - canalClient = new DefaultCanalClient(getBinders(), getExecutor()); + canalClient = new DefaultCanalClient(getContext(), getDispatcher(), getExecutor()); if(canalClient.isRunning() == false){ canalClient.start();