diff --git a/CHANGELOG.md b/CHANGELOG.md index 75da976..d50c029 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,34 @@ =========================== +## [0.0.2](https://github.com/buession/buession-canal/releases/tag/v0.0.2) (2023-12-27) + +### 🔨依赖升级 + +- [依赖库版本升级和安全漏洞修复](https://github.com/buession/buession-parent/releases/tag/v2.3.2) + + +### 🐞 Bug 修复 + +- **buession-canal-core:** 修复 buession-beans bean 转换导致的数据丢失的 BUG +- **buession-canal-core:** 修复 @CanalEventListener 仅指定 schema 或 table 服务映射方法的 BUG +- **buession-canal-core:** 修复无法获取 @CanalEventListener schema 和 table 的 BUG +- **buession-canal-core:** 修复线程池线程序号错误的 BUG +- **buession-canal-core:** 修复 Table 设置数据库名和表名错误的 BUG +- **buession-canal-client:** 修复多实例下,实例不执行的 BUG +- **buession-canal-spring:** 修复注解 CanalBinding 无法重复定义 destination 的 BUG + + +### ⏪ 优化 + +- **buession-canal-core:** 代码优化 +- **buession-canal-spring:** 代码优化 +- **buession-canal-spring:** 优化 CanalClientFactoryBean 多次调用 afterPropertiesSet 时,重复初始化 CanalClient + + +--- + + ## [0.0.1](https://github.com/buession/buession-canal/releases/tag/v0.0.1) (2023-11-19) ### 🔨依赖升级 diff --git a/buession-canal-annotation/pom.xml b/buession-canal-annotation/pom.xml index aaea51b..6fe10e6 100644 --- a/buession-canal-annotation/pom.xml +++ b/buession-canal-annotation/pom.xml @@ -7,7 +7,7 @@ com.buession.canal buession-canal-parent ../buession-canal-parent - 0.0.1 + 0.0.2 buession-canal-annotation https://canal.buession.com/ diff --git a/buession-canal-annotation/src/main/java/com/buession/canal/annotation/CanalEventListener.java b/buession-canal-annotation/src/main/java/com/buession/canal/annotation/CanalEventListener.java index 8daba51..88140ba 100644 --- a/buession-canal-annotation/src/main/java/com/buession/canal/annotation/CanalEventListener.java +++ b/buession-canal-annotation/src/main/java/com/buession/canal/annotation/CanalEventListener.java @@ -28,7 +28,6 @@ import java.lang.annotation.Documented; import java.lang.annotation.ElementType; -import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @@ -41,7 +40,6 @@ */ @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) -@Inherited @Documented public @interface CanalEventListener { diff --git a/buession-canal-client/pom.xml b/buession-canal-client/pom.xml index 8b24895..beb2d90 100644 --- a/buession-canal-client/pom.xml +++ b/buession-canal-client/pom.xml @@ -7,7 +7,7 @@ com.buession.canal buession-canal-parent ../buession-canal-parent - 0.0.1 + 0.0.2 buession-canal-client https://canal.buession.com/ diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/AbstractCanalClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/AbstractCanalClient.java index 86b0386..e971ff5 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/AbstractCanalClient.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/AbstractCanalClient.java @@ -56,8 +56,6 @@ public abstract class AbstractCanalClient implements CanalClient { */ private final Dispatcher dispatcher; - private volatile boolean running = false; - private final Logger logger = LoggerFactory.getLogger(getClass()); /** @@ -100,20 +98,13 @@ public void start() { adapterClient.init(); process(adapterClient, dispatcher, executor); }); - - running = true; } @Override public void stop() { logger.info("CanalClient stopping..."); + context.getAdapterClients().forEach(AdapterClient::close); executor.shutdown(); - running = false; - } - - @Override - public boolean isRunning() { - return running; } protected abstract void process(final AdapterClient adapterClient, final Dispatcher dispatcher, diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/CanalClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/CanalClient.java index 31b7cfb..0031c9a 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/CanalClient.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/CanalClient.java @@ -47,6 +47,7 @@ public interface CanalClient { * * @return true / false */ + @Deprecated default boolean isRunning() { return false; } diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalClient.java index 965fe1d..62e9c46 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalClient.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalClient.java @@ -66,11 +66,7 @@ public DefaultCanalClient(final CanalContext context, final Dispatcher dispatche @Override protected void process(final AdapterClient adapterClient, final Dispatcher dispatcher, final ExecutorService executor) { - executor.submit(()->{ - while(isRunning()){ - dispatcher.dispatch(adapterClient); - } - }); + executor.submit(()->dispatcher.dispatch(adapterClient)); } } diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractAdapterClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractAdapterClient.java index 8a8cb91..0100898 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractAdapterClient.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractAdapterClient.java @@ -65,6 +65,13 @@ public abstract class AbstractAdapterClient implements @SuppressWarnings({"rawtypes"}) private MessageConverter messageConverter = new DefaultMessageConverter(); + /** + * 是否在运行 + * + * @since 0.0.2 + */ + private volatile boolean running = false; + /** * 构造函数 * @@ -123,6 +130,7 @@ public void init() throws CanalClientException { // 回滚到未进行 ack 的地方,下次 fetch 时,可以从最后一个没有 ack 的位置获取数据 connector.rollback(); + running = true; } @Override @@ -145,8 +153,14 @@ public void rollback() throws CanalClientException { connector.rollback(); } + @Override + public boolean isRunning() { + return running; + } + @Override public void close() throws CanalClientException { + running = false; connector.unsubscribe(); connector.disconnect(); } diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AdapterClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AdapterClient.java index a5d770a..0c18e86 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AdapterClient.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AdapterClient.java @@ -149,6 +149,15 @@ public interface AdapterClient { */ void rollback() throws CanalClientException; + /** + * 判断 canal 客户端是否是开启状态 + * + * @return true / false + * + * @since 0.0.1 + */ + boolean isRunning(); + /** * 关闭客户端 * diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java index 4e00dc5..bbb1b23 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java @@ -37,9 +37,7 @@ import com.buession.canal.core.listener.support.EventTypeArgumentResolver; import com.buession.canal.core.listener.support.HeaderArgumentResolver; import com.buession.canal.core.listener.support.RowArgumentResolver; -import com.buession.canal.core.listener.support.RowArrayArgumentResolver; import com.buession.canal.core.listener.support.RowChangeArgumentResolver; -import com.buession.canal.core.listener.support.RowCollectionArgumentResolver; import com.buession.canal.core.listener.support.RowDataArgumentResolver; import com.buession.canal.core.listener.support.RowDataArrayArgumentResolver; import com.buession.canal.core.listener.support.RowDataCollectionArgumentResolver; @@ -78,23 +76,29 @@ public EventListenerRegistry getEventListenerRegistry() { public void dispatch(AdapterClient adapterClient) { Configuration configuration = adapterClient.getConfiguration(); - try{ - Result result = adapterClient.getListWithoutAck(configuration.getTimeout().toMillis(), - TimeUnit.MILLISECONDS); + while(adapterClient.isRunning()){ + try{ + Result result = adapterClient.getListWithoutAck(configuration.getTimeout().toMillis(), + TimeUnit.MILLISECONDS); - if(result != null && result.getMessages() != null){ - for(CanalMessage message : result.getMessages()){ - doDispatch(message); + if(logger.isDebugEnabled()){ + logger.debug("Return {} messages.", result == null ? 0 : result.getMessages().size()); + } + + if(result != null && result.getMessages() != null){ + for(CanalMessage message : result.getMessages()){ + doDispatch(message); + } } - } - if(result != null && result.getId() > -1){ - adapterClient.ack(result.getId()); - }else{ - adapterClient.ack(); + if(result != null && result.getId() > -1){ + adapterClient.ack(result.getId()); + }else{ + adapterClient.ack(); + } + }catch(Exception e){ + logger.error("Message handle error", e); } - }catch(Exception e){ - logger.error("Message handle error", e); } } @@ -112,7 +116,7 @@ protected void doDispatch(final CanalMessage canalMessage) throws Exception { protected abstract EventListenerMethod findMethod(final CanalMessage canalMessage); protected static List getDefaultArgumentResolvers() { - return ListBuilder.create(13) + return ListBuilder.create(11) .add(new DestinationArgumentResolver()) .add(new EntryTypeArgumentResolver()) .add(new EventTypeArgumentResolver()) @@ -122,8 +126,6 @@ protected static List getDefaultArgumentResolvers .add(new RowDataCollectionArgumentResolver()) .add(new RowDataArrayArgumentResolver()) .add(new RowArgumentResolver()) - .add(new RowCollectionArgumentResolver()) - .add(new RowArrayArgumentResolver()) .add(new SchemaArgumentResolver()) .add(new TableArgumentResolver()) .build(); diff --git a/buession-canal-core/pom.xml b/buession-canal-core/pom.xml index 7fad21b..95f8751 100644 --- a/buession-canal-core/pom.xml +++ b/buession-canal-core/pom.xml @@ -7,7 +7,7 @@ com.buession.canal buession-canal-parent ../buession-canal-parent - 0.0.1 + 0.0.2 buession-canal-core https://canal.buession.com/ diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/CanalMessage.java b/buession-canal-core/src/main/java/com/buession/canal/core/CanalMessage.java index f1c7a96..e355337 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/CanalMessage.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/CanalMessage.java @@ -28,7 +28,6 @@ import java.io.Serializable; import java.util.List; -import java.util.Map; import java.util.StringJoiner; /** @@ -183,13 +182,14 @@ public void setData(List data) { @Override public String toString() { return new StringJoiner(", ", "CanalMessage[", "]") - .add("destination='" + destination + "'") + .add("destination=" + destination) .add("table=" + table) .add("entryType=" + entryType) .add("eventType=" + eventType) .add("isDdl=" + isDdl) .add("header=" + header) .add("rowChange=" + rowChange) + .add("data=" + data) .toString(); } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/concurrent/DefaultCanalThreadFactory.java b/buession-canal-core/src/main/java/com/buession/canal/core/concurrent/DefaultCanalThreadFactory.java index 312088e..e1bff20 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/concurrent/DefaultCanalThreadFactory.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/concurrent/DefaultCanalThreadFactory.java @@ -40,7 +40,7 @@ public class DefaultCanalThreadFactory implements ThreadFactory { private final ThreadGroup group; - private final AtomicInteger threadNumber = new AtomicInteger(1); + private final AtomicInteger threadNumber = new AtomicInteger(0); private final String namePrefix; @@ -57,7 +57,7 @@ public Thread newThread(@NonNull Runnable runnable) { if(thread.getPriority() != Thread.NORM_PRIORITY){ thread.setPriority(Thread.NORM_PRIORITY); } - + thread.setDaemon(true); return thread; diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/convert/DefaultMessageConverter.java b/buession-canal-core/src/main/java/com/buession/canal/core/convert/DefaultMessageConverter.java index 13f647f..e103b5f 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/convert/DefaultMessageConverter.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/convert/DefaultMessageConverter.java @@ -64,11 +64,11 @@ private CanalMessage doParseEntry(final String destination, final CanalEntry.Ent final List data = new ArrayList<>(rowChange.getRowDatasList().size()); rowChange.getRowDatasList().forEach((rowData)->data.addAll(rowData.getAfterColumnsList())); - + final CanalMessage canalMessage = new CanalMessage(); canalMessage.setDestination(destination); - canalMessage.setTable(new Table(entry.getHeader().getSchemaName(), entry.getHeader().getTableName())); + canalMessage.setTable(new Table(entry.getHeader().getTableName(), entry.getHeader().getSchemaName())); canalMessage.setEntryType(entry.getEntryType()); canalMessage.setEventType(entry.getHeader().getEventType()); canalMessage.setHeader(entry.getHeader()); diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/convert/FlatMessageConverter.java b/buession-canal-core/src/main/java/com/buession/canal/core/convert/FlatMessageConverter.java index 7a02ca1..02278a5 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/convert/FlatMessageConverter.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/convert/FlatMessageConverter.java @@ -65,7 +65,7 @@ private CanalMessage doParseEntry(final String destination, final FlatMessage me } canalMessage.setDestination(destination); - canalMessage.setTable(new Table(message.getDatabase(), message.getTable())); + canalMessage.setTable(new Table(message.getTable(), message.getDatabase())); canalMessage.setEventType(CanalEntry.EventType.valueOf(message.getType())); canalMessage.setData(columns); canalMessage.setDdl(message.getIsDdl()); diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/EventListenerMethod.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/EventListenerMethod.java index 40a14a7..d7270d8 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/EventListenerMethod.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/EventListenerMethod.java @@ -111,7 +111,7 @@ public Object invoke(final CanalMessage canalMessage) throws Exception { return doInvoke(args); } - protected Object[] getMethodArgumentValues(final CanalMessage canalMessage) throws Exception { + private Object[] getMethodArgumentValues(final CanalMessage canalMessage) throws Exception { MethodParameter[] parameters = getParameters(); if(Validate.isEmpty(parameters)){ @@ -123,9 +123,8 @@ protected Object[] getMethodArgumentValues(final CanalMessage canalMessage) thro for(int i = 0; i < parameters.length; i++){ MethodParameter parameter = parameters[i]; - if(argumentResolvers.supports(parameter) == false){ - throw new IllegalStateException(formatArgumentError(parameter, "No suitable resolver")); - } + Assert.isFalse(argumentResolvers.supports(parameter), + ()->new IllegalStateException(formatArgumentError(parameter, "No suitable resolver"))); try{ args[i] = argumentResolvers.resolve(parameter, canalMessage); @@ -143,7 +142,7 @@ protected Object[] getMethodArgumentValues(final CanalMessage canalMessage) thro return args; } - protected Object doInvoke(Object... args) throws Exception { + private Object doInvoke(Object... args) throws Exception { ReflectionUtils.makeAccessible(getBridgedMethod()); return getBridgedMethod().invoke(getTarget(), args); } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/MethodParameter.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/MethodParameter.java index ffc8691..f2bd564 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/MethodParameter.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/MethodParameter.java @@ -235,9 +235,8 @@ public boolean equals(Object obj) { if(obj instanceof MethodParameter){ MethodParameter that = (MethodParameter) obj; return Objects.equals(name, that.getName()) && Objects.equals(parameter, that.getParameter()) && - Objects.equals(getParameterType(), that.getParameterType()) && - Objects.equals(type, that.getType()) && - Arrays.equals(getAnnotations(), that.getAnnotations()); + Objects.equals(getParameterType(), that.getParameterType()) && Objects.equals(type, + that.getType()) && Arrays.equals(getAnnotations(), that.getAnnotations()); } return false; diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/EventListenerArgumentResolverComposite.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/EventListenerArgumentResolverComposite.java index cb6f9fd..0e545b4 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/EventListenerArgumentResolverComposite.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/EventListenerArgumentResolverComposite.java @@ -26,6 +26,7 @@ import com.buession.canal.core.CanalMessage; import com.buession.canal.core.listener.MethodParameter; +import com.buession.core.utils.Assert; import java.util.ArrayList; import java.util.Collections; @@ -72,10 +73,8 @@ public boolean supports(final MethodParameter parameter) { @Override public Object resolve(final MethodParameter parameter, final CanalMessage canalMessage) throws Exception { EventListenerArgumentResolver resolver = getArgumentResolver(parameter); - if(resolver == null){ - throw new IllegalArgumentException("Unsupported parameter type [" + - parameter.getType().getName() + "]. supports should be called first."); - } + Assert.isNull(resolver, ()->new IllegalArgumentException("Unsupported parameter type [" + + parameter.getType().getName() + "]. supports should be called first.")); return resolver.resolve(parameter, canalMessage); } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArgumentResolver.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArgumentResolver.java index f7cd1e1..528fae8 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArgumentResolver.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArgumentResolver.java @@ -25,7 +25,6 @@ package com.buession.canal.core.listener.support; import com.alibaba.otter.canal.protocol.CanalEntry; -import com.buession.beans.BeanConverter; import com.buession.beans.BeanUtils; import com.buession.beans.DefaultBeanConverter; import com.buession.beans.converters.DatePropertyConverter; @@ -46,6 +45,8 @@ */ public class RowArgumentResolver implements EventListenerArgumentResolver { + private DefaultBeanConverter beanConverter; + @Override public boolean supports(MethodParameter parameter) { return parameter.hasAnnotation(Row.class); @@ -60,20 +61,23 @@ public Object resolve(MethodParameter parameter, final CanalMessage canalMessage List data = canalMessage.getData(); Map resultMap = new HashMap<>(data.size()); - data.forEach((row)->{ - resultMap.put(row.getName(), row.getValue()); - }); + data.forEach((row)->resultMap.put(row.getName(), row.getValue())); if(Map.class.isAssignableFrom(parameter.getParameterType())){ return resultMap; }else{ - final DefaultBeanConverter beanConverter = new DefaultBeanConverter(); final Object target = BeanUtils.instantiateClass(parameter.getParameterType()); + return getBeanConverter().convert(resultMap, target); + } + } + private DefaultBeanConverter getBeanConverter() { + if(beanConverter == null){ + beanConverter = new DefaultBeanConverter(); beanConverter.registerConverter(Date.class, new DatePropertyConverter("yyyy-MM-dd HH:mm:ss")); - - return beanConverter.convert(resultMap, target); } + + return beanConverter; } } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArrayArgumentResolver.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArrayArgumentResolver.java deleted file mode 100644 index 6553247..0000000 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArrayArgumentResolver.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. - * See the NOTICE file distributed with this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - * ========================================================================================================= - * - * This software consists of voluntary contributions made by many individuals on behalf of the - * Apache Software Foundation. For more information on the Apache Software Foundation, please see - * . - * - * +-------------------------------------------------------------------------------------------------------+ - * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | - * | Author: Yong.Teng | - * | Copyright @ 2013-2023 Buession.com Inc. | - * +-------------------------------------------------------------------------------------------------------+ - */ -package com.buession.canal.core.listener.support; - -import com.alibaba.otter.canal.protocol.CanalEntry; -import com.buession.canal.annotation.Row; -import com.buession.canal.core.CanalMessage; -import com.buession.canal.core.listener.MethodParameter; - -import java.util.List; - -/** - * 行数据数组参数解析器 - * - * @author Yong.Teng - * @since 0.0.1 - */ -public class RowArrayArgumentResolver implements EventListenerArgumentResolver { - - @Override - public boolean supports(MethodParameter parameter) { - return parameter.hasAnnotation(Row.class) && parameter.getParameterType().isArray(); - } - - @Override - public Object resolve(MethodParameter parameter, final CanalMessage canalMessage) throws Exception { - if(canalMessage == null || canalMessage.getData() == null){ - return null; - } - - //List rowData = canalMessage.getRowChange().getRowDatasList(); - //return rowData.toArray(new CanalEntry.RowData[]{}); - return null; - } - -} diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowCollectionArgumentResolver.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowCollectionArgumentResolver.java deleted file mode 100644 index 81b53ce..0000000 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowCollectionArgumentResolver.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. - * See the NOTICE file distributed with this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - * ========================================================================================================= - * - * This software consists of voluntary contributions made by many individuals on behalf of the - * Apache Software Foundation. For more information on the Apache Software Foundation, please see - * . - * - * +-------------------------------------------------------------------------------------------------------+ - * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | - * | Author: Yong.Teng | - * | Copyright @ 2013-2023 Buession.com Inc. | - * +-------------------------------------------------------------------------------------------------------+ - */ -package com.buession.canal.core.listener.support; - -import com.buession.canal.annotation.Row; -import com.buession.canal.core.CanalMessage; -import com.buession.canal.core.listener.MethodParameter; - -import java.util.Collection; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Set; - -/** - * 行数据集合参数解析器 - * - * @author Yong.Teng - * @since 0.0.1 - */ -public class RowCollectionArgumentResolver implements EventListenerArgumentResolver { - - @Override - public boolean supports(MethodParameter parameter) { - return parameter.hasAnnotation(Row.class) && Collection.class.isAssignableFrom(parameter.getParameterType()); - } - - @Override - public Object resolve(MethodParameter parameter, final CanalMessage canalMessage) throws Exception { - if(canalMessage == null || canalMessage.getData() == null){ - return null; - } - - /* - if(List.class.isAssignableFrom(parameter.getParameterType())){ - return canalMessage.getRowChange().getRowDatasList(); - }else if(Set.class.isAssignableFrom(parameter.getParameterType())){ - return new LinkedHashSet<>(canalMessage.getRowChange().getRowDatasList()); - }else{ - return null; - } - - */ - return null; - } - -} diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/TableArgumentResolver.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/TableArgumentResolver.java index 17b754e..058843d 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/TableArgumentResolver.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/TableArgumentResolver.java @@ -52,7 +52,7 @@ public Object resolve(MethodParameter parameter, final CanalMessage canalMessage return canalMessage.getTable(); } - if(parameter.hasAnnotation(Table.class)){ + if(parameter.hasAnnotation(Table.class) && CharSequence.class.isAssignableFrom(parameter.getType())){ return canalMessage.getTable().getName(); } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/utils/EventListenerUtils.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/utils/EventListenerUtils.java index 095130a..acc62b5 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/utils/EventListenerUtils.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/utils/EventListenerUtils.java @@ -45,9 +45,13 @@ public static String buildEventListenerName(final String destination, final Stri sb.append(destination).append("$$"); if(Validate.hasText(schema) && Validate.hasText(table)){ - sb.append(schema).append('.').append(table).append("$$"); + sb.append(schema).append('.').append(table); + }else if(Validate.hasText(schema)){ + sb.append("schema:").append(schema); + }else if(Validate.hasText(table)){ + sb.append("table:").append(table); } - sb.append(eventType.name()); + sb.append("$$").append(eventType.name()); return sb.toString(); } diff --git a/buession-canal-parent/pom.xml b/buession-canal-parent/pom.xml index 8a2abcb..20e3ca1 100644 --- a/buession-canal-parent/pom.xml +++ b/buession-canal-parent/pom.xml @@ -7,13 +7,13 @@ com.buession parent - 2.3.1 + 2.3.2 com.buession.canal buession-canal-parent https://canal.buession.com/ Buession Canal Framework Parent - 0.0.1 + 0.0.2 pom @@ -61,7 +61,7 @@ - 2.3.1 + 2.3.2 1.1.7 diff --git a/buession-canal-spring/pom.xml b/buession-canal-spring/pom.xml index 168c10a..38aa30c 100644 --- a/buession-canal-spring/pom.xml +++ b/buession-canal-spring/pom.xml @@ -7,7 +7,7 @@ com.buession.canal buession-canal-parent ../buession-canal-parent - 0.0.1 + 0.0.2 buession-canal-spring https://canal.buession.com/ diff --git a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java index 3ab6a1f..310ae6c 100644 --- a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java +++ b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java @@ -25,7 +25,7 @@ package com.buession.canal.spring.annotation; import com.buession.canal.annotation.CanalBinding; -import com.buession.core.validator.Validate; +import com.buession.core.utils.Assert; import org.springframework.aop.scope.ScopedProxyFactoryBean; import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition; @@ -44,9 +44,7 @@ import org.springframework.lang.NonNull; import org.springframework.util.ClassUtils; -import java.util.HashSet; import java.util.Optional; -import java.util.Set; /** * {@link CanalBinding} 扫描器 @@ -61,8 +59,6 @@ class CanalBindingClassPathMapperScanner extends ClassPathBeanDefinitionScanner */ private boolean lazyInitialization; - private final Set destinations = new HashSet<>(); - /** * 构造函数 * @@ -123,23 +119,13 @@ private void processBeanDefinition(final AbstractBeanDefinition beanDefinition) return; } - if(Validate.isBlank(canalBinding.destination())){ - throw new IllegalStateException( - "Either 'destination' must be required in @CanalBinding for: " + beanClassName); - } - - if(destinations.contains(canalBinding.destination())){ - throw new IllegalStateException( - "The destination: " + canalBinding.destination() + " already exists in @CanalBinding for: " + - beanClassName); - } + Assert.isBlank(canalBinding.destination(), ()->new IllegalStateException( + "Either 'destination' must be required in @CanalBinding for: " + beanClassName)); beanDefinition.setLazyInit(lazyInitialization); beanDefinition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE); beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); beanDefinition.setAttribute(FactoryBean.OBJECT_TYPE_ATTRIBUTE, beanClassName); - - destinations.add(canalBinding.destination()); } } diff --git a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/CanalBindingBeanPostProcessor.java b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/CanalBindingBeanPostProcessor.java index e638bb7..c46b2ff 100644 --- a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/CanalBindingBeanPostProcessor.java +++ b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/CanalBindingBeanPostProcessor.java @@ -27,13 +27,13 @@ import com.buession.canal.annotation.CanalBinding; import com.buession.canal.annotation.CanalEventListener; import com.buession.canal.client.dispatcher.AbstractDispatcher; -import com.buession.canal.client.dispatcher.Dispatcher; import com.buession.canal.core.listener.utils.EventListenerUtils; import org.springframework.aop.support.AopUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.core.annotation.AnnotatedElementUtils; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.lang.NonNull; import org.springframework.util.ReflectionUtils; @@ -58,28 +58,29 @@ public Object postProcessAfterInitialization(@NonNull Object bean, @NonNull Stri CanalBinding canalBinding = AnnotationUtils.findAnnotation(bean.getClass(), CanalBinding.class); if(canalBinding != null){ - Dispatcher dispatcher = beanFactory.getBean(Dispatcher.class); - detectBindingMethods(bean, bean.getClass(), (AbstractDispatcher) dispatcher, canalBinding.destination()); + AbstractDispatcher dispatcher = beanFactory.getBean(AbstractDispatcher.class); + + ReflectionUtils.doWithMethods(bean.getClass(), (method)->{ + CanalEventListener canalEventListener = AnnotatedElementUtils.findMergedAnnotation(method, + CanalEventListener.class); + + if(canalEventListener != null){ + Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass()); + detectBindingMethod(bean, invocableMethod, canalEventListener, dispatcher, + canalBinding.destination()); + } + }); } return bean; } - protected void detectBindingMethods(final Object bean, final Class beanType, final AbstractDispatcher dispatcher, - final String destination) { - ReflectionUtils.doWithMethods(beanType, (method)->{ - Method invocableMethod = AopUtils.selectInvocableMethod(method, beanType); - CanalEventListener canalEventListener = AnnotationUtils.findAnnotation(invocableMethod, - CanalEventListener.class); - - if(canalEventListener == null){ - return; - } - - String listenerName = EventListenerUtils.buildEventListenerName(destination, - canalEventListener.schema(), canalEventListener.table(), canalEventListener.eventType()); - dispatcher.getEventListenerRegistry().register(listenerName, bean, invocableMethod); - }); + protected void detectBindingMethod(final Object bean, final Method method, + final CanalEventListener canalEventListener, + final AbstractDispatcher dispatcher, final String destination) { + final String listenerName = EventListenerUtils.buildEventListenerName(destination, + canalEventListener.schema(), canalEventListener.table(), canalEventListener.eventType()); + dispatcher.getEventListenerRegistry().register(listenerName, bean, method); } } diff --git a/buession-canal-spring/src/main/java/com/buession/canal/spring/client/factory/CanalClientFactoryBean.java b/buession-canal-spring/src/main/java/com/buession/canal/spring/client/factory/CanalClientFactoryBean.java index d00398b..fe806cc 100644 --- a/buession-canal-spring/src/main/java/com/buession/canal/spring/client/factory/CanalClientFactoryBean.java +++ b/buession-canal-spring/src/main/java/com/buession/canal/spring/client/factory/CanalClientFactoryBean.java @@ -57,9 +57,8 @@ public void afterPropertiesSet() throws Exception { Assert.isNull(getContext(), "Property 'context' is required"); Assert.isNull(getDispatcher(), "Property 'dispatcher' is required"); - canalClient = new DefaultCanalClient(getContext(), getDispatcher(), getExecutor()); - - if(canalClient.isRunning() == false){ + if(canalClient == null){ + canalClient = new DefaultCanalClient(getContext(), getDispatcher(), getExecutor()); canalClient.start(); } }