Skip to content

Commit

Permalink
Merge pull request #4 from buession/0.0.x
Browse files Browse the repository at this point in the history
Release 0.0.2
  • Loading branch information
eduosi authored Dec 27, 2023
2 parents 1ce8a9d + c5b78fb commit e693b7f
Show file tree
Hide file tree
Showing 28 changed files with 139 additions and 235 deletions.
28 changes: 28 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,34 @@
===========================


## [0.0.2](https://github.com/buession/buession-canal/releases/tag/v0.0.2) (2023-12-27)

### 🔨依赖升级

- [依赖库版本升级和安全漏洞修复](https://github.com/buession/buession-parent/releases/tag/v2.3.2)


### 🐞 Bug 修复

- **buession-canal-core:** 修复 buession-beans bean 转换导致的数据丢失的 BUG
- **buession-canal-core:** 修复 @CanalEventListener 仅指定 schema 或 table 服务映射方法的 BUG
- **buession-canal-core:** 修复无法获取 @CanalEventListener schema 和 table 的 BUG
- **buession-canal-core:** 修复线程池线程序号错误的 BUG
- **buession-canal-core:** 修复 Table 设置数据库名和表名错误的 BUG
- **buession-canal-client:** 修复多实例下,实例不执行的 BUG
- **buession-canal-spring:** 修复注解 CanalBinding 无法重复定义 destination 的 BUG


### ⏪ 优化

- **buession-canal-core:** 代码优化
- **buession-canal-spring:** 代码优化
- **buession-canal-spring:** 优化 CanalClientFactoryBean 多次调用 afterPropertiesSet 时,重复初始化 CanalClient


---


## [0.0.1](https://github.com/buession/buession-canal/releases/tag/v0.0.1) (2023-11-19)

### 🔨依赖升级
Expand Down
2 changes: 1 addition & 1 deletion buession-canal-annotation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<groupId>com.buession.canal</groupId>
<artifactId>buession-canal-parent</artifactId>
<relativePath>../buession-canal-parent</relativePath>
<version>0.0.1</version>
<version>0.0.2</version>
</parent>
<artifactId>buession-canal-annotation</artifactId>
<url>https://canal.buession.com/</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
Expand All @@ -41,7 +40,6 @@
*/
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface CanalEventListener {

Expand Down
2 changes: 1 addition & 1 deletion buession-canal-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<groupId>com.buession.canal</groupId>
<artifactId>buession-canal-parent</artifactId>
<relativePath>../buession-canal-parent</relativePath>
<version>0.0.1</version>
<version>0.0.2</version>
</parent>
<artifactId>buession-canal-client</artifactId>
<url>https://canal.buession.com/</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ public abstract class AbstractCanalClient implements CanalClient {
*/
private final Dispatcher dispatcher;

private volatile boolean running = false;

private final Logger logger = LoggerFactory.getLogger(getClass());

/**
Expand Down Expand Up @@ -100,20 +98,13 @@ public void start() {
adapterClient.init();
process(adapterClient, dispatcher, executor);
});

running = true;
}

@Override
public void stop() {
logger.info("CanalClient stopping...");
context.getAdapterClients().forEach(AdapterClient::close);
executor.shutdown();
running = false;
}

@Override
public boolean isRunning() {
return running;
}

protected abstract void process(final AdapterClient adapterClient, final Dispatcher dispatcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public interface CanalClient {
*
* @return true / false
*/
@Deprecated
default boolean isRunning() {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,7 @@ public DefaultCanalClient(final CanalContext context, final Dispatcher dispatche
@Override
protected void process(final AdapterClient adapterClient, final Dispatcher dispatcher,
final ExecutorService executor) {
executor.submit(()->{
while(isRunning()){
dispatcher.dispatch(adapterClient);
}
});
executor.submit(()->dispatcher.dispatch(adapterClient));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ public abstract class AbstractAdapterClient<C extends CanalConnector> implements
@SuppressWarnings({"rawtypes"})
private MessageConverter messageConverter = new DefaultMessageConverter();

/**
* 是否在运行
*
* @since 0.0.2
*/
private volatile boolean running = false;

/**
* 构造函数
*
Expand Down Expand Up @@ -123,6 +130,7 @@ public void init() throws CanalClientException {

// 回滚到未进行 ack 的地方,下次 fetch 时,可以从最后一个没有 ack 的位置获取数据
connector.rollback();
running = true;
}

@Override
Expand All @@ -145,8 +153,14 @@ public void rollback() throws CanalClientException {
connector.rollback();
}

@Override
public boolean isRunning() {
return running;
}

@Override
public void close() throws CanalClientException {
running = false;
connector.unsubscribe();
connector.disconnect();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@ public interface AdapterClient {
*/
void rollback() throws CanalClientException;

/**
* 判断 canal 客户端是否是开启状态
*
* @return true / false
*
* @since 0.0.1
*/
boolean isRunning();

/**
* 关闭客户端
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@
import com.buession.canal.core.listener.support.EventTypeArgumentResolver;
import com.buession.canal.core.listener.support.HeaderArgumentResolver;
import com.buession.canal.core.listener.support.RowArgumentResolver;
import com.buession.canal.core.listener.support.RowArrayArgumentResolver;
import com.buession.canal.core.listener.support.RowChangeArgumentResolver;
import com.buession.canal.core.listener.support.RowCollectionArgumentResolver;
import com.buession.canal.core.listener.support.RowDataArgumentResolver;
import com.buession.canal.core.listener.support.RowDataArrayArgumentResolver;
import com.buession.canal.core.listener.support.RowDataCollectionArgumentResolver;
Expand Down Expand Up @@ -78,23 +76,29 @@ public EventListenerRegistry getEventListenerRegistry() {
public void dispatch(AdapterClient adapterClient) {
Configuration configuration = adapterClient.getConfiguration();

try{
Result result = adapterClient.getListWithoutAck(configuration.getTimeout().toMillis(),
TimeUnit.MILLISECONDS);
while(adapterClient.isRunning()){
try{
Result result = adapterClient.getListWithoutAck(configuration.getTimeout().toMillis(),
TimeUnit.MILLISECONDS);

if(result != null && result.getMessages() != null){
for(CanalMessage message : result.getMessages()){
doDispatch(message);
if(logger.isDebugEnabled()){
logger.debug("Return {} messages.", result == null ? 0 : result.getMessages().size());
}

if(result != null && result.getMessages() != null){
for(CanalMessage message : result.getMessages()){
doDispatch(message);
}
}
}

if(result != null && result.getId() > -1){
adapterClient.ack(result.getId());
}else{
adapterClient.ack();
if(result != null && result.getId() > -1){
adapterClient.ack(result.getId());
}else{
adapterClient.ack();
}
}catch(Exception e){
logger.error("Message handle error", e);
}
}catch(Exception e){
logger.error("Message handle error", e);
}
}

Expand All @@ -112,7 +116,7 @@ protected void doDispatch(final CanalMessage canalMessage) throws Exception {
protected abstract EventListenerMethod findMethod(final CanalMessage canalMessage);

protected static List<EventListenerArgumentResolver> getDefaultArgumentResolvers() {
return ListBuilder.<EventListenerArgumentResolver>create(13)
return ListBuilder.<EventListenerArgumentResolver>create(11)
.add(new DestinationArgumentResolver())
.add(new EntryTypeArgumentResolver())
.add(new EventTypeArgumentResolver())
Expand All @@ -122,8 +126,6 @@ protected static List<EventListenerArgumentResolver> getDefaultArgumentResolvers
.add(new RowDataCollectionArgumentResolver())
.add(new RowDataArrayArgumentResolver())
.add(new RowArgumentResolver())
.add(new RowCollectionArgumentResolver())
.add(new RowArrayArgumentResolver())
.add(new SchemaArgumentResolver())
.add(new TableArgumentResolver())
.build();
Expand Down
2 changes: 1 addition & 1 deletion buession-canal-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<groupId>com.buession.canal</groupId>
<artifactId>buession-canal-parent</artifactId>
<relativePath>../buession-canal-parent</relativePath>
<version>0.0.1</version>
<version>0.0.2</version>
</parent>
<artifactId>buession-canal-core</artifactId>
<url>https://canal.buession.com/</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/**
Expand Down Expand Up @@ -183,13 +182,14 @@ public void setData(List<CanalEntry.Column> data) {
@Override
public String toString() {
return new StringJoiner(", ", "CanalMessage[", "]")
.add("destination='" + destination + "'")
.add("destination=" + destination)
.add("table=" + table)
.add("entryType=" + entryType)
.add("eventType=" + eventType)
.add("isDdl=" + isDdl)
.add("header=" + header)
.add("rowChange=" + rowChange)
.add("data=" + data)
.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class DefaultCanalThreadFactory implements ThreadFactory {

private final ThreadGroup group;

private final AtomicInteger threadNumber = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(0);

private final String namePrefix;

Expand All @@ -57,7 +57,7 @@ public Thread newThread(@NonNull Runnable runnable) {
if(thread.getPriority() != Thread.NORM_PRIORITY){
thread.setPriority(Thread.NORM_PRIORITY);
}

thread.setDaemon(true);

return thread;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ private CanalMessage doParseEntry(final String destination, final CanalEntry.Ent
final List<CanalEntry.Column> data = new ArrayList<>(rowChange.getRowDatasList().size());

rowChange.getRowDatasList().forEach((rowData)->data.addAll(rowData.getAfterColumnsList()));

final CanalMessage canalMessage = new CanalMessage();

canalMessage.setDestination(destination);
canalMessage.setTable(new Table(entry.getHeader().getSchemaName(), entry.getHeader().getTableName()));
canalMessage.setTable(new Table(entry.getHeader().getTableName(), entry.getHeader().getSchemaName()));
canalMessage.setEntryType(entry.getEntryType());
canalMessage.setEventType(entry.getHeader().getEventType());
canalMessage.setHeader(entry.getHeader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private CanalMessage doParseEntry(final String destination, final FlatMessage me
}

canalMessage.setDestination(destination);
canalMessage.setTable(new Table(message.getDatabase(), message.getTable()));
canalMessage.setTable(new Table(message.getTable(), message.getDatabase()));
canalMessage.setEventType(CanalEntry.EventType.valueOf(message.getType()));
canalMessage.setData(columns);
canalMessage.setDdl(message.getIsDdl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public Object invoke(final CanalMessage canalMessage) throws Exception {
return doInvoke(args);
}

protected Object[] getMethodArgumentValues(final CanalMessage canalMessage) throws Exception {
private Object[] getMethodArgumentValues(final CanalMessage canalMessage) throws Exception {
MethodParameter[] parameters = getParameters();

if(Validate.isEmpty(parameters)){
Expand All @@ -123,9 +123,8 @@ protected Object[] getMethodArgumentValues(final CanalMessage canalMessage) thro
for(int i = 0; i < parameters.length; i++){
MethodParameter parameter = parameters[i];

if(argumentResolvers.supports(parameter) == false){
throw new IllegalStateException(formatArgumentError(parameter, "No suitable resolver"));
}
Assert.isFalse(argumentResolvers.supports(parameter),
()->new IllegalStateException(formatArgumentError(parameter, "No suitable resolver")));

try{
args[i] = argumentResolvers.resolve(parameter, canalMessage);
Expand All @@ -143,7 +142,7 @@ protected Object[] getMethodArgumentValues(final CanalMessage canalMessage) thro
return args;
}

protected Object doInvoke(Object... args) throws Exception {
private Object doInvoke(Object... args) throws Exception {
ReflectionUtils.makeAccessible(getBridgedMethod());
return getBridgedMethod().invoke(getTarget(), args);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,8 @@ public boolean equals(Object obj) {
if(obj instanceof MethodParameter){
MethodParameter that = (MethodParameter) obj;
return Objects.equals(name, that.getName()) && Objects.equals(parameter, that.getParameter()) &&
Objects.equals(getParameterType(), that.getParameterType()) &&
Objects.equals(type, that.getType()) &&
Arrays.equals(getAnnotations(), that.getAnnotations());
Objects.equals(getParameterType(), that.getParameterType()) && Objects.equals(type,
that.getType()) && Arrays.equals(getAnnotations(), that.getAnnotations());
}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.buession.canal.core.CanalMessage;
import com.buession.canal.core.listener.MethodParameter;
import com.buession.core.utils.Assert;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -72,10 +73,8 @@ public boolean supports(final MethodParameter parameter) {
@Override
public Object resolve(final MethodParameter parameter, final CanalMessage canalMessage) throws Exception {
EventListenerArgumentResolver resolver = getArgumentResolver(parameter);
if(resolver == null){
throw new IllegalArgumentException("Unsupported parameter type [" +
parameter.getType().getName() + "]. supports should be called first.");
}
Assert.isNull(resolver, ()->new IllegalArgumentException("Unsupported parameter type [" +
parameter.getType().getName() + "]. supports should be called first."));

return resolver.resolve(parameter, canalMessage);
}
Expand Down
Loading

0 comments on commit e693b7f

Please sign in to comment.