diff --git a/buession-canal-client/pom.xml b/buession-canal-client/pom.xml
index 78eb033..8b24895 100644
--- a/buession-canal-client/pom.xml
+++ b/buession-canal-client/pom.xml
@@ -66,17 +66,39 @@
com.alibaba.otter
- canal.client
+ canal.protocol
com.alibaba.otter
- canal.protocol
+ canal.client
+
+
+
+ org.apache.zookeeper
+ zookeeper
+ provided
org.apache.kafka
kafka-clients
+
+
+ com.alibaba
+ fastjson
+
+
+
+ com.google.guava
+ guava
+
+
+ com.google.protobuf
+ protobuf-java
+ provided
+
+
org.slf4j
slf4j-api
diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalClient.java
index fcd42d1..965fe1d 100644
--- a/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalClient.java
+++ b/buession-canal-client/src/main/java/com/buession/canal/client/DefaultCanalClient.java
@@ -67,7 +67,7 @@ public DefaultCanalClient(final CanalContext context, final Dispatcher dispatche
protected void process(final AdapterClient adapterClient, final Dispatcher dispatcher,
final ExecutorService executor) {
executor.submit(()->{
- if(isRunning()){
+ while(isRunning()){
dispatcher.dispatch(adapterClient);
}
});
diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractAdapterClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractAdapterClient.java
index db98b41..8a8cb91 100644
--- a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractAdapterClient.java
+++ b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractAdapterClient.java
@@ -164,13 +164,7 @@ protected C getConnector() {
@SuppressWarnings({"unchecked"})
protected List messagesConvert(final Message message) {
- List messages = getMessageConverter().convert(message);
-
- if(messages != null){
- messages.forEach((m)->m.setDestination(configuration.getDestination()));
- }
-
- return messages;
+ return getMessageConverter().convert(configuration.getDestination(), message);
}
protected List messagesConvert(final List messages) {
diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractMqAdapterClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractMqAdapterClient.java
index 2d25857..fcbc8be 100644
--- a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractMqAdapterClient.java
+++ b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AbstractMqAdapterClient.java
@@ -30,6 +30,8 @@
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.buession.canal.core.CanalMessage;
import com.buession.canal.core.Configuration;
+import com.buession.canal.core.Result;
+import com.buession.canal.core.convert.FlatMessageConverter;
import java.util.ArrayList;
import java.util.List;
@@ -88,6 +90,9 @@ public AbstractMqAdapterClient(final C connector, final String destination, fina
public AbstractMqAdapterClient(final C connector, final String destination, final boolean flatMessage) {
super(connector, destination);
this.flatMessage = flatMessage;
+ if(this.flatMessage){
+ setMessageConverter(new FlatMessageConverter());
+ }
}
/**
@@ -106,6 +111,9 @@ public AbstractMqAdapterClient(final C connector, final String destination, fina
final boolean flatMessage) {
super(connector, destination, configuration);
this.flatMessage = flatMessage;
+ if(this.flatMessage){
+ setMessageConverter(new FlatMessageConverter());
+ }
}
@Override
@@ -114,24 +122,24 @@ public boolean isFlatMessage() {
}
@Override
- public List getList(Long timeout, TimeUnit unit) throws CanalClientException {
+ public Result getList(Long timeout, TimeUnit unit) throws CanalClientException {
if(isFlatMessage()){
List messages = getConnector().getFlatList(timeout, unit);
- return flatMessagesConvert(messages);
+ return new Result(flatMessagesConvert(messages));
}else{
List messages = getConnector().getList(timeout, unit);
- return messagesConvert(messages);
+ return new Result(messagesConvert(messages));
}
}
@Override
- public List getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
+ public Result getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
if(isFlatMessage()){
List messages = getConnector().getFlatListWithoutAck(timeout, unit);
- return flatMessagesConvert(messages);
+ return new Result(flatMessagesConvert(messages));
}else{
List messages = getConnector().getListWithoutAck(timeout, unit);
- return messagesConvert(messages);
+ return new Result(messagesConvert(messages));
}
}
@@ -145,7 +153,7 @@ protected List flatMessagesConvert(final List message
List result = new ArrayList<>(messages.size());
for(FlatMessage flatMessage : messages){
- //result.addAll(getMessageTransponder().convert(flatMessage));
+ result.addAll(getMessageConverter().convert(configuration.getDestination(), flatMessage));
}
return result;
diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AdapterClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AdapterClient.java
index 66fc599..a5d770a 100644
--- a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AdapterClient.java
+++ b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/AdapterClient.java
@@ -25,11 +25,10 @@
package com.buession.canal.client.adapter;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
-import com.buession.canal.core.CanalMessage;
import com.buession.canal.core.Configuration;
+import com.buession.canal.core.Result;
import com.buession.canal.core.convert.MessageConverter;
-import java.util.List;
import java.util.concurrent.TimeUnit;
/**
@@ -85,12 +84,12 @@ public interface AdapterClient {
* @param unit
* 超时时长单位
*
- * @return 数据列表
+ * @return 数据结果
*
* @throws CanalClientException
* Canal 客户端异常
*/
- List getList(Long timeout, TimeUnit unit) throws CanalClientException;
+ Result getList(Long timeout, TimeUnit unit) throws CanalClientException;
/**
* 获取数据,设置 timeout 时间直到拿到数据为止
@@ -105,12 +104,12 @@ public interface AdapterClient {
* @param unit
* 超时时长单位
*
- * @return 数据列表
+ * @return 数据结果
*
* @throws CanalClientException
* Canal 客户端异常
*/
- List getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException;
+ Result getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException;
/**
* 进行 batch id 的消费确认,确认之后,小于等于此 batchId 的 Message 都会被确认
diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/TcpAdapterClient.java b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/TcpAdapterClient.java
index a6c0173..52c0600 100644
--- a/buession-canal-client/src/main/java/com/buession/canal/client/adapter/TcpAdapterClient.java
+++ b/buession-canal-client/src/main/java/com/buession/canal/client/adapter/TcpAdapterClient.java
@@ -30,8 +30,8 @@
import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
-import com.buession.canal.core.CanalMessage;
import com.buession.canal.core.Configuration;
+import com.buession.canal.core.Result;
import com.buession.core.converter.mapper.PropertyMapper;
import com.buession.core.utils.StringUtils;
import com.buession.core.validator.Validate;
@@ -146,15 +146,15 @@ public TcpAdapterClient(final String server, final String zkServers, final Strin
}
@Override
- public List getList(Long timeout, TimeUnit unit) throws CanalClientException {
+ public Result getList(Long timeout, TimeUnit unit) throws CanalClientException {
Message message = getConnector().get(configuration.getBatchSize(), timeout, unit);
- return messagesConvert(message);
+ return new Result(message.getId(), messagesConvert(message));
}
@Override
- public List getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
+ public Result getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
Message message = getConnector().getWithoutAck(configuration.getBatchSize(), timeout, unit);
- return messagesConvert(message);
+ return new Result(message.getId(), messagesConvert(message));
}
protected static CanalConnector createCanalConnector(final String server, final String zkServers,
diff --git a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java
index 895641b..4e00dc5 100644
--- a/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java
+++ b/buession-canal-client/src/main/java/com/buession/canal/client/dispatcher/AbstractDispatcher.java
@@ -27,6 +27,7 @@
import com.buession.canal.client.adapter.AdapterClient;
import com.buession.canal.core.CanalMessage;
import com.buession.canal.core.Configuration;
+import com.buession.canal.core.Result;
import com.buession.canal.core.listener.EventListenerMethod;
import com.buession.canal.core.listener.EventListenerRegistry;
import com.buession.canal.core.listener.support.DestinationArgumentResolver;
@@ -35,7 +36,10 @@
import com.buession.canal.core.listener.support.EventListenerArgumentResolverComposite;
import com.buession.canal.core.listener.support.EventTypeArgumentResolver;
import com.buession.canal.core.listener.support.HeaderArgumentResolver;
+import com.buession.canal.core.listener.support.RowArgumentResolver;
+import com.buession.canal.core.listener.support.RowArrayArgumentResolver;
import com.buession.canal.core.listener.support.RowChangeArgumentResolver;
+import com.buession.canal.core.listener.support.RowCollectionArgumentResolver;
import com.buession.canal.core.listener.support.RowDataArgumentResolver;
import com.buession.canal.core.listener.support.RowDataArrayArgumentResolver;
import com.buession.canal.core.listener.support.RowDataCollectionArgumentResolver;
@@ -60,8 +64,6 @@ public abstract class AbstractDispatcher implements Dispatcher {
private final EventListenerArgumentResolverComposite argumentResolvers = new EventListenerArgumentResolverComposite();
- private volatile boolean running = true;
-
private final Logger logger = LoggerFactory.getLogger(getClass());
public AbstractDispatcher() {
@@ -75,24 +77,25 @@ public EventListenerRegistry getEventListenerRegistry() {
@Override
public void dispatch(AdapterClient adapterClient) {
Configuration configuration = adapterClient.getConfiguration();
- while(running){
- try{
- List messages = adapterClient.getListWithoutAck(configuration.getTimeout().toMillis(),
- TimeUnit.MILLISECONDS);
-
- if(messages != null){
- for(CanalMessage message : messages){
- doDispatch(message);
- }
+
+ try{
+ Result result = adapterClient.getListWithoutAck(configuration.getTimeout().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ if(result != null && result.getMessages() != null){
+ for(CanalMessage message : result.getMessages()){
+ doDispatch(message);
}
+ }
+ if(result != null && result.getId() > -1){
+ adapterClient.ack(result.getId());
+ }else{
adapterClient.ack();
- }catch(Exception e){
- logger.error("Message handle error", e);
}
+ }catch(Exception e){
+ logger.error("Message handle error", e);
}
-
- running = false;
}
protected void doDispatch(final CanalMessage canalMessage) throws Exception {
@@ -109,7 +112,7 @@ protected void doDispatch(final CanalMessage canalMessage) throws Exception {
protected abstract EventListenerMethod findMethod(final CanalMessage canalMessage);
protected static List getDefaultArgumentResolvers() {
- return ListBuilder.create(10)
+ return ListBuilder.create(13)
.add(new DestinationArgumentResolver())
.add(new EntryTypeArgumentResolver())
.add(new EventTypeArgumentResolver())
@@ -118,6 +121,9 @@ protected static List getDefaultArgumentResolvers
.add(new RowDataArgumentResolver())
.add(new RowDataCollectionArgumentResolver())
.add(new RowDataArrayArgumentResolver())
+ .add(new RowArgumentResolver())
+ .add(new RowCollectionArgumentResolver())
+ .add(new RowArrayArgumentResolver())
.add(new SchemaArgumentResolver())
.add(new TableArgumentResolver())
.build();
diff --git a/buession-canal-client/src/test/java/com/buession/canal/client/adapter/KafkaAdapterClientTest.java b/buession-canal-client/src/test/java/com/buession/canal/client/adapter/KafkaAdapterClientTest.java
index 87c3693..70140b3 100644
--- a/buession-canal-client/src/test/java/com/buession/canal/client/adapter/KafkaAdapterClientTest.java
+++ b/buession-canal-client/src/test/java/com/buession/canal/client/adapter/KafkaAdapterClientTest.java
@@ -24,10 +24,9 @@
*/
package com.buession.canal.client.adapter;
-import com.buession.canal.core.CanalMessage;
+import com.buession.canal.core.Result;
import org.junit.Test;
-import java.util.List;
import java.util.concurrent.TimeUnit;
/**
@@ -45,8 +44,8 @@ public class KafkaAdapterClientTest {
@Test
public void getList() {
- List messages = client.getList(5L, TimeUnit.SECONDS);
- System.out.println(messages);
+ Result result = client.getList(5L, TimeUnit.SECONDS);
+ System.out.println(result);
}
}
diff --git a/buession-canal-client/src/test/java/com/buession/canal/client/adapter/TcpAdapterClientTest.java b/buession-canal-client/src/test/java/com/buession/canal/client/adapter/TcpAdapterClientTest.java
index 4c9aea4..3f44a6c 100644
--- a/buession-canal-client/src/test/java/com/buession/canal/client/adapter/TcpAdapterClientTest.java
+++ b/buession-canal-client/src/test/java/com/buession/canal/client/adapter/TcpAdapterClientTest.java
@@ -25,6 +25,7 @@
package com.buession.canal.client.adapter;
import com.buession.canal.core.CanalMessage;
+import com.buession.canal.core.Result;
import org.junit.Test;
import java.util.List;
@@ -45,8 +46,8 @@ public class TcpAdapterClientTest {
@Test
public void getList() {
- List messages = client.getList(5L, TimeUnit.SECONDS);
- System.out.println(messages);
+ Result result = client.getList(5L, TimeUnit.SECONDS);
+ System.out.println(result);
}
}