Skip to content

Commit

Permalink
【core】命名优化
Browse files Browse the repository at this point in the history
  • Loading branch information
yong.teng committed Nov 12, 2023
1 parent 50a1b9e commit 523064a
Show file tree
Hide file tree
Showing 22 changed files with 402 additions and 273 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@
*/
package com.buession.canal.client;

import com.buession.canal.client.handler.DefaultMessageHandler;
import com.buession.canal.client.handler.MessageHandlerFactory;
import com.buession.canal.core.binding.CanalBinding;
import com.buession.canal.client.adapter.AdapterClient;
import com.buession.canal.client.dispatcher.Dispatcher;
import com.buession.canal.core.concurrent.DefaultCanalThreadPoolExecutor;
import com.buession.core.utils.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.ExecutorService;

/**
Expand All @@ -44,16 +42,19 @@
public abstract class AbstractCanalClient implements CanalClient {

/**
* {@link Binder} 列表
* {@link CanalContext}
*/
private final List<Binder> binders;
private final CanalContext context;

/**
* {@link ExecutorService}
*/
private final ExecutorService executor;

private final MessageHandlerFactory messageHandlerFactory;
/**
* 分发器
*/
private final Dispatcher dispatcher;

private volatile boolean running = false;

Expand All @@ -62,37 +63,43 @@ public abstract class AbstractCanalClient implements CanalClient {
/**
* 构造函数
*
* @param binders
* {@link Binder} 列表
* @param context
* {@link CanalContext}
* @param dispatcher
* 分发器
*/
public AbstractCanalClient(final List<Binder> binders) {
this(binders, new DefaultCanalThreadPoolExecutor());
public AbstractCanalClient(final CanalContext context, final Dispatcher dispatcher) {
this(context, dispatcher, new DefaultCanalThreadPoolExecutor());
}

/**
* 构造函数
*
* @param binders
* {@link Binder} 列表
* {@link CanalBinding} 列表
* @param context
* {@link CanalContext}
* @param dispatcher
* 分发器
* @param executor
* {@link ExecutorService}
*/
public AbstractCanalClient(final List<Binder> binders, final ExecutorService executor) {
Assert.isNull(binders, "The Binder cloud not be null");
Assert.isNull(executor, "The ExecutorService cloud not be null");
this.binders = binders;
public AbstractCanalClient(final CanalContext context, final Dispatcher dispatcher,
final ExecutorService executor) {
Assert.isNull(context, "The CanalContext is required");
Assert.isNull(dispatcher, "The Dispatcher is required");
Assert.isNull(executor, "The ExecutorService is required");
this.context = context;
this.dispatcher = dispatcher;
this.executor = executor;
this.messageHandlerFactory = DefaultMessageHandler::new;
}

@Override
public void start() {
logger.info("CanalClient starting...");

for(Binder binder : binders){
process(binder, messageHandlerFactory, executor);
}
context.getAdapterClients().forEach((adapterClient)->{
adapterClient.init();
process(adapterClient, dispatcher, executor);
});

running = true;
}
Expand All @@ -109,7 +116,7 @@ public boolean isRunning() {
return running;
}

protected abstract void process(final Binder binder, final MessageHandlerFactory messageHandlerFactory,
protected abstract void process(final AdapterClient adapterClient, final Dispatcher dispatcher,
final ExecutorService executor);

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,26 @@
* | Author: Yong.Teng <[email protected]> |
* | Copyright @ 2013-2023 Buession.com Inc. |
* +-------------------------------------------------------------------------------------------------------+
*/package com.buession.canal.client;/**
*
*/
package com.buession.canal.client;

import com.buession.canal.client.adapter.AdapterClient;

import java.util.Set;

/**
* Canal 上下文
*
* @author Yong.Teng
* @since 0.0.1
*/public interface CanalContext {
*/
public interface CanalContext {

/**
* 返回所有 Canal 适配器
*
* @return 所有 Canal 适配器
*/
Set<AdapterClient> getAdapterClients();

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
*/
package com.buession.canal.client;

import com.buession.canal.client.handler.MessageHandlerFactory;
import com.buession.canal.client.adapter.AdapterClient;
import com.buession.canal.client.dispatcher.Dispatcher;

import java.util.List;
import java.util.concurrent.ExecutorService;

/**
Expand All @@ -40,29 +40,37 @@ public class DefaultCanalClient extends AbstractCanalClient {
/**
* 构造函数
*
* @param binders
* {@link Binder} 列表
* @param context
* {@link CanalContext}
* @param dispatcher
* 分发器
*/
public DefaultCanalClient(final List<Binder> binders) {
super(binders);
public DefaultCanalClient(final CanalContext context, final Dispatcher dispatcher) {
super(context, dispatcher);
}

/**
* 构造函数
*
* @param binders
* {@link Binder} 列表
* @param context
* {@link CanalContext}
* @param dispatcher
* 分发器
* @param executor
* {@link ExecutorService}
*/
public DefaultCanalClient(final List<Binder> binders, final ExecutorService executor) {
super(binders, executor);
public DefaultCanalClient(final CanalContext context, final Dispatcher dispatcher, final ExecutorService executor) {
super(context, dispatcher, executor);
}

@Override
protected void process(final Binder binder, final MessageHandlerFactory messageHandlerFactory,
protected void process(final AdapterClient adapterClient, final Dispatcher dispatcher,
final ExecutorService executor) {
executor.submit(messageHandlerFactory.newHandler(binder));
executor.submit(()->{
if(isRunning()){
dispatcher.dispatch(adapterClient, 5);
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,39 @@
* | Author: Yong.Teng <[email protected]> |
* | Copyright @ 2013-2023 Buession.com Inc. |
* +-------------------------------------------------------------------------------------------------------+
*/package com.buession.canal.client;/**
*
*/
package com.buession.canal.client;

import com.buession.canal.client.adapter.AdapterClient;

import java.util.Set;

/**
* Canal 上下文
*
* @author Yong.Teng
* @since 0.0.1
*/public class DefaultCanalContext {
*/
public class DefaultCanalContext implements CanalContext {

/**
* Canal 适配器
*/
private final Set<AdapterClient> adapterClients;

/**
* 构造函数
*
* @param adapterClients
* Canal 适配器
*/
public DefaultCanalContext(final Set<AdapterClient> adapterClients) {
this.adapterClients = adapterClients;
}

@Override
public Set<AdapterClient> getAdapterClients() {
return adapterClients;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
* @author Yong.Teng
* @since 0.0.1
*/
public abstract class AbstractCanalAdapterClient<C extends CanalConnector> implements CanalAdapterClient {
public abstract class AbstractAdapterClient<C extends CanalConnector> implements AdapterClient {

/**
* Canal 数据操作客户端
Expand Down Expand Up @@ -81,7 +81,7 @@ public abstract class AbstractCanalAdapterClient<C extends CanalConnector> imple
* @param destination
* 指令
*/
public AbstractCanalAdapterClient(final C connector, final String destination) {
public AbstractAdapterClient(final C connector, final String destination) {
this(connector, destination, 1);
}

Expand All @@ -95,7 +95,7 @@ public AbstractCanalAdapterClient(final C connector, final String destination) {
* @param batchSize
* 批处理条数
*/
public AbstractCanalAdapterClient(final C connector, final String destination, final int batchSize) {
public AbstractAdapterClient(final C connector, final String destination, final int batchSize) {
Assert.isNull(connector, "CanalConnector cloud not be null.");
this.connector = connector;
this.destination = destination;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
* @author Yong.Teng
* @since 0.0.1
*/
public abstract class AbstractCanalMqAdapterClient<C extends CanalMQConnector> extends AbstractCanalAdapterClient<C>
implements CanalMqAdapterClient {
public abstract class AbstractMqAdapterClient<C extends CanalMQConnector> extends AbstractAdapterClient<C>
implements MqAdapterClient {

private final boolean flatMessage;

Expand All @@ -56,7 +56,7 @@ public abstract class AbstractCanalMqAdapterClient<C extends CanalMQConnector> e
* @param destination
* 指令
*/
public AbstractCanalMqAdapterClient(final C connector, final String destination) {
public AbstractMqAdapterClient(final C connector, final String destination) {
this(connector, destination, DEFAULT_FLAT_MESSAGE);
}

Expand All @@ -70,7 +70,7 @@ public AbstractCanalMqAdapterClient(final C connector, final String destination)
* @param batchSize
* 批处理条数
*/
public AbstractCanalMqAdapterClient(final C connector, final String destination, final int batchSize) {
public AbstractMqAdapterClient(final C connector, final String destination, final int batchSize) {
this(connector, destination, batchSize, DEFAULT_FLAT_MESSAGE);
}

Expand All @@ -84,7 +84,7 @@ public AbstractCanalMqAdapterClient(final C connector, final String destination,
* @param flatMessage
* true / false
*/
public AbstractCanalMqAdapterClient(final C connector, final String destination, final boolean flatMessage) {
public AbstractMqAdapterClient(final C connector, final String destination, final boolean flatMessage) {
super(connector, destination);
this.flatMessage = flatMessage;
}
Expand All @@ -101,8 +101,8 @@ public AbstractCanalMqAdapterClient(final C connector, final String destination,
* @param flatMessage
* true / false
*/
public AbstractCanalMqAdapterClient(final C connector, final String destination, final int batchSize,
final boolean flatMessage) {
public AbstractMqAdapterClient(final C connector, final String destination, final int batchSize,
final boolean flatMessage) {
super(connector, destination, batchSize);
this.flatMessage = flatMessage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* @author Yong.Teng
* @since 0.0.1
*/
public interface CanalAdapterClient {
public interface AdapterClient {

int DEFAULT_BATCH_SIZE = 1;

Expand Down
Loading

0 comments on commit 523064a

Please sign in to comment.