From 457063023f414dacadcb94e9ae50ebc045979b1b Mon Sep 17 00:00:00 2001 From: "yong.teng" Date: Mon, 20 Nov 2023 13:14:36 +0800 Subject: [PATCH 01/10] Upgrade 0.0.2 --- CHANGELOG.md | 10 ++++++++++ buession-canal-annotation/pom.xml | 2 +- buession-canal-client/pom.xml | 2 +- buession-canal-core/pom.xml | 2 +- buession-canal-parent/pom.xml | 4 ++-- buession-canal-spring/pom.xml | 2 +- 6 files changed, 16 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 75da976..7c93fba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,16 @@ =========================== +## [0.0.2](https://github.com/buession/buession-canal/releases/tag/v0.0.2) (2023-xx-xx) + +### 🔨依赖升级 + +- [依赖库版本升级和安全漏洞修复](https://github.com/buession/buession-parent/releases/tag/v2.3.2) + + +--- + + ## [0.0.1](https://github.com/buession/buession-canal/releases/tag/v0.0.1) (2023-11-19) ### 🔨依赖升级 diff --git a/buession-canal-annotation/pom.xml b/buession-canal-annotation/pom.xml index aaea51b..6fe10e6 100644 --- a/buession-canal-annotation/pom.xml +++ b/buession-canal-annotation/pom.xml @@ -7,7 +7,7 @@ com.buession.canal buession-canal-parent ../buession-canal-parent - 0.0.1 + 0.0.2 buession-canal-annotation https://canal.buession.com/ diff --git a/buession-canal-client/pom.xml b/buession-canal-client/pom.xml index 8b24895..beb2d90 100644 --- a/buession-canal-client/pom.xml +++ b/buession-canal-client/pom.xml @@ -7,7 +7,7 @@ com.buession.canal buession-canal-parent ../buession-canal-parent - 0.0.1 + 0.0.2 buession-canal-client https://canal.buession.com/ diff --git a/buession-canal-core/pom.xml b/buession-canal-core/pom.xml index 7fad21b..95f8751 100644 --- a/buession-canal-core/pom.xml +++ b/buession-canal-core/pom.xml @@ -7,7 +7,7 @@ com.buession.canal buession-canal-parent ../buession-canal-parent - 0.0.1 + 0.0.2 buession-canal-core https://canal.buession.com/ diff --git a/buession-canal-parent/pom.xml b/buession-canal-parent/pom.xml index 8a2abcb..ae4e563 100644 --- a/buession-canal-parent/pom.xml +++ b/buession-canal-parent/pom.xml @@ -7,13 +7,13 @@ com.buession parent - 2.3.1 + 2.3.2 com.buession.canal buession-canal-parent https://canal.buession.com/ Buession Canal Framework Parent - 0.0.1 + 0.0.2 pom diff --git a/buession-canal-spring/pom.xml b/buession-canal-spring/pom.xml index 168c10a..38aa30c 100644 --- a/buession-canal-spring/pom.xml +++ b/buession-canal-spring/pom.xml @@ -7,7 +7,7 @@ com.buession.canal buession-canal-parent ../buession-canal-parent - 0.0.1 + 0.0.2 buession-canal-spring https://canal.buession.com/ From 9b89e77b8e6d2109a4ef05494643e4306395c4f5 Mon Sep 17 00:00:00 2001 From: "yong.teng" Date: Mon, 20 Nov 2023 23:14:21 +0800 Subject: [PATCH 02/10] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20@CanalEventListener?= =?UTF-8?q?=20=E4=BB=85=E6=8C=87=E5=AE=9A=20schema=20=E6=88=96=20table=20?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E6=98=A0=E5=B0=84=E6=96=B9=E6=B3=95=E7=9A=84?= =?UTF-8?q?=20BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 6 ++++++ .../canal/core/listener/utils/EventListenerUtils.java | 8 ++++++-- buession-canal-parent/pom.xml | 2 +- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c93fba..eb4feab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,12 @@ - [依赖库版本升级和安全漏洞修复](https://github.com/buession/buession-parent/releases/tag/v2.3.2) +### 🐞 Bug 修复 + +- **buession-canal-core:** 修复 buession-beans bean 转换导致的数据丢失的 BUG +- **buession-canal-core:** 修复 @CanalEventListener 仅指定 schema 或 table 服务映射方法的 BUG + + --- diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/utils/EventListenerUtils.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/utils/EventListenerUtils.java index 095130a..acc62b5 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/utils/EventListenerUtils.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/utils/EventListenerUtils.java @@ -45,9 +45,13 @@ public static String buildEventListenerName(final String destination, final Stri sb.append(destination).append("$$"); if(Validate.hasText(schema) && Validate.hasText(table)){ - sb.append(schema).append('.').append(table).append("$$"); + sb.append(schema).append('.').append(table); + }else if(Validate.hasText(schema)){ + sb.append("schema:").append(schema); + }else if(Validate.hasText(table)){ + sb.append("table:").append(table); } - sb.append(eventType.name()); + sb.append("$$").append(eventType.name()); return sb.toString(); } diff --git a/buession-canal-parent/pom.xml b/buession-canal-parent/pom.xml index ae4e563..20e3ca1 100644 --- a/buession-canal-parent/pom.xml +++ b/buession-canal-parent/pom.xml @@ -61,7 +61,7 @@ - 2.3.1 + 2.3.2 1.1.7 From ed683e70d3ece0d4ef1be57d99adfc0d7510924b Mon Sep 17 00:00:00 2001 From: "yong.teng" Date: Wed, 22 Nov 2023 22:32:10 +0800 Subject: [PATCH 03/10] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../client/dispatcher/AbstractDispatcher.java | 6 +- .../listener/support/RowArgumentResolver.java | 24 ++++--- .../support/RowArrayArgumentResolver.java | 58 ---------------- .../RowCollectionArgumentResolver.java | 68 ------------------- 4 files changed, 17 insertions(+), 139 deletions(-) delete mode 100644 buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArrayArgumentResolver.java delete mode 100644 buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowCollectionArgumentResolver.java diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java index 4e00dc5..414c5a6 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java @@ -37,9 +37,7 @@ import com.buession.canal.core.listener.support.EventTypeArgumentResolver; import com.buession.canal.core.listener.support.HeaderArgumentResolver; import com.buession.canal.core.listener.support.RowArgumentResolver; -import com.buession.canal.core.listener.support.RowArrayArgumentResolver; import com.buession.canal.core.listener.support.RowChangeArgumentResolver; -import com.buession.canal.core.listener.support.RowCollectionArgumentResolver; import com.buession.canal.core.listener.support.RowDataArgumentResolver; import com.buession.canal.core.listener.support.RowDataArrayArgumentResolver; import com.buession.canal.core.listener.support.RowDataCollectionArgumentResolver; @@ -112,7 +110,7 @@ protected void doDispatch(final CanalMessage canalMessage) throws Exception { protected abstract EventListenerMethod findMethod(final CanalMessage canalMessage); protected static List getDefaultArgumentResolvers() { - return ListBuilder.create(13) + return ListBuilder.create(11) .add(new DestinationArgumentResolver()) .add(new EntryTypeArgumentResolver()) .add(new EventTypeArgumentResolver()) @@ -122,8 +120,6 @@ protected static List getDefaultArgumentResolvers .add(new RowDataCollectionArgumentResolver()) .add(new RowDataArrayArgumentResolver()) .add(new RowArgumentResolver()) - .add(new RowCollectionArgumentResolver()) - .add(new RowArrayArgumentResolver()) .add(new SchemaArgumentResolver()) .add(new TableArgumentResolver()) .build(); diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArgumentResolver.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArgumentResolver.java index f7cd1e1..616901f 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArgumentResolver.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArgumentResolver.java @@ -25,7 +25,6 @@ package com.buession.canal.core.listener.support; import com.alibaba.otter.canal.protocol.CanalEntry; -import com.buession.beans.BeanConverter; import com.buession.beans.BeanUtils; import com.buession.beans.DefaultBeanConverter; import com.buession.beans.converters.DatePropertyConverter; @@ -46,6 +45,8 @@ */ public class RowArgumentResolver implements EventListenerArgumentResolver { + private volatile DefaultBeanConverter beanConverter; + @Override public boolean supports(MethodParameter parameter) { return parameter.hasAnnotation(Row.class); @@ -60,20 +61,27 @@ public Object resolve(MethodParameter parameter, final CanalMessage canalMessage List data = canalMessage.getData(); Map resultMap = new HashMap<>(data.size()); - data.forEach((row)->{ - resultMap.put(row.getName(), row.getValue()); - }); + data.forEach((row)->resultMap.put(row.getName(), row.getValue())); if(Map.class.isAssignableFrom(parameter.getParameterType())){ return resultMap; }else{ - final DefaultBeanConverter beanConverter = new DefaultBeanConverter(); final Object target = BeanUtils.instantiateClass(parameter.getParameterType()); + return getBeanConverter().convert(resultMap, target); + } + } - beanConverter.registerConverter(Date.class, new DatePropertyConverter("yyyy-MM-dd HH:mm:ss")); - - return beanConverter.convert(resultMap, target); + private DefaultBeanConverter getBeanConverter() { + if(beanConverter == null){ + synchronized(this){ + if(beanConverter == null){ + beanConverter = new DefaultBeanConverter(); + beanConverter.registerConverter(Date.class, new DatePropertyConverter("yyyy-MM-dd HH:mm:ss")); + } + } } + + return beanConverter; } } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArrayArgumentResolver.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArrayArgumentResolver.java deleted file mode 100644 index 6553247..0000000 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArrayArgumentResolver.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. - * See the NOTICE file distributed with this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - * ========================================================================================================= - * - * This software consists of voluntary contributions made by many individuals on behalf of the - * Apache Software Foundation. For more information on the Apache Software Foundation, please see - * . - * - * +-------------------------------------------------------------------------------------------------------+ - * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | - * | Author: Yong.Teng | - * | Copyright @ 2013-2023 Buession.com Inc. | - * +-------------------------------------------------------------------------------------------------------+ - */ -package com.buession.canal.core.listener.support; - -import com.alibaba.otter.canal.protocol.CanalEntry; -import com.buession.canal.annotation.Row; -import com.buession.canal.core.CanalMessage; -import com.buession.canal.core.listener.MethodParameter; - -import java.util.List; - -/** - * 行数据数组参数解析器 - * - * @author Yong.Teng - * @since 0.0.1 - */ -public class RowArrayArgumentResolver implements EventListenerArgumentResolver { - - @Override - public boolean supports(MethodParameter parameter) { - return parameter.hasAnnotation(Row.class) && parameter.getParameterType().isArray(); - } - - @Override - public Object resolve(MethodParameter parameter, final CanalMessage canalMessage) throws Exception { - if(canalMessage == null || canalMessage.getData() == null){ - return null; - } - - //List rowData = canalMessage.getRowChange().getRowDatasList(); - //return rowData.toArray(new CanalEntry.RowData[]{}); - return null; - } - -} diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowCollectionArgumentResolver.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowCollectionArgumentResolver.java deleted file mode 100644 index 81b53ce..0000000 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowCollectionArgumentResolver.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. - * See the NOTICE file distributed with this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and limitations under the License. - * - * ========================================================================================================= - * - * This software consists of voluntary contributions made by many individuals on behalf of the - * Apache Software Foundation. For more information on the Apache Software Foundation, please see - * . - * - * +-------------------------------------------------------------------------------------------------------+ - * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | - * | Author: Yong.Teng | - * | Copyright @ 2013-2023 Buession.com Inc. | - * +-------------------------------------------------------------------------------------------------------+ - */ -package com.buession.canal.core.listener.support; - -import com.buession.canal.annotation.Row; -import com.buession.canal.core.CanalMessage; -import com.buession.canal.core.listener.MethodParameter; - -import java.util.Collection; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Set; - -/** - * 行数据集合参数解析器 - * - * @author Yong.Teng - * @since 0.0.1 - */ -public class RowCollectionArgumentResolver implements EventListenerArgumentResolver { - - @Override - public boolean supports(MethodParameter parameter) { - return parameter.hasAnnotation(Row.class) && Collection.class.isAssignableFrom(parameter.getParameterType()); - } - - @Override - public Object resolve(MethodParameter parameter, final CanalMessage canalMessage) throws Exception { - if(canalMessage == null || canalMessage.getData() == null){ - return null; - } - - /* - if(List.class.isAssignableFrom(parameter.getParameterType())){ - return canalMessage.getRowChange().getRowDatasList(); - }else if(Set.class.isAssignableFrom(parameter.getParameterType())){ - return new LinkedHashSet<>(canalMessage.getRowChange().getRowDatasList()); - }else{ - return null; - } - - */ - return null; - } - -} From 3d568f07ee4177c4de8e30b150bf4a6904082e27 Mon Sep 17 00:00:00 2001 From: "yong.teng" Date: Thu, 30 Nov 2023 22:31:20 +0800 Subject: [PATCH 04/10] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E8=8E=B7=E5=8F=96=20@CanalEventListener=20schema=20=E5=92=8C?= =?UTF-8?q?=20table=20=E7=9A=84=20BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 1 + .../canal/annotation/CanalEventListener.java | 2 - .../client/dispatcher/AbstractDispatcher.java | 4 ++ .../com/buession/canal/core/CanalMessage.java | 4 +- .../canal/core/listener/MethodParameter.java | 5 +-- .../listener/support/RowArgumentResolver.java | 10 ++--- .../support/TableArgumentResolver.java | 2 +- .../CanalBindingClassPathMapperScanner.java | 17 +++------ .../CanalBindingBeanPostProcessor.java | 37 ++++++++++--------- 9 files changed, 38 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb4feab..3f35ffd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ - **buession-canal-core:** 修复 buession-beans bean 转换导致的数据丢失的 BUG - **buession-canal-core:** 修复 @CanalEventListener 仅指定 schema 或 table 服务映射方法的 BUG +- **buession-canal-core:** 修复无法获取 @CanalEventListener schema 和 table 的 BUG --- diff --git a/buession-canal-annotation/src/main/java/com/buession/canal/annotation/CanalEventListener.java b/buession-canal-annotation/src/main/java/com/buession/canal/annotation/CanalEventListener.java index 8daba51..88140ba 100644 --- a/buession-canal-annotation/src/main/java/com/buession/canal/annotation/CanalEventListener.java +++ b/buession-canal-annotation/src/main/java/com/buession/canal/annotation/CanalEventListener.java @@ -28,7 +28,6 @@ import java.lang.annotation.Documented; import java.lang.annotation.ElementType; -import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @@ -41,7 +40,6 @@ */ @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) -@Inherited @Documented public @interface CanalEventListener { diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java index 414c5a6..45986a7 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java @@ -80,6 +80,10 @@ public void dispatch(AdapterClient adapterClient) { Result result = adapterClient.getListWithoutAck(configuration.getTimeout().toMillis(), TimeUnit.MILLISECONDS); + if(logger.isDebugEnabled()){ + logger.debug("Return {} messages.", result == null ? 0 : result.getMessages().size()); + } + if(result != null && result.getMessages() != null){ for(CanalMessage message : result.getMessages()){ doDispatch(message); diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/CanalMessage.java b/buession-canal-core/src/main/java/com/buession/canal/core/CanalMessage.java index f1c7a96..e355337 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/CanalMessage.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/CanalMessage.java @@ -28,7 +28,6 @@ import java.io.Serializable; import java.util.List; -import java.util.Map; import java.util.StringJoiner; /** @@ -183,13 +182,14 @@ public void setData(List data) { @Override public String toString() { return new StringJoiner(", ", "CanalMessage[", "]") - .add("destination='" + destination + "'") + .add("destination=" + destination) .add("table=" + table) .add("entryType=" + entryType) .add("eventType=" + eventType) .add("isDdl=" + isDdl) .add("header=" + header) .add("rowChange=" + rowChange) + .add("data=" + data) .toString(); } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/MethodParameter.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/MethodParameter.java index ffc8691..f2bd564 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/MethodParameter.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/MethodParameter.java @@ -235,9 +235,8 @@ public boolean equals(Object obj) { if(obj instanceof MethodParameter){ MethodParameter that = (MethodParameter) obj; return Objects.equals(name, that.getName()) && Objects.equals(parameter, that.getParameter()) && - Objects.equals(getParameterType(), that.getParameterType()) && - Objects.equals(type, that.getType()) && - Arrays.equals(getAnnotations(), that.getAnnotations()); + Objects.equals(getParameterType(), that.getParameterType()) && Objects.equals(type, + that.getType()) && Arrays.equals(getAnnotations(), that.getAnnotations()); } return false; diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArgumentResolver.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArgumentResolver.java index 616901f..528fae8 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArgumentResolver.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArgumentResolver.java @@ -45,7 +45,7 @@ */ public class RowArgumentResolver implements EventListenerArgumentResolver { - private volatile DefaultBeanConverter beanConverter; + private DefaultBeanConverter beanConverter; @Override public boolean supports(MethodParameter parameter) { @@ -73,12 +73,8 @@ public Object resolve(MethodParameter parameter, final CanalMessage canalMessage private DefaultBeanConverter getBeanConverter() { if(beanConverter == null){ - synchronized(this){ - if(beanConverter == null){ - beanConverter = new DefaultBeanConverter(); - beanConverter.registerConverter(Date.class, new DatePropertyConverter("yyyy-MM-dd HH:mm:ss")); - } - } + beanConverter = new DefaultBeanConverter(); + beanConverter.registerConverter(Date.class, new DatePropertyConverter("yyyy-MM-dd HH:mm:ss")); } return beanConverter; diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/TableArgumentResolver.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/TableArgumentResolver.java index 17b754e..058843d 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/TableArgumentResolver.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/TableArgumentResolver.java @@ -52,7 +52,7 @@ public Object resolve(MethodParameter parameter, final CanalMessage canalMessage return canalMessage.getTable(); } - if(parameter.hasAnnotation(Table.class)){ + if(parameter.hasAnnotation(Table.class) && CharSequence.class.isAssignableFrom(parameter.getType())){ return canalMessage.getTable().getName(); } diff --git a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java index 3ab6a1f..7cc9f63 100644 --- a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java +++ b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java @@ -25,7 +25,7 @@ package com.buession.canal.spring.annotation; import com.buession.canal.annotation.CanalBinding; -import com.buession.core.validator.Validate; +import com.buession.core.utils.Assert; import org.springframework.aop.scope.ScopedProxyFactoryBean; import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition; @@ -123,16 +123,11 @@ private void processBeanDefinition(final AbstractBeanDefinition beanDefinition) return; } - if(Validate.isBlank(canalBinding.destination())){ - throw new IllegalStateException( - "Either 'destination' must be required in @CanalBinding for: " + beanClassName); - } - - if(destinations.contains(canalBinding.destination())){ - throw new IllegalStateException( - "The destination: " + canalBinding.destination() + " already exists in @CanalBinding for: " + - beanClassName); - } + Assert.isBlank(canalBinding.destination(), ()->new IllegalStateException( + "Either 'destination' must be required in @CanalBinding for: " + beanClassName)); + Assert.isTrue(destinations.contains(canalBinding.destination()), ()->new IllegalStateException( + "The destination: " + canalBinding.destination() + " already exists in @CanalBinding for: " + + beanClassName)); beanDefinition.setLazyInit(lazyInitialization); beanDefinition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE); diff --git a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/CanalBindingBeanPostProcessor.java b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/CanalBindingBeanPostProcessor.java index e638bb7..c46b2ff 100644 --- a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/CanalBindingBeanPostProcessor.java +++ b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/factory/CanalBindingBeanPostProcessor.java @@ -27,13 +27,13 @@ import com.buession.canal.annotation.CanalBinding; import com.buession.canal.annotation.CanalEventListener; import com.buession.canal.client.dispatcher.AbstractDispatcher; -import com.buession.canal.client.dispatcher.Dispatcher; import com.buession.canal.core.listener.utils.EventListenerUtils; import org.springframework.aop.support.AopUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.core.annotation.AnnotatedElementUtils; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.lang.NonNull; import org.springframework.util.ReflectionUtils; @@ -58,28 +58,29 @@ public Object postProcessAfterInitialization(@NonNull Object bean, @NonNull Stri CanalBinding canalBinding = AnnotationUtils.findAnnotation(bean.getClass(), CanalBinding.class); if(canalBinding != null){ - Dispatcher dispatcher = beanFactory.getBean(Dispatcher.class); - detectBindingMethods(bean, bean.getClass(), (AbstractDispatcher) dispatcher, canalBinding.destination()); + AbstractDispatcher dispatcher = beanFactory.getBean(AbstractDispatcher.class); + + ReflectionUtils.doWithMethods(bean.getClass(), (method)->{ + CanalEventListener canalEventListener = AnnotatedElementUtils.findMergedAnnotation(method, + CanalEventListener.class); + + if(canalEventListener != null){ + Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass()); + detectBindingMethod(bean, invocableMethod, canalEventListener, dispatcher, + canalBinding.destination()); + } + }); } return bean; } - protected void detectBindingMethods(final Object bean, final Class beanType, final AbstractDispatcher dispatcher, - final String destination) { - ReflectionUtils.doWithMethods(beanType, (method)->{ - Method invocableMethod = AopUtils.selectInvocableMethod(method, beanType); - CanalEventListener canalEventListener = AnnotationUtils.findAnnotation(invocableMethod, - CanalEventListener.class); - - if(canalEventListener == null){ - return; - } - - String listenerName = EventListenerUtils.buildEventListenerName(destination, - canalEventListener.schema(), canalEventListener.table(), canalEventListener.eventType()); - dispatcher.getEventListenerRegistry().register(listenerName, bean, invocableMethod); - }); + protected void detectBindingMethod(final Object bean, final Method method, + final CanalEventListener canalEventListener, + final AbstractDispatcher dispatcher, final String destination) { + final String listenerName = EventListenerUtils.buildEventListenerName(destination, + canalEventListener.schema(), canalEventListener.table(), canalEventListener.eventType()); + dispatcher.getEventListenerRegistry().register(listenerName, bean, method); } } From 7a671ca680e23ca01d172ed55510e650abb8d864 Mon Sep 17 00:00:00 2001 From: "yong.teng" Date: Sat, 2 Dec 2023 00:16:52 +0800 Subject: [PATCH 05/10] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=A4=9A=E5=AE=9E?= =?UTF-8?q?=E4=BE=8B=E4=B8=8B=EF=BC=8C=E5=AE=9E=E4=BE=8B=E4=B8=8D=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E7=9A=84=20BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../canal/client/AbstractCanalClient.java | 14 ++------ .../buession/canal/client/CanalClient.java | 1 + .../canal/client/DefaultCanalClient.java | 6 +--- .../client/adapter/AbstractAdapterClient.java | 14 ++++++++ .../canal/client/adapter/AdapterClient.java | 9 +++++ .../client/dispatcher/AbstractDispatcher.java | 34 ++++++++++--------- 6 files changed, 46 insertions(+), 32 deletions(-) diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/AbstractCanalClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/AbstractCanalClient.java index 86b0386..f2b767f 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/AbstractCanalClient.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/AbstractCanalClient.java @@ -56,8 +56,6 @@ public abstract class AbstractCanalClient implements CanalClient { */ private final Dispatcher dispatcher; - private volatile boolean running = false; - private final Logger logger = LoggerFactory.getLogger(getClass()); /** @@ -89,7 +87,8 @@ public AbstractCanalClient(final CanalContext context, final Dispatcher dispatch Assert.isNull(executor, "The ExecutorService is required"); this.context = context; this.dispatcher = dispatcher; - this.executor = executor; + this.executor = new DefaultCanalThreadPoolExecutor("canal", context.getAdapterClients().size(), + context.getAdapterClients().size(), 10 * 1000L); } @Override @@ -100,20 +99,13 @@ public void start() { adapterClient.init(); process(adapterClient, dispatcher, executor); }); - - running = true; } @Override public void stop() { logger.info("CanalClient stopping..."); + context.getAdapterClients().forEach(AdapterClient::close); executor.shutdown(); - running = false; - } - - @Override - public boolean isRunning() { - return running; } protected abstract void process(final AdapterClient adapterClient, final Dispatcher dispatcher, diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/CanalClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/CanalClient.java index 31b7cfb..0031c9a 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/CanalClient.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/CanalClient.java @@ -47,6 +47,7 @@ public interface CanalClient { * * @return true / false */ + @Deprecated default boolean isRunning() { return false; } diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalClient.java index 965fe1d..62e9c46 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalClient.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalClient.java @@ -66,11 +66,7 @@ public DefaultCanalClient(final CanalContext context, final Dispatcher dispatche @Override protected void process(final AdapterClient adapterClient, final Dispatcher dispatcher, final ExecutorService executor) { - executor.submit(()->{ - while(isRunning()){ - dispatcher.dispatch(adapterClient); - } - }); + executor.submit(()->dispatcher.dispatch(adapterClient)); } } diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractAdapterClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractAdapterClient.java index 8a8cb91..0100898 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractAdapterClient.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractAdapterClient.java @@ -65,6 +65,13 @@ public abstract class AbstractAdapterClient implements @SuppressWarnings({"rawtypes"}) private MessageConverter messageConverter = new DefaultMessageConverter(); + /** + * 是否在运行 + * + * @since 0.0.2 + */ + private volatile boolean running = false; + /** * 构造函数 * @@ -123,6 +130,7 @@ public void init() throws CanalClientException { // 回滚到未进行 ack 的地方,下次 fetch 时,可以从最后一个没有 ack 的位置获取数据 connector.rollback(); + running = true; } @Override @@ -145,8 +153,14 @@ public void rollback() throws CanalClientException { connector.rollback(); } + @Override + public boolean isRunning() { + return running; + } + @Override public void close() throws CanalClientException { + running = false; connector.unsubscribe(); connector.disconnect(); } diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AdapterClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AdapterClient.java index a5d770a..0c18e86 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AdapterClient.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AdapterClient.java @@ -149,6 +149,15 @@ public interface AdapterClient { */ void rollback() throws CanalClientException; + /** + * 判断 canal 客户端是否是开启状态 + * + * @return true / false + * + * @since 0.0.1 + */ + boolean isRunning(); + /** * 关闭客户端 * diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java index 45986a7..bbb1b23 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java @@ -76,27 +76,29 @@ public EventListenerRegistry getEventListenerRegistry() { public void dispatch(AdapterClient adapterClient) { Configuration configuration = adapterClient.getConfiguration(); - try{ - Result result = adapterClient.getListWithoutAck(configuration.getTimeout().toMillis(), - TimeUnit.MILLISECONDS); + while(adapterClient.isRunning()){ + try{ + Result result = adapterClient.getListWithoutAck(configuration.getTimeout().toMillis(), + TimeUnit.MILLISECONDS); - if(logger.isDebugEnabled()){ - logger.debug("Return {} messages.", result == null ? 0 : result.getMessages().size()); - } + if(logger.isDebugEnabled()){ + logger.debug("Return {} messages.", result == null ? 0 : result.getMessages().size()); + } - if(result != null && result.getMessages() != null){ - for(CanalMessage message : result.getMessages()){ - doDispatch(message); + if(result != null && result.getMessages() != null){ + for(CanalMessage message : result.getMessages()){ + doDispatch(message); + } } - } - if(result != null && result.getId() > -1){ - adapterClient.ack(result.getId()); - }else{ - adapterClient.ack(); + if(result != null && result.getId() > -1){ + adapterClient.ack(result.getId()); + }else{ + adapterClient.ack(); + } + }catch(Exception e){ + logger.error("Message handle error", e); } - }catch(Exception e){ - logger.error("Message handle error", e); } } From 833739b8ea44b1c9b5b9e29909e3b886e44aa944 Mon Sep 17 00:00:00 2001 From: "yong.teng" Date: Sat, 2 Dec 2023 00:18:10 +0800 Subject: [PATCH 06/10] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=E7=BA=BF=E7=A8=8B=E5=BA=8F=E5=8F=B7=E9=94=99=E8=AF=AF?= =?UTF-8?q?=E7=9A=84=20BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../canal/core/concurrent/DefaultCanalThreadFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/concurrent/DefaultCanalThreadFactory.java b/buession-canal-core/src/main/java/com/buession/canal/core/concurrent/DefaultCanalThreadFactory.java index 312088e..e1bff20 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/concurrent/DefaultCanalThreadFactory.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/concurrent/DefaultCanalThreadFactory.java @@ -40,7 +40,7 @@ public class DefaultCanalThreadFactory implements ThreadFactory { private final ThreadGroup group; - private final AtomicInteger threadNumber = new AtomicInteger(1); + private final AtomicInteger threadNumber = new AtomicInteger(0); private final String namePrefix; @@ -57,7 +57,7 @@ public Thread newThread(@NonNull Runnable runnable) { if(thread.getPriority() != Thread.NORM_PRIORITY){ thread.setPriority(Thread.NORM_PRIORITY); } - + thread.setDaemon(true); return thread; From 11d494a0ac8c66d3034bf91691a4641130f01d64 Mon Sep 17 00:00:00 2001 From: "yong.teng" Date: Sat, 2 Dec 2023 00:19:06 +0800 Subject: [PATCH 07/10] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20Table=20=E8=AE=BE?= =?UTF-8?q?=E7=BD=AE=E6=95=B0=E6=8D=AE=E5=BA=93=E5=90=8D=E5=92=8C=E8=A1=A8?= =?UTF-8?q?=E5=90=8D=E9=94=99=E8=AF=AF=E7=9A=84=20BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../buession/canal/core/convert/DefaultMessageConverter.java | 4 ++-- .../com/buession/canal/core/convert/FlatMessageConverter.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/convert/DefaultMessageConverter.java b/buession-canal-core/src/main/java/com/buession/canal/core/convert/DefaultMessageConverter.java index 13f647f..e103b5f 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/convert/DefaultMessageConverter.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/convert/DefaultMessageConverter.java @@ -64,11 +64,11 @@ private CanalMessage doParseEntry(final String destination, final CanalEntry.Ent final List data = new ArrayList<>(rowChange.getRowDatasList().size()); rowChange.getRowDatasList().forEach((rowData)->data.addAll(rowData.getAfterColumnsList())); - + final CanalMessage canalMessage = new CanalMessage(); canalMessage.setDestination(destination); - canalMessage.setTable(new Table(entry.getHeader().getSchemaName(), entry.getHeader().getTableName())); + canalMessage.setTable(new Table(entry.getHeader().getTableName(), entry.getHeader().getSchemaName())); canalMessage.setEntryType(entry.getEntryType()); canalMessage.setEventType(entry.getHeader().getEventType()); canalMessage.setHeader(entry.getHeader()); diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/convert/FlatMessageConverter.java b/buession-canal-core/src/main/java/com/buession/canal/core/convert/FlatMessageConverter.java index 7a02ca1..02278a5 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/convert/FlatMessageConverter.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/convert/FlatMessageConverter.java @@ -65,7 +65,7 @@ private CanalMessage doParseEntry(final String destination, final FlatMessage me } canalMessage.setDestination(destination); - canalMessage.setTable(new Table(message.getDatabase(), message.getTable())); + canalMessage.setTable(new Table(message.getTable(), message.getDatabase())); canalMessage.setEventType(CanalEntry.EventType.valueOf(message.getType())); canalMessage.setData(columns); canalMessage.setDdl(message.getIsDdl()); From 083aaa0fe8b1888822fff93ba794fcd3f2b54fde Mon Sep 17 00:00:00 2001 From: "yong.teng" Date: Sat, 2 Dec 2023 00:19:54 +0800 Subject: [PATCH 08/10] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 4 ++++ .../com/buession/canal/client/AbstractCanalClient.java | 3 +-- .../canal/core/listener/EventListenerMethod.java | 9 ++++----- .../support/EventListenerArgumentResolverComposite.java | 7 +++---- .../annotation/CanalBindingClassPathMapperScanner.java | 9 --------- 5 files changed, 12 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f35ffd..38de585 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,10 @@ - **buession-canal-core:** 修复 buession-beans bean 转换导致的数据丢失的 BUG - **buession-canal-core:** 修复 @CanalEventListener 仅指定 schema 或 table 服务映射方法的 BUG - **buession-canal-core:** 修复无法获取 @CanalEventListener schema 和 table 的 BUG +- **buession-canal-core:** 修复线程池线程序号错误的 BUG +- **buession-canal-core:** 修复 Table 设置数据库名和表名错误的 BUG +- **buession-canal-client:** 修复多实例下,实例不执行的 BUG +- **buession-canal-spring:** 修复注解 CanalBinding 无法重复定义 destination 的 BUG --- diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/AbstractCanalClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/AbstractCanalClient.java index f2b767f..e971ff5 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/AbstractCanalClient.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/AbstractCanalClient.java @@ -87,8 +87,7 @@ public AbstractCanalClient(final CanalContext context, final Dispatcher dispatch Assert.isNull(executor, "The ExecutorService is required"); this.context = context; this.dispatcher = dispatcher; - this.executor = new DefaultCanalThreadPoolExecutor("canal", context.getAdapterClients().size(), - context.getAdapterClients().size(), 10 * 1000L); + this.executor = executor; } @Override diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/EventListenerMethod.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/EventListenerMethod.java index 40a14a7..d7270d8 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/EventListenerMethod.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/EventListenerMethod.java @@ -111,7 +111,7 @@ public Object invoke(final CanalMessage canalMessage) throws Exception { return doInvoke(args); } - protected Object[] getMethodArgumentValues(final CanalMessage canalMessage) throws Exception { + private Object[] getMethodArgumentValues(final CanalMessage canalMessage) throws Exception { MethodParameter[] parameters = getParameters(); if(Validate.isEmpty(parameters)){ @@ -123,9 +123,8 @@ protected Object[] getMethodArgumentValues(final CanalMessage canalMessage) thro for(int i = 0; i < parameters.length; i++){ MethodParameter parameter = parameters[i]; - if(argumentResolvers.supports(parameter) == false){ - throw new IllegalStateException(formatArgumentError(parameter, "No suitable resolver")); - } + Assert.isFalse(argumentResolvers.supports(parameter), + ()->new IllegalStateException(formatArgumentError(parameter, "No suitable resolver"))); try{ args[i] = argumentResolvers.resolve(parameter, canalMessage); @@ -143,7 +142,7 @@ protected Object[] getMethodArgumentValues(final CanalMessage canalMessage) thro return args; } - protected Object doInvoke(Object... args) throws Exception { + private Object doInvoke(Object... args) throws Exception { ReflectionUtils.makeAccessible(getBridgedMethod()); return getBridgedMethod().invoke(getTarget(), args); } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/EventListenerArgumentResolverComposite.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/EventListenerArgumentResolverComposite.java index cb6f9fd..0e545b4 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/EventListenerArgumentResolverComposite.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/EventListenerArgumentResolverComposite.java @@ -26,6 +26,7 @@ import com.buession.canal.core.CanalMessage; import com.buession.canal.core.listener.MethodParameter; +import com.buession.core.utils.Assert; import java.util.ArrayList; import java.util.Collections; @@ -72,10 +73,8 @@ public boolean supports(final MethodParameter parameter) { @Override public Object resolve(final MethodParameter parameter, final CanalMessage canalMessage) throws Exception { EventListenerArgumentResolver resolver = getArgumentResolver(parameter); - if(resolver == null){ - throw new IllegalArgumentException("Unsupported parameter type [" + - parameter.getType().getName() + "]. supports should be called first."); - } + Assert.isNull(resolver, ()->new IllegalArgumentException("Unsupported parameter type [" + + parameter.getType().getName() + "]. supports should be called first.")); return resolver.resolve(parameter, canalMessage); } diff --git a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java index 7cc9f63..310ae6c 100644 --- a/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java +++ b/buession-canal-spring/src/main/java/com/buession/canal/spring/annotation/CanalBindingClassPathMapperScanner.java @@ -44,9 +44,7 @@ import org.springframework.lang.NonNull; import org.springframework.util.ClassUtils; -import java.util.HashSet; import java.util.Optional; -import java.util.Set; /** * {@link CanalBinding} 扫描器 @@ -61,8 +59,6 @@ class CanalBindingClassPathMapperScanner extends ClassPathBeanDefinitionScanner */ private boolean lazyInitialization; - private final Set destinations = new HashSet<>(); - /** * 构造函数 * @@ -125,16 +121,11 @@ private void processBeanDefinition(final AbstractBeanDefinition beanDefinition) Assert.isBlank(canalBinding.destination(), ()->new IllegalStateException( "Either 'destination' must be required in @CanalBinding for: " + beanClassName)); - Assert.isTrue(destinations.contains(canalBinding.destination()), ()->new IllegalStateException( - "The destination: " + canalBinding.destination() + " already exists in @CanalBinding for: " + - beanClassName)); beanDefinition.setLazyInit(lazyInitialization); beanDefinition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE); beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); beanDefinition.setAttribute(FactoryBean.OBJECT_TYPE_ATTRIBUTE, beanClassName); - - destinations.add(canalBinding.destination()); } } From 2ccb261ef71a5998258281796a3e76ba64eb4663 Mon Sep 17 00:00:00 2001 From: "yong.teng" Date: Wed, 6 Dec 2023 13:04:27 +0800 Subject: [PATCH 09/10] =?UTF-8?q?=E4=BC=98=E5=8C=96=20afterPropertiesSet?= =?UTF-8?q?=20=E9=87=8D=E5=A4=8D=E5=88=9D=E5=A7=8B=E5=8C=96=20bean=20?= =?UTF-8?q?=E5=AE=9E=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 7 +++++++ .../spring/client/factory/CanalClientFactoryBean.java | 5 ++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 38de585..77a9776 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,13 @@ - **buession-canal-spring:** 修复注解 CanalBinding 无法重复定义 destination 的 BUG +### ⏪ 优化 + +- **buession-canal-core:** 代码优化 +- **buession-canal-spring:** 代码优化 +- **buession-canal-spring:** 优化 CanalClientFactoryBean 多次调用 afterPropertiesSet 时,重复初始化 CanalClient + + --- diff --git a/buession-canal-spring/src/main/java/com/buession/canal/spring/client/factory/CanalClientFactoryBean.java b/buession-canal-spring/src/main/java/com/buession/canal/spring/client/factory/CanalClientFactoryBean.java index d00398b..fe806cc 100644 --- a/buession-canal-spring/src/main/java/com/buession/canal/spring/client/factory/CanalClientFactoryBean.java +++ b/buession-canal-spring/src/main/java/com/buession/canal/spring/client/factory/CanalClientFactoryBean.java @@ -57,9 +57,8 @@ public void afterPropertiesSet() throws Exception { Assert.isNull(getContext(), "Property 'context' is required"); Assert.isNull(getDispatcher(), "Property 'dispatcher' is required"); - canalClient = new DefaultCanalClient(getContext(), getDispatcher(), getExecutor()); - - if(canalClient.isRunning() == false){ + if(canalClient == null){ + canalClient = new DefaultCanalClient(getContext(), getDispatcher(), getExecutor()); canalClient.start(); } } From 6bc59ddf3313221b17e76dd1e221fc77dd2d6dbf Mon Sep 17 00:00:00 2001 From: "yong.teng" Date: Wed, 27 Dec 2023 11:44:58 +0800 Subject: [PATCH 10/10] Release 0.0.2 --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 77a9776..d50c029 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ =========================== -## [0.0.2](https://github.com/buession/buession-canal/releases/tag/v0.0.2) (2023-xx-xx) +## [0.0.2](https://github.com/buession/buession-canal/releases/tag/v0.0.2) (2023-12-27) ### 🔨依赖升级