Skip to content

Commit

Permalink
【ALL】消息转换
Browse files Browse the repository at this point in the history
  • Loading branch information
yong.teng committed Nov 16, 2023
1 parent afdd070 commit d1ff76f
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Canal MQ 适配器抽象类
Expand Down
6 changes: 6 additions & 0 deletions buession-canal-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>io.github.shuigedeng</groupId>
<artifactId>taotao-cloud-starter-canal</artifactId>
<version>2023.11</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class CanalMessage implements Serializable {

private CanalEntry.RowChange rowChange;

private Object data;
private List<CanalEntry.Column> data;

/**
* 返回指令
Expand Down Expand Up @@ -172,11 +172,11 @@ public void setRowChange(CanalEntry.RowChange rowChange) {
this.rowChange = rowChange;
}

public Object getData() {
public List<CanalEntry.Column> getData() {
return data;
}

public void setData(Object data) {
public void setData(List<CanalEntry.Column> data) {
this.data = data;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.buession.canal.core.CanalMessage;
import com.buession.canal.core.Table;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -60,6 +61,10 @@ private CanalMessage doParseEntry(final String destination, final CanalEntry.Ent
throw new RuntimeException("parse event has an error, data: " + entry, e);
}

final List<CanalEntry.Column> data = new ArrayList<>(rowChange.getRowDatasList().size());

rowChange.getRowDatasList().forEach((rowData)->data.addAll(rowData.getAfterColumnsList()));

final CanalMessage canalMessage = new CanalMessage();

canalMessage.setDestination(destination);
Expand All @@ -68,7 +73,7 @@ private CanalMessage doParseEntry(final String destination, final CanalEntry.Ent
canalMessage.setEventType(entry.getHeader().getEventType());
canalMessage.setHeader(entry.getHeader());
canalMessage.setRowChange(rowChange);
canalMessage.setData(rowChange.getRowDatasList());
canalMessage.setData(data);
canalMessage.setDdl(rowChange.getIsDdl());

return canalMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.buession.canal.core.CanalMessage;
import com.buession.canal.core.Table;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -50,11 +51,23 @@ public List<CanalMessage> convert(final String destination, final FlatMessage me
private CanalMessage doParseEntry(final String destination, final FlatMessage message,
final Map<String, String> row) {
final CanalMessage canalMessage = new CanalMessage();
final List<CanalEntry.Column> columns = new ArrayList<>(row.size());
int i = 0;

for(Map.Entry<String, String> e : row.entrySet()){
CanalEntry.Column.Builder builder = CanalEntry.Column.newBuilder();

builder.setIndex(i++);
builder.setName(e.getKey());
builder.setValue(e.getValue());

columns.add(builder.build());
}

canalMessage.setDestination(destination);
canalMessage.setTable(new Table(message.getDatabase(), message.getTable()));
canalMessage.setEventType(CanalEntry.EventType.valueOf(message.getType()));
canalMessage.setData(row);
canalMessage.setData(columns);
canalMessage.setDdl(message.getIsDdl());

return canalMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
*/
package com.buession.canal.core.listener.support;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.buession.beans.BeanConverter;
import com.buession.beans.BeanUtils;
import com.buession.beans.DefaultBeanConverter;
Expand All @@ -33,6 +34,8 @@
import com.buession.canal.core.listener.MethodParameter;

import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
Expand All @@ -54,16 +57,22 @@ public Object resolve(MethodParameter parameter, final CanalMessage canalMessage
return null;
}

Object data = canalMessage.getData();
List<CanalEntry.Column> data = canalMessage.getData();
Map<String, Object> resultMap = new HashMap<>(data.size());

data.forEach((row)->{
resultMap.put(row.getName(), row.getValue());
});

if(Map.class.isAssignableFrom(parameter.getParameterType())){
return data;
return resultMap;
}else{
final DefaultBeanConverter beanConverter = new DefaultBeanConverter();
final Object target = BeanUtils.instantiateClass(parameter.getParameterType());

beanConverter.registerConverter(Date.class, new DatePropertyConverter("yyyy-MM-dd HH:mm:ss"));

return beanConverter.convert(data, target);
return beanConverter.convert(resultMap, target);
}
}

Expand Down

0 comments on commit d1ff76f

Please sign in to comment.