diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/AbstractCanalClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/AbstractCanalClient.java index 405497f..86b0386 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/AbstractCanalClient.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/AbstractCanalClient.java @@ -24,15 +24,13 @@ */ package com.buession.canal.client; -import com.buession.canal.client.handler.DefaultMessageHandler; -import com.buession.canal.client.handler.MessageHandlerFactory; -import com.buession.canal.core.binding.CanalBinding; +import com.buession.canal.client.adapter.AdapterClient; +import com.buession.canal.client.dispatcher.Dispatcher; import com.buession.canal.core.concurrent.DefaultCanalThreadPoolExecutor; import com.buession.core.utils.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; import java.util.concurrent.ExecutorService; /** @@ -44,16 +42,19 @@ public abstract class AbstractCanalClient implements CanalClient { /** - * {@link Binder} 列表 + * {@link CanalContext} */ - private final List binders; + private final CanalContext context; /** * {@link ExecutorService} */ private final ExecutorService executor; - private final MessageHandlerFactory messageHandlerFactory; + /** + * 分发器 + */ + private final Dispatcher dispatcher; private volatile boolean running = false; @@ -62,37 +63,43 @@ public abstract class AbstractCanalClient implements CanalClient { /** * 构造函数 * - * @param binders - * {@link Binder} 列表 + * @param context + * {@link CanalContext} + * @param dispatcher + * 分发器 */ - public AbstractCanalClient(final List binders) { - this(binders, new DefaultCanalThreadPoolExecutor()); + public AbstractCanalClient(final CanalContext context, final Dispatcher dispatcher) { + this(context, dispatcher, new DefaultCanalThreadPoolExecutor()); } /** * 构造函数 * - * @param binders - * {@link Binder} 列表 - * {@link CanalBinding} 列表 + * @param context + * {@link CanalContext} + * @param dispatcher + * 分发器 * @param executor * {@link ExecutorService} */ - public AbstractCanalClient(final List binders, final ExecutorService executor) { - Assert.isNull(binders, "The Binder cloud not be null"); - Assert.isNull(executor, "The ExecutorService cloud not be null"); - this.binders = binders; + public AbstractCanalClient(final CanalContext context, final Dispatcher dispatcher, + final ExecutorService executor) { + Assert.isNull(context, "The CanalContext is required"); + Assert.isNull(dispatcher, "The Dispatcher is required"); + Assert.isNull(executor, "The ExecutorService is required"); + this.context = context; + this.dispatcher = dispatcher; this.executor = executor; - this.messageHandlerFactory = DefaultMessageHandler::new; } @Override public void start() { logger.info("CanalClient starting..."); - for(Binder binder : binders){ - process(binder, messageHandlerFactory, executor); - } + context.getAdapterClients().forEach((adapterClient)->{ + adapterClient.init(); + process(adapterClient, dispatcher, executor); + }); running = true; } @@ -109,7 +116,7 @@ public boolean isRunning() { return running; } - protected abstract void process(final Binder binder, final MessageHandlerFactory messageHandlerFactory, + protected abstract void process(final AdapterClient adapterClient, final Dispatcher dispatcher, final ExecutorService executor); } diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/CanalContext.java b/buession-canal-client/src/main/java/com/buession/canal/client/CanalContext.java index 627cbcc..2092c4a 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/CanalContext.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/CanalContext.java @@ -21,10 +21,26 @@ * | Author: Yong.Teng | * | Copyright @ 2013-2023 Buession.com Inc. | * +-------------------------------------------------------------------------------------------------------+ - */package com.buession.canal.client;/** - * + */ +package com.buession.canal.client; + +import com.buession.canal.client.adapter.AdapterClient; + +import java.util.Set; + +/** + * Canal 上下文 * * @author Yong.Teng * @since 0.0.1 - */public interface CanalContext { + */ +public interface CanalContext { + + /** + * 返回所有 Canal 适配器 + * + * @return 所有 Canal 适配器 + */ + Set getAdapterClients(); + } diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalClient.java index 287101e..c0907e4 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalClient.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalClient.java @@ -24,9 +24,9 @@ */ package com.buession.canal.client; -import com.buession.canal.client.handler.MessageHandlerFactory; +import com.buession.canal.client.adapter.AdapterClient; +import com.buession.canal.client.dispatcher.Dispatcher; -import java.util.List; import java.util.concurrent.ExecutorService; /** @@ -40,29 +40,37 @@ public class DefaultCanalClient extends AbstractCanalClient { /** * 构造函数 * - * @param binders - * {@link Binder} 列表 + * @param context + * {@link CanalContext} + * @param dispatcher + * 分发器 */ - public DefaultCanalClient(final List binders) { - super(binders); + public DefaultCanalClient(final CanalContext context, final Dispatcher dispatcher) { + super(context, dispatcher); } /** * 构造函数 * - * @param binders - * {@link Binder} 列表 + * @param context + * {@link CanalContext} + * @param dispatcher + * 分发器 * @param executor * {@link ExecutorService} */ - public DefaultCanalClient(final List binders, final ExecutorService executor) { - super(binders, executor); + public DefaultCanalClient(final CanalContext context, final Dispatcher dispatcher, final ExecutorService executor) { + super(context, dispatcher, executor); } @Override - protected void process(final Binder binder, final MessageHandlerFactory messageHandlerFactory, + protected void process(final AdapterClient adapterClient, final Dispatcher dispatcher, final ExecutorService executor) { - executor.submit(messageHandlerFactory.newHandler(binder)); + executor.submit(()->{ + if(isRunning()){ + dispatcher.dispatch(adapterClient, 5); + } + }); } } diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalContext.java b/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalContext.java index f9a1a90..6163f36 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalContext.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalContext.java @@ -21,10 +21,39 @@ * | Author: Yong.Teng | * | Copyright @ 2013-2023 Buession.com Inc. | * +-------------------------------------------------------------------------------------------------------+ - */package com.buession.canal.client;/** - * + */ +package com.buession.canal.client; + +import com.buession.canal.client.adapter.AdapterClient; + +import java.util.Set; + +/** + * Canal 上下文 * * @author Yong.Teng * @since 0.0.1 - */public class DefaultCanalContext { + */ +public class DefaultCanalContext implements CanalContext { + + /** + * Canal 适配器 + */ + private final Set adapterClients; + + /** + * 构造函数 + * + * @param adapterClients + * Canal 适配器 + */ + public DefaultCanalContext(final Set adapterClients) { + this.adapterClients = adapterClients; + } + + @Override + public Set getAdapterClients() { + return adapterClients; + } + } diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractAdapterClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractAdapterClient.java index 8bc366d..82c4a88 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractAdapterClient.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractAdapterClient.java @@ -45,7 +45,7 @@ * @author Yong.Teng * @since 0.0.1 */ -public abstract class AbstractCanalAdapterClient implements CanalAdapterClient { +public abstract class AbstractAdapterClient implements AdapterClient { /** * Canal 数据操作客户端 @@ -81,7 +81,7 @@ public abstract class AbstractCanalAdapterClient imple * @param destination * 指令 */ - public AbstractCanalAdapterClient(final C connector, final String destination) { + public AbstractAdapterClient(final C connector, final String destination) { this(connector, destination, 1); } @@ -95,7 +95,7 @@ public AbstractCanalAdapterClient(final C connector, final String destination) { * @param batchSize * 批处理条数 */ - public AbstractCanalAdapterClient(final C connector, final String destination, final int batchSize) { + public AbstractAdapterClient(final C connector, final String destination, final int batchSize) { Assert.isNull(connector, "CanalConnector cloud not be null."); this.connector = connector; this.destination = destination; diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractMqAdapterClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractMqAdapterClient.java index ee66dfe..126dad2 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractMqAdapterClient.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractMqAdapterClient.java @@ -43,8 +43,8 @@ * @author Yong.Teng * @since 0.0.1 */ -public abstract class AbstractCanalMqAdapterClient extends AbstractCanalAdapterClient - implements CanalMqAdapterClient { +public abstract class AbstractMqAdapterClient extends AbstractAdapterClient + implements MqAdapterClient { private final boolean flatMessage; @@ -56,7 +56,7 @@ public abstract class AbstractCanalMqAdapterClient e * @param destination * 指令 */ - public AbstractCanalMqAdapterClient(final C connector, final String destination) { + public AbstractMqAdapterClient(final C connector, final String destination) { this(connector, destination, DEFAULT_FLAT_MESSAGE); } @@ -70,7 +70,7 @@ public AbstractCanalMqAdapterClient(final C connector, final String destination) * @param batchSize * 批处理条数 */ - public AbstractCanalMqAdapterClient(final C connector, final String destination, final int batchSize) { + public AbstractMqAdapterClient(final C connector, final String destination, final int batchSize) { this(connector, destination, batchSize, DEFAULT_FLAT_MESSAGE); } @@ -84,7 +84,7 @@ public AbstractCanalMqAdapterClient(final C connector, final String destination, * @param flatMessage * true / false */ - public AbstractCanalMqAdapterClient(final C connector, final String destination, final boolean flatMessage) { + public AbstractMqAdapterClient(final C connector, final String destination, final boolean flatMessage) { super(connector, destination); this.flatMessage = flatMessage; } @@ -101,8 +101,8 @@ public AbstractCanalMqAdapterClient(final C connector, final String destination, * @param flatMessage * true / false */ - public AbstractCanalMqAdapterClient(final C connector, final String destination, final int batchSize, - final boolean flatMessage) { + public AbstractMqAdapterClient(final C connector, final String destination, final int batchSize, + final boolean flatMessage) { super(connector, destination, batchSize); this.flatMessage = flatMessage; } diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AdapterClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AdapterClient.java index 4266fef..9eb371e 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AdapterClient.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AdapterClient.java @@ -37,7 +37,7 @@ * @author Yong.Teng * @since 0.0.1 */ -public interface CanalAdapterClient { +public interface AdapterClient { int DEFAULT_BATCH_SIZE = 1; diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/KafkaAdapterClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/KafkaAdapterClient.java index 3edde64..b17ad65 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/KafkaAdapterClient.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/KafkaAdapterClient.java @@ -32,7 +32,7 @@ * @author Yong.Teng * @since 0.0.1 */ -public class KafkaCanalAdapterClient extends AbstractCanalMqAdapterClient { +public class KafkaAdapterClient extends AbstractMqAdapterClient { /** * 构造函数 @@ -44,7 +44,7 @@ public class KafkaCanalAdapterClient extends AbstractCanalMqAdapterClient { +public class PulsarMQAdapterClient extends AbstractMqAdapterClient { public final static int DEFAULT_GET_BATCH_TIMEOUT = 30; @@ -60,8 +60,8 @@ public class PulsarMQCanalAdapterClient extends AbstractCanalMqAdapterClient { +public class RabbitMQAdapterClient extends AbstractMqAdapterClient { /** * 构造函数 @@ -48,8 +48,8 @@ public class RabbitMQCanalAdapterClient extends AbstractCanalMqAdapterClient { +public class RocketMQAdapterClient extends AbstractMqAdapterClient { /** * 构造函数 @@ -50,9 +50,9 @@ public class RocketMQCanalAdapterClient extends AbstractCanalMqAdapterClient { +public class TcpAdapterClient extends AbstractAdapterClient { public final static int DEFAULT_PORT = 1111; @@ -66,8 +66,8 @@ public class TcpCanalAdapterClient extends AbstractCanalAdapterClient serverSocketAddresses = Stream.of(servers).map( - TcpCanalAdapterClient::createSocketAddressFromHostAndPort).collect(Collectors.toList()); + TcpAdapterClient::createSocketAddressFromHostAndPort).collect(Collectors.toList()); ClusterCanalConnector canalConnector = (ClusterCanalConnector) CanalConnectors.newClusterConnector(serverSocketAddresses, destination, diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java index eb2f89e..9391c5e 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java @@ -21,10 +21,131 @@ * | Author: Yong.Teng | * | Copyright @ 2013-2023 Buession.com Inc. | * +-------------------------------------------------------------------------------------------------------+ - */package com.buession.canal.client.dispatcher;/** - * + */ +package com.buession.canal.client.dispatcher; + +import com.buession.canal.client.adapter.AdapterClient; +import com.buession.canal.core.CanalMessage; +import com.buession.canal.core.listener.EventListenerMethod; +import com.buession.canal.core.listener.EventListenerRegistry; +import com.buession.canal.core.listener.support.DestinationArgumentResolver; +import com.buession.canal.core.listener.support.EntryTypeArgumentResolver; +import com.buession.canal.core.listener.support.EventListenerArgumentResolver; +import com.buession.canal.core.listener.support.EventListenerArgumentResolverComposite; +import com.buession.canal.core.listener.support.EventTypeArgumentResolver; +import com.buession.canal.core.listener.support.HeaderArgumentResolver; +import com.buession.canal.core.listener.support.RowChangeArgumentResolver; +import com.buession.canal.core.listener.support.RowDataArgumentResolver; +import com.buession.canal.core.listener.support.RowDataArrayArgumentResolver; +import com.buession.canal.core.listener.support.RowDataCollectionArgumentResolver; +import com.buession.canal.core.listener.support.SchemaArgumentResolver; +import com.buession.canal.core.listener.support.TableArgumentResolver; +import com.buession.core.builder.ListBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * 分发器抽象类 * * @author Yong.Teng * @since 0.0.1 - */public class AbstractDispatcher { + */ +public abstract class AbstractDispatcher implements Dispatcher { + + private final EventListenerRegistry eventListenerRegistry = new EventListenerRegistry(); + + private final EventListenerArgumentResolverComposite argumentResolvers = new EventListenerArgumentResolverComposite(); + + private volatile boolean running = true; + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + public AbstractDispatcher() { + argumentResolvers.addResolvers(getDefaultArgumentResolvers()); + } + + public EventListenerRegistry getEventListenerRegistry() { + return eventListenerRegistry; + } + + @Override + public void dispatch(AdapterClient adapterClient, long timeout) { + while(running){ + try{ + List messages = adapterClient.getListWithoutAck(timeout, TimeUnit.SECONDS); + + if(messages != null){ + for(CanalMessage message : messages){ + doDispatch(message); + } + } + + adapterClient.ack(); + }catch(Exception e){ + logger.error("Message handle error", e); + } + } + + running = false; + } + + protected void doDispatch(final CanalMessage canalMessage) { + EventListenerMethod method = eventListenerRegistry.getMethod(buildEventListenerName(canalMessage)); + + if(method == null){ + method = eventListenerRegistry.getMethod(buildEventListenerNameWithoutTable(canalMessage)); + } + + if(method == null){ + return; + } + + method.setArgumentResolvers(argumentResolvers); + + try{ + method.invoke(canalMessage); + }catch(Exception e){ + e.printStackTrace(); + } + } + + private static String buildEventListenerName(final CanalMessage canalMessage) { + final StringBuilder sb = new StringBuilder(); + + sb.append(canalMessage.getDestination()).append("$$"); + sb.append(canalMessage.getTable().getSchema()).append('.').append(canalMessage.getTable().getName()) + .append("$$"); + sb.append(canalMessage.getEventType().name()); + + return sb.toString(); + } + + private static String buildEventListenerNameWithoutTable(final CanalMessage canalMessage) { + final StringBuilder sb = new StringBuilder(); + + sb.append(canalMessage.getDestination()).append("$$"); + sb.append('.').append("$$"); + sb.append(canalMessage.getEventType().name()); + + return sb.toString(); + } + + protected static List getDefaultArgumentResolvers() { + return ListBuilder.create(10) + .add(new DestinationArgumentResolver()) + .add(new EntryTypeArgumentResolver()) + .add(new EventTypeArgumentResolver()) + .add(new HeaderArgumentResolver()) + .add(new RowChangeArgumentResolver()) + .add(new RowDataArgumentResolver()) + .add(new RowDataCollectionArgumentResolver()) + .add(new RowDataArrayArgumentResolver()) + .add(new SchemaArgumentResolver()) + .add(new TableArgumentResolver()) + .build(); + } + } diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/DefaultDispatcher.java b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/DefaultDispatcher.java index 86f5f4c..c896b0c 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/DefaultDispatcher.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/DefaultDispatcher.java @@ -21,10 +21,19 @@ * | Author: Yong.Teng | * | Copyright @ 2013-2023 Buession.com Inc. | * +-------------------------------------------------------------------------------------------------------+ - */package com.buession.canal.client.dispatcher;/** - * + */ +package com.buession.canal.client.dispatcher; + +/** + * 默认分发器 * * @author Yong.Teng * @since 0.0.1 - */public class DefaultDispatcher { + */ +public class DefaultDispatcher extends AbstractDispatcher { + + public DefaultDispatcher() { + super(); + } + } diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/Dispatcher.java b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/Dispatcher.java index 91ce0f3..58d30b7 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/Dispatcher.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/Dispatcher.java @@ -21,10 +21,28 @@ * | Author: Yong.Teng | * | Copyright @ 2013-2023 Buession.com Inc. | * +-------------------------------------------------------------------------------------------------------+ - */package com.buession.canal.client.dispatcher;/** - * + */ +package com.buession.canal.client.dispatcher; + +import com.buession.canal.client.adapter.AdapterClient; + +/** + * 分发器 * * @author Yong.Teng * @since 0.0.1 - */public interface Dispatcher { + */ +@FunctionalInterface +public interface Dispatcher { + + /** + * 分发 + * + * @param adapterClient + * Canal 适配器 + * @param timeout + * 超时时长,单位:秒 + */ + void dispatch(AdapterClient adapterClient, long timeout); + } diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/package-info.java b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/package-info.java index 28d212a..b2f6529 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/package-info.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/package-info.java @@ -26,4 +26,4 @@ * @author Yong.Teng * @since 0.0.1 */ -package com.buession.canal.client; \ No newline at end of file +package com.buession.canal.client.dispatcher; \ No newline at end of file diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/handler/AbstractMessageHandler.java b/buession-canal-client/src/main/java/com/buession/canal/client/handler/AbstractMessageHandler.java index 43f6b1f..8658234 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/handler/AbstractMessageHandler.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/handler/AbstractMessageHandler.java @@ -24,24 +24,10 @@ */ package com.buession.canal.client.handler; -import com.alibaba.otter.canal.protocol.CanalEntry; -import com.buession.canal.client.Binder; -import com.buession.canal.client.adapter.CanalAdapterClient; import com.buession.canal.core.CanalMessage; -import com.buession.canal.core.ParameterType; -import com.buession.canal.core.binding.CanalBinding; -import com.buession.canal.core.listener.CanalEventListener; -import com.buession.canal.core.listener.ParameterMapping; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.buession.canal.core.listener.EventHandler; +import com.buession.canal.core.listener.MethodParameter; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.TimeUnit; import java.util.function.Function; /** @@ -52,74 +38,17 @@ */ public abstract class AbstractMessageHandler implements MessageHandler { - private final CanalAdapterClient adapterClient; - - private final CanalBinding binding; - - private final long timeout; - - private volatile boolean running = true; - - protected final Logger logger = LoggerFactory.getLogger(getClass()); - - public AbstractMessageHandler(final Binder binder) { - this.adapterClient = binder.getAdapterClient(); - this.binding = binder.getBinding(); - this.timeout = binder.getTimeout(); - adapterClient.init(); + public AbstractMessageHandler() { } @Override - public void run() { - while(running){ - try{ - List messages = adapterClient.getListWithoutAck(timeout, TimeUnit.SECONDS); - - for(CanalMessage message : messages){ - distributeEvent(message); - } - - adapterClient.ack(); - }catch(Exception e){ - logger.error("Message handle error", e); - } - } - - running = false; - } - - protected CanalBinding getBinding() { - return binding; - } - - protected Object[] getArgs(final CanalEventListener eventListener, final CanalMessage message) { - return Arrays.stream(eventListener.getParameterMappings()).map(convertParameter(message)).toArray(); + public Object handle(final EventHandler eventHandler) { + return null; } - protected abstract void distributeEvent(final CanalMessage message); - - protected Function convertParameter(final CanalMessage message) { + protected Function convertParameter(final CanalMessage message) { return (pm)->{ - switch(pm.getParameterType()){ - case ROW_CHANGE: - return message.getRowChange(); - case ROW_DATA: - return message.getRowChange().getRowDatasList().get(0); - case HEADER: - return message.getHeader(); - case ENTRY_TYPE: - return message.getEntryType(); - case EVENT_TYPE: - return message.getEventType(); - case DESTINATION: - return message.getDestination(); - case SCHEMA: - return message.getTable().getSchema(); - case TABLE: - return message.getTable().getName(); - default: - break; - } + /* ParameterizedType parameterizedType = ((ParameterizedType) pm.getType()); Type type = parameterizedType.getRawType(); @@ -131,6 +60,8 @@ protected Function convertParameter(final CanalMessage } } + */ + return null; }; } diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/handler/DefaultMessageHandler.java b/buession-canal-client/src/main/java/com/buession/canal/client/handler/DefaultMessageHandler.java index 5e96c7e..25b7316 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/handler/DefaultMessageHandler.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/handler/DefaultMessageHandler.java @@ -25,11 +25,8 @@ package com.buession.canal.client.handler; import com.alibaba.otter.canal.protocol.CanalEntry; -import com.buession.canal.client.Binder; -import com.buession.canal.core.CanalMessage; -import com.buession.canal.core.listener.CanalEventListener; +import com.buession.canal.core.listener.EventListener; import com.buession.core.validator.Validate; -import org.slf4j.Logger; import java.util.Objects; import java.util.function.Predicate; @@ -42,43 +39,24 @@ */ public class DefaultMessageHandler extends AbstractMessageHandler { - public DefaultMessageHandler(final Binder binder) { - super(binder); + public DefaultMessageHandler() { + super(); } - @Override - protected void distributeEvent(final CanalMessage message) { - if(getBinding().getListeners() == null){ - return; - } - - getBinding().getListeners().stream() - .filter(eventListenerFilter(message.getTable().getSchema(), message.getTable().getName(), - message.getEventType())) - .forEach((listener)->{ - try{ - listener.getInvoker() - .invoke(listener.getObject(), listener.getMethod(), getArgs(listener, message)); - }catch(Throwable e){ - logger.error("Invoker invoke error", e); - } - }); - } - - protected Predicate eventListenerFilter(final String schemaName, final String tableName, - final CanalEntry.EventType eventType) { + protected Predicate eventListenerFilter(final String schemaName, final String tableName, + final CanalEntry.EventType eventType) { // 判断数据库名是否一致 - Predicate sf = + Predicate sf = listener->schemaName == null || Validate.isBlank(listener.getTable().getSchema()) || Objects.equals(schemaName, listener.getTable().getSchema()); // 判断数据表名是否一致 - Predicate tf = + Predicate tf = listener->tableName == null || Validate.isBlank(listener.getTable().getName()) || Objects.equals(tableName, listener.getTable().getName()); // 判断事件类型是否一致 - Predicate ef = + Predicate ef = listener->eventType == null || listener.getEventType() == null || eventType == listener.getEventType(); diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/handler/MessageHandler.java b/buession-canal-client/src/main/java/com/buession/canal/client/handler/MessageHandler.java index 29340ad..4f24eef 100644 --- a/buession-canal-client/src/main/java/com/buession/canal/client/handler/MessageHandler.java +++ b/buession-canal-client/src/main/java/com/buession/canal/client/handler/MessageHandler.java @@ -24,12 +24,25 @@ */ package com.buession.canal.client.handler; +import com.buession.canal.core.listener.EventHandler; + /** * 信息处理接口 * * @author Yong.Teng * @since 0.0.1 */ -public interface MessageHandler extends Runnable { +@FunctionalInterface +public interface MessageHandler { + + /** + * 消息处理 + * + * @param eventHandler + * 消息 + * + * @return 返回数据 + */ + Object handle(final EventHandler eventHandler); } diff --git a/buession-canal-client/src/test/java/com/buession/canal/client/adapter/KafkaAdapterClientTest.java b/buession-canal-client/src/test/java/com/buession/canal/client/adapter/KafkaAdapterClientTest.java index 2dbb261..3a95495 100644 --- a/buession-canal-client/src/test/java/com/buession/canal/client/adapter/KafkaAdapterClientTest.java +++ b/buession-canal-client/src/test/java/com/buession/canal/client/adapter/KafkaAdapterClientTest.java @@ -25,7 +25,6 @@ package com.buession.canal.client.adapter; import com.buession.canal.core.CanalMessage; -import com.buession.core.utils.StringUtils; import org.junit.Test; import java.util.List; @@ -35,9 +34,9 @@ * @author Yong.Teng * @since 0.0.1 */ -public class KafkaCanalAdapterClientTest { +public class KafkaAdapterClientTest { - private final static KafkaCanalAdapterClient client = new KafkaCanalAdapterClient("127.0.0.1:9092", + private final static KafkaAdapterClient client = new KafkaAdapterClient("127.0.0.1:9092", "user", "user", null, 1, true); static { diff --git a/buession-canal-client/src/test/java/com/buession/canal/client/adapter/TcpAdapterClientTest.java b/buession-canal-client/src/test/java/com/buession/canal/client/adapter/TcpAdapterClientTest.java index cb29d7a..b22ddfa 100644 --- a/buession-canal-client/src/test/java/com/buession/canal/client/adapter/TcpAdapterClientTest.java +++ b/buession-canal-client/src/test/java/com/buession/canal/client/adapter/TcpAdapterClientTest.java @@ -34,9 +34,9 @@ * @author Yong.Teng * @since 0.0.1 */ -public class TcpCanalAdapterClientTest { +public class TcpAdapterClientTest { - private final static TcpCanalAdapterClient client = new TcpCanalAdapterClient("127.0.0.1:11111", + private final static TcpAdapterClient client = new TcpAdapterClient("127.0.0.1:11111", null, "customer", "admin", "admin", 5); static {