Skip to content

Commit

Permalink
【core】数据转换优化
Browse files Browse the repository at this point in the history
  • Loading branch information
yong.teng committed Nov 16, 2023
1 parent 1edc221 commit 79181b8
Show file tree
Hide file tree
Showing 18 changed files with 228 additions and 139 deletions.
5 changes: 5 additions & 0 deletions buession-canal-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
<artifactId>buession-core</artifactId>
<version>${buession.version}</version>
</dependency>
<dependency>
<groupId>com.buession</groupId>
<artifactId>buession-beans</artifactId>
<version>${buession.version}</version>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/**
* 消息
*
* @author Yong.Teng
* @since 0.0.1
*/
Expand Down Expand Up @@ -67,6 +70,8 @@ public class CanalMessage implements Serializable {

private CanalEntry.RowChange rowChange;

private Object data;

/**
* 返回指令
*
Expand Down Expand Up @@ -167,4 +172,25 @@ public void setRowChange(CanalEntry.RowChange rowChange) {
this.rowChange = rowChange;
}

public Object getData() {
return data;
}

public void setData(Object data) {
this.data = data;
}

@Override
public String toString() {
return new StringJoiner(", ", "CanalMessage[", "]")
.add("destination='" + destination + "'")
.add("table=" + table)
.add("entryType=" + entryType)
.add("eventType=" + eventType)
.add("isDdl=" + isDdl)
.add("header=" + header)
.add("rowChange=" + rowChange)
.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void setTimeout(Duration timeout) {

@Override
public String toString() {
return new StringJoiner(", ", Configuration.class.getSimpleName() + "[", "]")
return new StringJoiner(", ", "Configuration[", "]")
.add("destination='" + destination + "'")
.add("filter='" + filter + "'")
.add("batchSize=" + batchSize)
Expand Down
102 changes: 99 additions & 3 deletions buession-canal-core/src/main/java/com/buession/canal/core/Result.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,106 @@
* | Author: Yong.Teng <[email protected]> |
* | Copyright @ 2013-2023 Buession.com Inc. |
* +-------------------------------------------------------------------------------------------------------+
*/package com.buession.canal.core;/**
*
*/
package com.buession.canal.core;

import java.io.Serializable;
import java.util.List;
import java.util.StringJoiner;

/**
* 结果
*
* @author Yong.Teng
* @since 0.0.1
*/public class Result {
*/
public class Result implements Serializable {

private final static long serialVersionUID = 5375957586654168072L;

/**
* ID
*/
private long id;

/**
* 消息列表
*/
private List<CanalMessage> messages;

/**
* 构造函数
*/
public Result() {
}

/**
* 构造函数
*
* @param messages
* 消息列表
*/
public Result(List<CanalMessage> messages) {
this(-1, messages);
}

/**
* 构造函数
*
* @param id
* ID
* @param messages
* 消息列表
*/
public Result(long id, List<CanalMessage> messages) {
this.id = id;
this.messages = messages;
}

/**
* 返回 ID
*
* @return ID
*/
public long getId() {
return id;
}

/**
* 设置 ID
*
* @param id
* ID
*/
public void setId(long id) {
this.id = id;
}

/**
* 返回消息列表
*
* @return 消息列表
*/
public List<CanalMessage> getMessages() {
return messages;
}

/**
* 设置消息列表
*
* @param messages
* 消息列表
*/
public void setMessages(List<CanalMessage> messages) {
this.messages = messages;
}

@Override
public String toString() {
return new StringJoiner(", ", "Result[", "]")
.add("id=" + id)
.add("messages=" + messages)
.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ public class DefaultMessageConverter extends AbstractMessageConverter<Message> {
private final List<CanalEntry.EntryType> ignoreEntryTypes = getIgnoreEntryTypes();

@Override
public List<CanalMessage> convert(final Message message) {
public List<CanalMessage> convert(final String destination, final Message message) {
List<CanalEntry.Entry> entries = message.getEntries();

return entries.stream()
.filter((entry)->ignoreEntryTypes.stream().anyMatch(t->entry.getEntryType() == t) == false)
.map(this::doParseEntry).collect(Collectors.toList());
.map((entry)->this.doParseEntry(destination, entry)).collect(Collectors.toList());
}

private CanalMessage doParseEntry(final CanalEntry.Entry entry) {
private CanalMessage doParseEntry(final String destination, final CanalEntry.Entry entry) {
CanalEntry.RowChange rowChange;
try{
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
Expand All @@ -62,11 +62,13 @@ private CanalMessage doParseEntry(final CanalEntry.Entry entry) {

final CanalMessage canalMessage = new CanalMessage();

canalMessage.setDestination(destination);
canalMessage.setTable(new Table(entry.getHeader().getSchemaName(), entry.getHeader().getTableName()));
canalMessage.setEntryType(entry.getEntryType());
canalMessage.setEventType(entry.getHeader().getEventType());
canalMessage.setHeader(entry.getHeader());
canalMessage.setRowChange(rowChange);
canalMessage.setData(rowChange.getRowDatasList());
canalMessage.setDdl(rowChange.getIsDdl());

return canalMessage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
package com.buession.canal.core.convert;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.buession.canal.core.CanalMessage;
import com.buession.canal.core.Table;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
Expand All @@ -39,43 +39,25 @@
* @author Yong.Teng
* @since 0.0.1
*/
public class DefaultMessageConverter extends AbstractMessageConverter<Message> {

private final List<CanalEntry.EntryType> ignoreEntryTypes = getIgnoreEntryTypes();
public class FlatMessageConverter extends AbstractMessageConverter<FlatMessage> {

@Override
public List<CanalMessage> convert(final Message message) {
List<CanalEntry.Entry> entries = message.getEntries();

return entries.stream()
.filter((entry)->ignoreEntryTypes.stream().anyMatch(t->entry.getEntryType() == t) == false)
.map(this::doParseEntry).collect(Collectors.toList());
public List<CanalMessage> convert(final String destination, final FlatMessage message) {
return message.getData().stream().map((row)->this.doParseEntry(destination, message, row))
.collect(Collectors.toList());
}

private CanalMessage doParseEntry(final CanalEntry.Entry entry) {
CanalEntry.RowChange rowChange;
try{
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
}catch(Exception e){
throw new RuntimeException("parse event has an error, data: " + entry, e);
}

private CanalMessage doParseEntry(final String destination, final FlatMessage message,
final Map<String, String> row) {
final CanalMessage canalMessage = new CanalMessage();

canalMessage.setTable(new Table(entry.getHeader().getSchemaName(), entry.getHeader().getTableName()));
canalMessage.setEntryType(entry.getEntryType());
canalMessage.setEventType(entry.getHeader().getEventType());
canalMessage.setHeader(entry.getHeader());
canalMessage.setRowChange(rowChange);
canalMessage.setDdl(rowChange.getIsDdl());
canalMessage.setDestination(destination);
canalMessage.setTable(new Table(message.getDatabase(), message.getTable()));
canalMessage.setEventType(CanalEntry.EventType.valueOf(message.getType()));
canalMessage.setData(row);
canalMessage.setDdl(message.getIsDdl());

return canalMessage;
}

@Override
protected List<CanalEntry.EntryType> getIgnoreEntryTypes() {
return Arrays.asList(CanalEntry.EntryType.TRANSACTIONBEGIN, CanalEntry.EntryType.TRANSACTIONEND,
CanalEntry.EntryType.HEARTBEAT);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package com.buession.canal.core.convert;

import com.buession.canal.core.CanalMessage;
import com.buession.core.converter.Converter;

import java.util.List;

Expand All @@ -39,17 +38,18 @@
* @since 0.0.1
*/
@FunctionalInterface
public interface MessageConverter<M> extends Converter<M, List<CanalMessage>> {
public interface MessageConverter<M> {

/**
* 将原始消息转换成 {@link CanalMessage}
*
* @param destination
* 指令
* @param message
* 原始消息
*
* @return {@link CanalMessage}
*/
@Override
List<CanalMessage> convert(final M message);
List<CanalMessage> convert(final String destination, final M message);

}
Loading

0 comments on commit 79181b8

Please sign in to comment.