diff --git a/buession-canal-core/pom.xml b/buession-canal-core/pom.xml index 7a4364e..7fad21b 100644 --- a/buession-canal-core/pom.xml +++ b/buession-canal-core/pom.xml @@ -63,6 +63,11 @@ buession-core ${buession.version} + + com.buession + buession-beans + ${buession.version} + org.springframework diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/CanalMessage.java b/buession-canal-core/src/main/java/com/buession/canal/core/CanalMessage.java index f13a3c5..e4b655a 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/CanalMessage.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/CanalMessage.java @@ -28,9 +28,12 @@ import java.io.Serializable; import java.util.List; +import java.util.Map; import java.util.StringJoiner; /** + * 消息 + * * @author Yong.Teng * @since 0.0.1 */ @@ -67,6 +70,8 @@ public class CanalMessage implements Serializable { private CanalEntry.RowChange rowChange; + private Object data; + /** * 返回指令 * @@ -167,4 +172,25 @@ public void setRowChange(CanalEntry.RowChange rowChange) { this.rowChange = rowChange; } + public Object getData() { + return data; + } + + public void setData(Object data) { + this.data = data; + } + + @Override + public String toString() { + return new StringJoiner(", ", "CanalMessage[", "]") + .add("destination='" + destination + "'") + .add("table=" + table) + .add("entryType=" + entryType) + .add("eventType=" + eventType) + .add("isDdl=" + isDdl) + .add("header=" + header) + .add("rowChange=" + rowChange) + .toString(); + } + } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/Configuration.java b/buession-canal-core/src/main/java/com/buession/canal/core/Configuration.java index abe5fa2..2ec755a 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/Configuration.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/Configuration.java @@ -136,7 +136,7 @@ public void setTimeout(Duration timeout) { @Override public String toString() { - return new StringJoiner(", ", Configuration.class.getSimpleName() + "[", "]") + return new StringJoiner(", ", "Configuration[", "]") .add("destination='" + destination + "'") .add("filter='" + filter + "'") .add("batchSize=" + batchSize) diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/Result.java b/buession-canal-core/src/main/java/com/buession/canal/core/Result.java index 34dd657..1bf9108 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/Result.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/Result.java @@ -21,10 +21,106 @@ * | Author: Yong.Teng | * | Copyright @ 2013-2023 Buession.com Inc. | * +-------------------------------------------------------------------------------------------------------+ - */package com.buession.canal.core;/** - * + */ +package com.buession.canal.core; + +import java.io.Serializable; +import java.util.List; +import java.util.StringJoiner; + +/** + * 结果 * * @author Yong.Teng * @since 0.0.1 - */public class Result { + */ +public class Result implements Serializable { + + private final static long serialVersionUID = 5375957586654168072L; + + /** + * ID + */ + private long id; + + /** + * 消息列表 + */ + private List messages; + + /** + * 构造函数 + */ + public Result() { + } + + /** + * 构造函数 + * + * @param messages + * 消息列表 + */ + public Result(List messages) { + this(-1, messages); + } + + /** + * 构造函数 + * + * @param id + * ID + * @param messages + * 消息列表 + */ + public Result(long id, List messages) { + this.id = id; + this.messages = messages; + } + + /** + * 返回 ID + * + * @return ID + */ + public long getId() { + return id; + } + + /** + * 设置 ID + * + * @param id + * ID + */ + public void setId(long id) { + this.id = id; + } + + /** + * 返回消息列表 + * + * @return 消息列表 + */ + public List getMessages() { + return messages; + } + + /** + * 设置消息列表 + * + * @param messages + * 消息列表 + */ + public void setMessages(List messages) { + this.messages = messages; + } + + @Override + public String toString() { + return new StringJoiner(", ", "Result[", "]") + .add("id=" + id) + .add("messages=" + messages) + .toString(); + } + } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/convert/DefaultMessageConverter.java b/buession-canal-core/src/main/java/com/buession/canal/core/convert/DefaultMessageConverter.java index 7d96334..8c106e7 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/convert/DefaultMessageConverter.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/convert/DefaultMessageConverter.java @@ -44,15 +44,15 @@ public class DefaultMessageConverter extends AbstractMessageConverter { private final List ignoreEntryTypes = getIgnoreEntryTypes(); @Override - public List convert(final Message message) { + public List convert(final String destination, final Message message) { List entries = message.getEntries(); return entries.stream() .filter((entry)->ignoreEntryTypes.stream().anyMatch(t->entry.getEntryType() == t) == false) - .map(this::doParseEntry).collect(Collectors.toList()); + .map((entry)->this.doParseEntry(destination, entry)).collect(Collectors.toList()); } - private CanalMessage doParseEntry(final CanalEntry.Entry entry) { + private CanalMessage doParseEntry(final String destination, final CanalEntry.Entry entry) { CanalEntry.RowChange rowChange; try{ rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); @@ -62,11 +62,13 @@ private CanalMessage doParseEntry(final CanalEntry.Entry entry) { final CanalMessage canalMessage = new CanalMessage(); + canalMessage.setDestination(destination); canalMessage.setTable(new Table(entry.getHeader().getSchemaName(), entry.getHeader().getTableName())); canalMessage.setEntryType(entry.getEntryType()); canalMessage.setEventType(entry.getHeader().getEventType()); canalMessage.setHeader(entry.getHeader()); canalMessage.setRowChange(rowChange); + canalMessage.setData(rowChange.getRowDatasList()); canalMessage.setDdl(rowChange.getIsDdl()); return canalMessage; diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/convert/FlatMessageConverter.java b/buession-canal-core/src/main/java/com/buession/canal/core/convert/FlatMessageConverter.java index 7d96334..db81ba9 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/convert/FlatMessageConverter.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/convert/FlatMessageConverter.java @@ -25,12 +25,12 @@ package com.buession.canal.core.convert; import com.alibaba.otter.canal.protocol.CanalEntry; -import com.alibaba.otter.canal.protocol.Message; +import com.alibaba.otter.canal.protocol.FlatMessage; import com.buession.canal.core.CanalMessage; import com.buession.canal.core.Table; -import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** @@ -39,43 +39,25 @@ * @author Yong.Teng * @since 0.0.1 */ -public class DefaultMessageConverter extends AbstractMessageConverter { - - private final List ignoreEntryTypes = getIgnoreEntryTypes(); +public class FlatMessageConverter extends AbstractMessageConverter { @Override - public List convert(final Message message) { - List entries = message.getEntries(); - - return entries.stream() - .filter((entry)->ignoreEntryTypes.stream().anyMatch(t->entry.getEntryType() == t) == false) - .map(this::doParseEntry).collect(Collectors.toList()); + public List convert(final String destination, final FlatMessage message) { + return message.getData().stream().map((row)->this.doParseEntry(destination, message, row)) + .collect(Collectors.toList()); } - private CanalMessage doParseEntry(final CanalEntry.Entry entry) { - CanalEntry.RowChange rowChange; - try{ - rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); - }catch(Exception e){ - throw new RuntimeException("parse event has an error, data: " + entry, e); - } - + private CanalMessage doParseEntry(final String destination, final FlatMessage message, + final Map row) { final CanalMessage canalMessage = new CanalMessage(); - canalMessage.setTable(new Table(entry.getHeader().getSchemaName(), entry.getHeader().getTableName())); - canalMessage.setEntryType(entry.getEntryType()); - canalMessage.setEventType(entry.getHeader().getEventType()); - canalMessage.setHeader(entry.getHeader()); - canalMessage.setRowChange(rowChange); - canalMessage.setDdl(rowChange.getIsDdl()); + canalMessage.setDestination(destination); + canalMessage.setTable(new Table(message.getDatabase(), message.getTable())); + canalMessage.setEventType(CanalEntry.EventType.valueOf(message.getType())); + canalMessage.setData(row); + canalMessage.setDdl(message.getIsDdl()); return canalMessage; } - @Override - protected List getIgnoreEntryTypes() { - return Arrays.asList(CanalEntry.EntryType.TRANSACTIONBEGIN, CanalEntry.EntryType.TRANSACTIONEND, - CanalEntry.EntryType.HEARTBEAT); - } - } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/convert/MessageConverter.java b/buession-canal-core/src/main/java/com/buession/canal/core/convert/MessageConverter.java index 749a491..e21d4ae 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/convert/MessageConverter.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/convert/MessageConverter.java @@ -25,7 +25,6 @@ package com.buession.canal.core.convert; import com.buession.canal.core.CanalMessage; -import com.buession.core.converter.Converter; import java.util.List; @@ -39,17 +38,18 @@ * @since 0.0.1 */ @FunctionalInterface -public interface MessageConverter extends Converter> { +public interface MessageConverter { /** * 将原始消息转换成 {@link CanalMessage} * + * @param destination + * 指令 * @param message * 原始消息 * * @return {@link CanalMessage} */ - @Override - List convert(final M message); + List convert(final String destination, final M message); } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/EventListenerMethod.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/EventListenerMethod.java index 8fc9276..40a14a7 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/EventListenerMethod.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/EventListenerMethod.java @@ -30,7 +30,6 @@ import com.buession.core.validator.Validate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.BeanFactory; import org.springframework.core.BridgeMethodResolver; import org.springframework.util.ClassUtils; import org.springframework.util.ReflectionUtils; @@ -40,16 +39,16 @@ import java.util.Arrays; /** + * {@link com.buession.canal.annotation.CanalEventListener} 方法 + * * @author Yong.Teng * @since 0.0.1 */ -public class EventListenerMethod { +public final class EventListenerMethod { private final static Object[] EMPTY_ARGS = new Object[0]; - private final BeanFactory beanFactory; - - private EventListenerArgumentResolverComposite argumentResolvers = new EventListenerArgumentResolverComposite(); + private EventListenerArgumentResolverComposite argumentResolvers; private final Object target; @@ -63,10 +62,10 @@ public class EventListenerMethod { private final static Logger logger = LoggerFactory.getLogger(EventListenerMethod.class); - public EventListenerMethod(Object target, Method method) { + public EventListenerMethod(Object target, Method method, EventListenerArgumentResolverComposite argumentResolvers) { Assert.isNull(target, "Target is required"); Assert.isNull(method, "Method is required"); - this.beanFactory = null; + this.argumentResolvers = argumentResolvers; this.target = target; this.targetType = ClassUtils.getUserClass(target); this.method = method; @@ -74,28 +73,11 @@ public EventListenerMethod(Object target, Method method) { this.parameters = initMethodParameters(); } - public EventListenerMethod(String beanName, BeanFactory beanFactory, Method method) { - Assert.isBlank(beanName, "Bean name is required"); - Assert.isNull(beanFactory, "BeanFactory is required"); - Assert.isNull(method, "Method is required"); - this.beanFactory = beanFactory; - this.target = beanName; - Class targetType = beanFactory.getType(beanName); - if(targetType == null){ - throw new IllegalStateException("Cannot resolve bean type for bean with name '" + beanName + "'"); - } - this.targetType = ClassUtils.getUserClass(targetType); - this.method = method; - this.bridgedMethod = BridgeMethodResolver.findBridgedMethod(method); - this.parameters = initMethodParameters(); - } - public EventListenerArgumentResolverComposite getArgumentResolvers() { return argumentResolvers; } - public void setArgumentResolvers( - EventListenerArgumentResolverComposite argumentResolvers) { + public void setArgumentResolvers(EventListenerArgumentResolverComposite argumentResolvers) { this.argumentResolvers = argumentResolvers; } @@ -148,10 +130,9 @@ protected Object[] getMethodArgumentValues(final CanalMessage canalMessage) thro try{ args[i] = argumentResolvers.resolve(parameter, canalMessage); }catch(Exception e){ - // Leave stack trace for later, exception may actually be resolved and handled... if(logger.isDebugEnabled()){ String exMsg = e.getMessage(); - if(exMsg != null && !exMsg.contains(parameter.getMethod().toGenericString())){ + if(exMsg != null && exMsg.contains(parameter.getMethod().toGenericString()) == false){ logger.debug(formatArgumentError(parameter, exMsg)); } } @@ -179,9 +160,16 @@ private MethodParameter[] initMethodParameters() { } private static String formatArgumentError(final MethodParameter methodParameter, final String message) { - return "Could not resolve parameter [" + methodParameter.getIndex() + "] in " + - methodParameter.getMethod().toGenericString() + - (Validate.hasText(message) ? ": " + message : ""); + final StringBuilder sb = new StringBuilder("Could not resolve parameter ["); + + sb.append('[').append(methodParameter.getIndex()).append(']'); + sb.append(" in ").append(methodParameter.getMethod().toGenericString()); + + if(Validate.hasText(message)){ + sb.append(": ").append(message); + } + + return sb.toString(); } } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/EventListenerRegistry.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/EventListenerRegistry.java index c2712f3..44679a9 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/EventListenerRegistry.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/EventListenerRegistry.java @@ -24,6 +24,8 @@ */ package com.buession.canal.core.listener; +import com.buession.canal.core.listener.support.EventListenerArgumentResolverComposite; + import java.lang.reflect.Method; import java.util.LinkedHashMap; import java.util.Map; @@ -34,6 +36,8 @@ */ public class EventListenerRegistry { + private final EventListenerArgumentResolverComposite argumentResolvers = new EventListenerArgumentResolverComposite(); + private final Map methods = new LinkedHashMap<>(); public Map getMethods() { @@ -45,11 +49,7 @@ public EventListenerMethod getMethod(String name) { } public void register(String name, Object object, Method method) { - methods.put(name, createEventListenerMethod(object, method)); - } - - protected EventListenerMethod createEventListenerMethod(Object object, Method method) { - return new EventListenerMethod(object, method); + methods.put(name, new EventListenerMethod(object, method, argumentResolvers)); } } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/MethodParameter.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/MethodParameter.java index 87a430b..ffc8691 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/MethodParameter.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/MethodParameter.java @@ -31,6 +31,7 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.lang.reflect.Parameter; +import java.util.Arrays; import java.util.Objects; /** @@ -222,7 +223,7 @@ public Annotation[] getAnnotations() { @Override public int hashCode() { - return Objects.hash(name, index, parameter, getParameterType(), type, getAnnotations()); + return Objects.hash(name, index, parameter, getParameterType(), type, Arrays.hashCode(getAnnotations())); } @Override @@ -236,7 +237,7 @@ public boolean equals(Object obj) { return Objects.equals(name, that.getName()) && Objects.equals(parameter, that.getParameter()) && Objects.equals(getParameterType(), that.getParameterType()) && Objects.equals(type, that.getType()) && - Objects.equals(getAnnotations(), that.getAnnotations()); + Arrays.equals(getAnnotations(), that.getAnnotations()); } return false; diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArgumentResolver.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArgumentResolver.java index db44664..b540f54 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArgumentResolver.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArgumentResolver.java @@ -24,34 +24,47 @@ */ package com.buession.canal.core.listener.support; -import com.alibaba.otter.canal.protocol.CanalEntry; +import com.buession.beans.BeanConverter; +import com.buession.beans.BeanUtils; +import com.buession.beans.DefaultBeanConverter; +import com.buession.beans.converters.DatePropertyConverter; +import com.buession.canal.annotation.Row; import com.buession.canal.core.CanalMessage; import com.buession.canal.core.listener.MethodParameter; -import com.buession.core.validator.Validate; -import java.util.List; +import java.util.Date; +import java.util.Map; /** - * {@link CanalEntry.RowData} 参数解析器 + * 行数据参数解析器 * * @author Yong.Teng * @since 0.0.1 */ -public class RowDataArgumentResolver implements EventListenerArgumentResolver { +public class RowArgumentResolver implements EventListenerArgumentResolver { @Override public boolean supports(MethodParameter parameter) { - return CanalEntry.RowData.class.isAssignableFrom(parameter.getParameterType()); + return parameter.hasAnnotation(Row.class); } @Override public Object resolve(MethodParameter parameter, final CanalMessage canalMessage) throws Exception { - if(canalMessage == null || canalMessage.getRowChange() == null){ + if(canalMessage == null || canalMessage.getData() == null){ return null; } - List rowData = canalMessage.getRowChange().getRowDatasList(); - return rowData.get(0); + Object data = canalMessage.getData(); + if(Map.class.isAssignableFrom(parameter.getParameterType())){ + return data; + }else{ + final DefaultBeanConverter beanConverter = new DefaultBeanConverter(); + final Object target = BeanUtils.instantiateClass(parameter.getParameterType()); + + beanConverter.registerConverter(Date.class, new DatePropertyConverter("yyyy-MM-dd HH:mm:ss")); + + return beanConverter.convert(data, target); + } } } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArrayArgumentResolver.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArrayArgumentResolver.java index aa51422..6553247 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArrayArgumentResolver.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowArrayArgumentResolver.java @@ -25,36 +25,34 @@ package com.buession.canal.core.listener.support; import com.alibaba.otter.canal.protocol.CanalEntry; +import com.buession.canal.annotation.Row; import com.buession.canal.core.CanalMessage; import com.buession.canal.core.listener.MethodParameter; import java.util.List; /** - * {@link CanalEntry.RowData} 数组参数解析器 + * 行数据数组参数解析器 * * @author Yong.Teng * @since 0.0.1 */ -public class RowDataArrayArgumentResolver implements EventListenerArgumentResolver { +public class RowArrayArgumentResolver implements EventListenerArgumentResolver { @Override public boolean supports(MethodParameter parameter) { - if(parameter.getParameterType().isArray()){ - return CanalEntry.RowData.class.isAssignableFrom(parameter.getParameterType().getComponentType()); - } - - return false; + return parameter.hasAnnotation(Row.class) && parameter.getParameterType().isArray(); } @Override public Object resolve(MethodParameter parameter, final CanalMessage canalMessage) throws Exception { - if(canalMessage == null || canalMessage.getRowChange() == null){ + if(canalMessage == null || canalMessage.getData() == null){ return null; } - List rowData = canalMessage.getRowChange().getRowDatasList(); - return rowData.toArray(new CanalEntry.RowData[]{}); + //List rowData = canalMessage.getRowChange().getRowDatasList(); + //return rowData.toArray(new CanalEntry.RowData[]{}); + return null; } } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowCollectionArgumentResolver.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowCollectionArgumentResolver.java index 42b5b3d..81b53ce 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowCollectionArgumentResolver.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowCollectionArgumentResolver.java @@ -24,49 +24,35 @@ */ package com.buession.canal.core.listener.support; -import com.alibaba.otter.canal.protocol.CanalEntry; +import com.buession.canal.annotation.Row; import com.buession.canal.core.CanalMessage; import com.buession.canal.core.listener.MethodParameter; -import com.buession.core.validator.Validate; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; -import java.util.Objects; import java.util.Set; /** - * {@link CanalEntry.RowData} 集合参数解析器 + * 行数据集合参数解析器 * * @author Yong.Teng * @since 0.0.1 */ -public class RowDataCollectionArgumentResolver implements EventListenerArgumentResolver { +public class RowCollectionArgumentResolver implements EventListenerArgumentResolver { @Override public boolean supports(MethodParameter parameter) { - if(Collection.class.isAssignableFrom(parameter.getParameterType())){ - Type[] actualTypeArguments = - ((ParameterizedType) parameter.getParameter().getParameterizedType()).getActualTypeArguments(); - - if(Validate.isEmpty(actualTypeArguments)){ - return false; - } - - return Objects.equals(actualTypeArguments[0], CanalEntry.RowData.class); - } - - return false; + return parameter.hasAnnotation(Row.class) && Collection.class.isAssignableFrom(parameter.getParameterType()); } @Override public Object resolve(MethodParameter parameter, final CanalMessage canalMessage) throws Exception { - if(canalMessage == null || canalMessage.getRowChange() == null){ + if(canalMessage == null || canalMessage.getData() == null){ return null; } + /* if(List.class.isAssignableFrom(parameter.getParameterType())){ return canalMessage.getRowChange().getRowDatasList(); }else if(Set.class.isAssignableFrom(parameter.getParameterType())){ @@ -74,6 +60,9 @@ public Object resolve(MethodParameter parameter, final CanalMessage canalMessage }else{ return null; } + + */ + return null; } } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowDataArgumentResolver.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowDataArgumentResolver.java index 5b9268a..db44664 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowDataArgumentResolver.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowDataArgumentResolver.java @@ -46,7 +46,7 @@ public boolean supports(MethodParameter parameter) { @Override public Object resolve(MethodParameter parameter, final CanalMessage canalMessage) throws Exception { - if(canalMessage == null){ + if(canalMessage == null || canalMessage.getRowChange() == null){ return null; } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowDataArrayArgumentResolver.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowDataArrayArgumentResolver.java index fe2a6c0..aa51422 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowDataArrayArgumentResolver.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowDataArrayArgumentResolver.java @@ -49,7 +49,7 @@ public boolean supports(MethodParameter parameter) { @Override public Object resolve(MethodParameter parameter, final CanalMessage canalMessage) throws Exception { - if(canalMessage == null){ + if(canalMessage == null || canalMessage.getRowChange() == null){ return null; } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowDataCollectionArgumentResolver.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowDataCollectionArgumentResolver.java index 6d1919d..42b5b3d 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowDataCollectionArgumentResolver.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/RowDataCollectionArgumentResolver.java @@ -57,13 +57,13 @@ public boolean supports(MethodParameter parameter) { return Objects.equals(actualTypeArguments[0], CanalEntry.RowData.class); } - + return false; } @Override public Object resolve(MethodParameter parameter, final CanalMessage canalMessage) throws Exception { - if(canalMessage == null){ + if(canalMessage == null || canalMessage.getRowChange() == null){ return null; } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/TableArgumentResolver.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/TableArgumentResolver.java index dc34e9e..17b754e 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/TableArgumentResolver.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/support/TableArgumentResolver.java @@ -36,23 +36,10 @@ */ public class TableArgumentResolver implements EventListenerArgumentResolver { - private boolean isTableObject; - - private boolean isTableName; - @Override public boolean supports(final MethodParameter parameter) { - isTableObject = com.buession.canal.core.Table.class.isAssignableFrom(parameter.getType()); - if(isTableObject){ - return true; - } - - isTableName = parameter.hasAnnotation(Table.class); - if(isTableName){ - return CharSequence.class.isAssignableFrom(parameter.getType()); - } - - return false; + return com.buession.canal.core.Table.class.isAssignableFrom(parameter.getType()) || + parameter.hasAnnotation(Table.class) && CharSequence.class.isAssignableFrom(parameter.getType()); } @Override @@ -61,11 +48,11 @@ public Object resolve(MethodParameter parameter, final CanalMessage canalMessage return null; } - if(isTableObject){ + if(com.buession.canal.core.Table.class.isAssignableFrom(parameter.getType())){ return canalMessage.getTable(); } - if(isTableName){ + if(parameter.hasAnnotation(Table.class)){ return canalMessage.getTable().getName(); } diff --git a/buession-canal-core/src/main/java/com/buession/canal/core/listener/utils/EventListenerUtils.java b/buession-canal-core/src/main/java/com/buession/canal/core/listener/utils/EventListenerUtils.java index aa7cb7d..095130a 100644 --- a/buession-canal-core/src/main/java/com/buession/canal/core/listener/utils/EventListenerUtils.java +++ b/buession-canal-core/src/main/java/com/buession/canal/core/listener/utils/EventListenerUtils.java @@ -27,6 +27,8 @@ import com.alibaba.otter.canal.protocol.CanalEntry; import com.buession.core.validator.Validate; +import java.lang.reflect.Method; + /** * @author Yong.Teng * @since 0.0.1