Skip to content

Commit

Permalink
修复了若干问题:
Browse files Browse the repository at this point in the history
1)回调不应该在io线程中执行,会阻塞IO,dubbo是设置一个大小为CPU核数的线程池
2)客户端初始化时就连接服务器,而不是等调用方法时再连接。以后可以根据check来判断是否连接。
3)injvm不考虑注册中心
  • Loading branch information
songxinjianqwe committed Jul 26, 2018
1 parent 561aaac commit 33333e9
Show file tree
Hide file tree
Showing 38 changed files with 360 additions and 289 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ ReferenceConfig#get -> RPCProxyFactory#createProxy && ReferenceConfig#refer ->

proxy#invoke -> invoker#invoker
1) filter.invoke
2) doCustomProcess -> invokeHandler#invoke -> loadBalancer#select && endpoint#submit
2) doCustomProcess -> invokeHandler#invoke -> loadBalancer#select && client#submit

### Provider

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ rpc.application.name=app-4
rpc.application.serialize=jdk
rpc.application.proxy=jdk
rpc.protocol.type=toy
rpc.protocol.executor.server.threads=100
rpc.protocol.executor.server.type=threadpool
#rpc.protocol.executor.client.threads=2
rpc.protocol.executor.client.type=threadpool
rpc.registry.address=127.0.0.1:2181
rpc.cluster.loadbalance=consistent_hash
rpc.cluster.faulttolerance=failover
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.sinjinsong.toy.sample.spring.client.call.AsyncCallService;
import com.sinjinsong.toy.sample.spring.client.call.CallbackCallService;
import com.sinjinsong.toy.sample.spring.client.call.SyncCallService;
import com.sinjinsong.toy.sample.spring.client.generic.GenericService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
Expand All @@ -22,9 +21,7 @@ public class ClientApplication implements CommandLineRunner {
private AsyncCallService asyncCallService;
@Autowired
private CallbackCallService callbackCallService;
@Autowired
private GenericService genericService;


public static void main(String[] args) throws Exception {
SpringApplication app = new SpringApplication(ClientApplication.class);
app.setWebEnvironment(false);
Expand All @@ -35,7 +32,7 @@ public static void main(String[] args) throws Exception {
public void run(String... args) throws Exception {
// genericService.test();
// syncCallService.testOnceCall();
syncCallService.concurrentTest();
// syncCallService.concurrentTest();
// syncCallService.test();
// asyncCallService.testOnceCall();
// asyncCallService.test();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ rpc.application.name=app-1
rpc.application.serialize=jdk
rpc.application.proxy=jdk
rpc.protocol.type=toy
rpc.protocol.executor.server.threads=100
rpc.protocol.executor.server.type=threadpool
#rpc.protocol.executor.client.threads=2
rpc.protocol.executor.client.type=threadpool
rpc.registry.address=127.0.0.1:2181
rpc.cluster.loadbalance=consistent_hash
rpc.cluster.faulttolerance=failover
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,9 @@ rpc.application.name=app-3
rpc.application.serialize=protostuff
rpc.application.proxy=jdk
rpc.protocol.type=injvm
rpc.protocol.executor.server.threads=100
rpc.protocol.executor.server.type=threadpool
#rpc.protocol.executor.client.threads=2
rpc.protocol.executor.client.type=threadpool
rpc.registry.address=127.0.0.1:2181
rpc.cluster.loadbalance=LEAST_ACTIVE
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import java.util.concurrent.CountDownLatch;

/**
* @author songx
* @author sinjinsong
* @date 2017/7/30
*/
@SpringBootApplication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ rpc.application.serialize=jdk
rpc.application.proxy=jdk
rpc.protocol.type=toy
#rpc.protocol.port=8000
rpc.protocol.executor.threads=100
rpc.protocol.executor.type=threadpool
rpc.protocol.executor.server.threads=100
rpc.protocol.executor.server.type=threadpool
#rpc.protocol.executor.client.threads=2
rpc.protocol.executor.client.type=threadpool
rpc.registry.address=127.0.0.1:2181
rpc.cluster.loadbalance=LEAST_ACTIVE
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.sinjinsong.toy.protocol.api.InvokeParam;
import com.sinjinsong.toy.protocol.api.Invoker;
import com.sinjinsong.toy.protocol.api.support.AbstractRemoteProtocol;
import com.sinjinsong.toy.protocol.injvm.InJvmProtocol;
import com.sinjinsong.toy.registry.api.ServiceURL;
import com.sinjinsong.toy.transport.api.domain.RPCResponse;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -34,28 +35,27 @@ public class ClusterInvoker<T> implements Invoker<T> {
* key是address,value是一个invoker
*/
private Map<String, Invoker<T>> addressInvokers = new ConcurrentHashMap<>();
private ClusterConfig clusterConfig;
private RegistryConfig registryConfig;
private ProtocolConfig protocolConfig;
private ApplicationConfig applicationConfig;
private GlobalConfig globalConfig;


public ClusterInvoker(Class<T> interfaceClass, String interfaceName, ApplicationConfig applicationConfig, ClusterConfig clusterConfig, RegistryConfig registryConfig, ProtocolConfig protocolConfig) {
public ClusterInvoker(Class<T> interfaceClass, String interfaceName, GlobalConfig globalConfig) {
this.interfaceClass = interfaceClass;
this.interfaceName = interfaceName;
this.clusterConfig = clusterConfig;
this.registryConfig = registryConfig;
this.protocolConfig = protocolConfig;
this.applicationConfig = applicationConfig;
this.globalConfig = globalConfig;
init();
}

//TODO 这里写的比较僵硬,如果是injvm协议,就完全不考虑注册中心了
private void init() {
this.registryConfig.getRegistryInstance().discover(interfaceName, (newServiceURLs -> {
removeNotExisted(newServiceURLs);
}), (serviceURL -> {
addOrUpdate(serviceURL);
}));
if (globalConfig.getProtocol() instanceof InJvmProtocol) {
addOrUpdate(ServiceURL.DEFAULT_SERVICE_URL);
} else {
globalConfig.getServiceRegistry().discover(interfaceName, (newServiceURLs -> {
removeNotExisted(newServiceURLs);
}), (serviceURL -> {
addOrUpdate(serviceURL);
}));
}
}

/**
Expand All @@ -75,17 +75,16 @@ private synchronized void addOrUpdate(ServiceURL serviceURL) {
// 我们知道只有远程服务才有可能会更新
// 更新配置与invoker无关,只需要Protocol负责
//TODO refactor this

if (protocolConfig.getProtocolInstance() instanceof AbstractRemoteProtocol) {
AbstractRemoteProtocol protocol = (AbstractRemoteProtocol) protocolConfig.getProtocolInstance();
log.info("update config:{},当前interface为:{}", serviceURL,interfaceName);
if (globalConfig.getProtocol() instanceof AbstractRemoteProtocol) {
AbstractRemoteProtocol protocol = (AbstractRemoteProtocol) globalConfig.getProtocol();
log.info("update config:{},当前interface为:{}", serviceURL, interfaceName);
protocol.updateEndpointConfig(serviceURL);
}
} else {
// 添加
// 需要修改
log.info("add invoker:{},serviceURL:{}", interfaceName, serviceURL);
Invoker invoker = protocolConfig.getProtocolInstance().refer(ReferenceConfig.getReferenceConfigByInterfaceName(interfaceName), serviceURL);
Invoker invoker = globalConfig.getProtocol().refer(ReferenceConfig.getReferenceConfigByInterfaceName(interfaceName), serviceURL);
// refer拿到的是InvokerDelegate
addressInvokers.put(serviceURL.getAddress(), invoker);
}
Expand All @@ -112,9 +111,9 @@ public synchronized void removeNotExisted(List<ServiceURL> newServiceURLs) {
for (Iterator<Map.Entry<String, Invoker<T>>> it = addressInvokers.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<String, Invoker<T>> curr = it.next();
if (!newAddressesMap.containsKey(curr.getKey())) {
log.info("remove address:{},当前interface为:{}", curr.getKey(),interfaceName);
if (protocolConfig.getProtocolInstance() instanceof AbstractRemoteProtocol) {
AbstractRemoteProtocol protocol = (AbstractRemoteProtocol) protocolConfig.getProtocolInstance();
log.info("remove address:{},当前interface为:{}", curr.getKey(), interfaceName);
if (globalConfig.getProtocol() instanceof AbstractRemoteProtocol) {
AbstractRemoteProtocol protocol = (AbstractRemoteProtocol) globalConfig.getProtocol();
protocol.closeEndpoint(curr.getKey());
}
it.remove();
Expand Down Expand Up @@ -154,7 +153,7 @@ private Invoker doSelect(List<Invoker> availableInvokers, InvokeParam invokePara
throw new RPCException(ErrorEnum.NO_SERVER_AVAILABLE, "未找到可用服务器");
}
}
invoker = clusterConfig.getLoadBalanceInstance().select(availableInvokers, InvokeParamUtil.extractRequestFromInvokeParam(invokeParam));
invoker = globalConfig.getLoadBalancer().select(availableInvokers, InvokeParamUtil.extractRequestFromInvokeParam(invokeParam));
if (invoker.isAvailable()) {
return invoker;
} else {
Expand Down Expand Up @@ -183,7 +182,7 @@ public RPCResponse invoke(InvokeParam invokeParam) throws RPCException {
} catch (RPCException e) {
// 重试后OK
// 在这里再抛出异常,就没有返回值了
return clusterConfig.getFaultToleranceHandlerInstance().handle(this, invokeParam, e);
return globalConfig.getFaultToleranceHandler().handle(this, invokeParam, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,15 @@
/**
* @author sinjinsong
* @date 2018/6/10
*
* <p>
* invoker是对应一个interface的一个address
* endpoint是对应一个address
* 多个invoker可能会共享同一个endpoint
* Endpoint由Protocol管理
*/
@Slf4j
public abstract class AbstractLoadBalancer implements LoadBalancer {
private RegistryConfig registryConfig;
private ProtocolConfig protocolConfig;
private ApplicationConfig applicationConfig;
private ClusterConfig clusterConfig;

private GlobalConfig globalConfig;
/**
* key是接口名,value的key是IP地址,value是Endpoint
* <p>
Expand All @@ -37,7 +33,6 @@ public abstract class AbstractLoadBalancer implements LoadBalancer {
* key : BService, value: 192.168.1.1,Endpoint1
*/
private Map<String, ClusterInvoker> interfaceInvokers = new ConcurrentHashMap<>();

/**
* 分配address的形式
*
Expand All @@ -50,7 +45,7 @@ public <T> Invoker<T> referCluster(ReferenceConfig<T> referenceConfig) {
String interfaceName = referenceConfig.getInterfaceName();
ClusterInvoker clusterInvoker;
if (!interfaceInvokers.containsKey(interfaceName)) {
clusterInvoker = new ClusterInvoker(referenceConfig.getInterfaceClass(), interfaceName, applicationConfig, clusterConfig, registryConfig, protocolConfig);
clusterInvoker = new ClusterInvoker(referenceConfig.getInterfaceClass(), interfaceName, globalConfig);
interfaceInvokers.put(interfaceName, clusterInvoker);
return clusterInvoker;
}
Expand All @@ -73,19 +68,23 @@ public Invoker select(List<Invoker> invokers, RPCRequest request) {

protected abstract Invoker doSelect(List<Invoker> invokers, RPCRequest request);

public void setRegistryConfig(RegistryConfig registryConfig) {
this.registryConfig = registryConfig;
}

public void setProtocolConfig(ProtocolConfig protocolConfig) {
this.protocolConfig = protocolConfig;
}

public void setApplicationConfig(ApplicationConfig applicationConfig) {
this.applicationConfig = applicationConfig;
}

public void setClusterConfig(ClusterConfig clusterConfig) {
this.clusterConfig = clusterConfig;

public void updateGlobalConfig(GlobalConfig globalConfig) {
if(this.globalConfig == null) {
this.globalConfig = globalConfig;
}else {
if(globalConfig.getApplicationConfig() != null) {
this.globalConfig.setApplicationConfig(globalConfig.getApplicationConfig());
}
if(globalConfig.getProtocolConfig() != null) {
this.globalConfig.setProtocolConfig(globalConfig.getProtocolConfig());
}
if(globalConfig.getRegistryConfig() != null) {
this.globalConfig.setRegistryConfig(globalConfig.getRegistryConfig());
}
if(globalConfig.getClusterConfig() != null) {
this.globalConfig.setClusterConfig(globalConfig.getClusterConfig());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ public class ExecutorConfig {
private Integer threads;
private String type;
private TaskExecutor executorInstance;

public int getThreads() {
if (threads != null) {
return threads;
}
return DEFAULT_THREADS;
}


public void close() {
executorInstance.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.sinjinsong.toy.config;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* @author sinjinsong
* @date 2018/7/26
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Executors {
private ExecutorConfig client;
private ExecutorConfig server;

public void close() {
if(client != null) {
client.close();
}
if(server != null) {
server.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.sinjinsong.toy.config;

import com.sinjinsong.toy.cluster.FaultToleranceHandler;
import com.sinjinsong.toy.cluster.LoadBalancer;
import com.sinjinsong.toy.executor.api.TaskExecutor;
import com.sinjinsong.toy.protocol.api.Protocol;
import com.sinjinsong.toy.proxy.api.RPCProxyFactory;
import com.sinjinsong.toy.registry.api.ServiceRegistry;
import com.sinjinsong.toy.serialize.api.Serializer;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* @author sinjinsong
* @date 2018/7/26
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class GlobalConfig {
private ApplicationConfig applicationConfig;
private ClusterConfig clusterConfig;
private RegistryConfig registryConfig;
private ProtocolConfig protocolConfig;

public Serializer getSerializer() {
return applicationConfig.getSerializerInstance();
}

public RPCProxyFactory getProxyFactory() {
return applicationConfig.getProxyFactoryInstance();
}


public LoadBalancer getLoadBalancer() {
return clusterConfig.getLoadBalanceInstance();
}

public FaultToleranceHandler getFaultToleranceHandler() {
return clusterConfig.getFaultToleranceHandlerInstance();
}

public ServiceRegistry getServiceRegistry() {
return registryConfig.getRegistryInstance();
}


public Protocol getProtocol() {
return protocolConfig.getProtocolInstance();
}

public TaskExecutor getClientExecutor() {
return protocolConfig.getExecutor().getClient().getExecutorInstance();
}

public TaskExecutor getServerExecutor() {
return protocolConfig.getExecutor().getServer().getExecutorInstance();
}

public int getPort() {
return protocolConfig.getPort();
}
}
Loading

0 comments on commit 33333e9

Please sign in to comment.