From 33333e990945929c1f9ca52824393520d9361b01 Mon Sep 17 00:00:00 2001 From: songxinjianqwe Date: Fri, 27 Jul 2018 00:41:47 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BA=86=E8=8B=A5=E5=B9=B2?= =?UTF-8?q?=E9=97=AE=E9=A2=98=EF=BC=9A=201=EF=BC=89=E5=9B=9E=E8=B0=83?= =?UTF-8?q?=E4=B8=8D=E5=BA=94=E8=AF=A5=E5=9C=A8io=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E4=B8=AD=E6=89=A7=E8=A1=8C=EF=BC=8C=E4=BC=9A=E9=98=BB=E5=A1=9E?= =?UTF-8?q?IO=EF=BC=8Cdubbo=E6=98=AF=E8=AE=BE=E7=BD=AE=E4=B8=80=E4=B8=AA?= =?UTF-8?q?=E5=A4=A7=E5=B0=8F=E4=B8=BACPU=E6=A0=B8=E6=95=B0=E7=9A=84?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=E6=B1=A0=202=EF=BC=89=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E7=AB=AF=E5=88=9D=E5=A7=8B=E5=8C=96=E6=97=B6=E5=B0=B1=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E6=9C=8D=E5=8A=A1=E5=99=A8=EF=BC=8C=E8=80=8C=E4=B8=8D?= =?UTF-8?q?=E6=98=AF=E7=AD=89=E8=B0=83=E7=94=A8=E6=96=B9=E6=B3=95=E6=97=B6?= =?UTF-8?q?=E5=86=8D=E8=BF=9E=E6=8E=A5=E3=80=82=E4=BB=A5=E5=90=8E=E5=8F=AF?= =?UTF-8?q?=E4=BB=A5=E6=A0=B9=E6=8D=AEcheck=E6=9D=A5=E5=88=A4=E6=96=AD?= =?UTF-8?q?=E6=98=AF=E5=90=A6=E8=BF=9E=E6=8E=A5=E3=80=82=203=EF=BC=89injvm?= =?UTF-8?q?=E4=B8=8D=E8=80=83=E8=99=91=E6=B3=A8=E5=86=8C=E4=B8=AD=E5=BF=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- .../src/main/resources/application.properties | 4 ++ .../spring/client/ClientApplication.java | 7 +- .../src/main/resources/application.properties | 4 ++ .../src/main/resources/application.properties | 4 ++ .../spring/server/ServerApplication.java | 2 +- .../src/main/resources/application.properties | 6 +- .../toy/cluster/ClusterInvoker.java | 47 +++++++------ .../cluster/support/AbstractLoadBalancer.java | 43 ++++++------ .../sinjinsong/toy/config/ExecutorConfig.java | 7 +- .../com/sinjinsong/toy/config/Executors.java | 26 ++++++++ .../sinjinsong/toy/config/GlobalConfig.java | 66 +++++++++++++++++++ .../sinjinsong/toy/config/ProtocolConfig.java | 5 +- .../ThreadPoolTaskExecutorImpl.java | 4 +- .../protocol/api/support/AbstractInvoker.java | 12 ++-- .../api/support/AbstractProtocol.java | 28 ++------ .../api/support/AbstractRemoteInvoker.java | 16 ++--- .../api/support/AbstractRemoteProtocol.java | 20 +++--- .../toy/protocol/http/HttpInvoker.java | 2 +- .../toy/protocol/http/HttpProtocol.java | 18 ++--- .../toy/protocol/injvm/InJvmInvoker.java | 2 +- .../toy/protocol/injvm/InJvmProtocol.java | 20 +++--- .../toy/protocol/toy/ToyInvoker.java | 2 +- .../toy/protocol/toy/ToyProtocol.java | 18 ++--- .../api/{Endpoint.java => Client.java} | 2 +- .../sinjinsong/toy/transport/api/Server.java | 8 --- .../transport/api/support/AbstractClient.java | 36 ++++++++++ .../api/support/AbstractEndpoint.java | 33 ---------- .../transport/api/support/AbstractServer.java | 41 +++--------- ...Endpoint.java => AbstractNettyClient.java} | 48 +++++++------- .../support/netty/AbstractNettyServer.java | 6 +- .../{HttpEndpoint.java => HttpClient.java} | 10 +-- .../http/client/HttpClientHandler.java | 14 ++-- .../toy/transport/http/server/HttpServer.java | 4 +- .../{ToyEndpoint.java => ToyClient.java} | 12 ++-- .../toy/client/ToyClientHandler.java | 18 ++--- .../toy/transport/toy/server/ToyServer.java | 4 +- .../toy/autoconfig/RPCAutoConfiguration.java | 48 +++++++++----- 38 files changed, 360 insertions(+), 289 deletions(-) create mode 100644 toy-rpc-core/src/main/java/com/sinjinsong/toy/config/Executors.java create mode 100644 toy-rpc-core/src/main/java/com/sinjinsong/toy/config/GlobalConfig.java rename toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/{Endpoint.java => Client.java} (96%) create mode 100644 toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/AbstractClient.java delete mode 100644 toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/AbstractEndpoint.java rename toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/netty/{AbstractNettyEndpoint.java => AbstractNettyClient.java} (80%) rename toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/http/client/{HttpEndpoint.java => HttpClient.java} (81%) rename toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/toy/client/{ToyEndpoint.java => ToyClient.java} (89%) diff --git a/README.md b/README.md index cc53abc..b65b868 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ ReferenceConfig#get -> RPCProxyFactory#createProxy && ReferenceConfig#refer -> proxy#invoke -> invoker#invoker 1) filter.invoke -2) doCustomProcess -> invokeHandler#invoke -> loadBalancer#select && endpoint#submit +2) doCustomProcess -> invokeHandler#invoke -> loadBalancer#select && client#submit ### Provider diff --git a/sample-spring/sample-spring-client-generic/src/main/resources/application.properties b/sample-spring/sample-spring-client-generic/src/main/resources/application.properties index 089f33e..2ed8402 100644 --- a/sample-spring/sample-spring-client-generic/src/main/resources/application.properties +++ b/sample-spring/sample-spring-client-generic/src/main/resources/application.properties @@ -2,6 +2,10 @@ rpc.application.name=app-4 rpc.application.serialize=jdk rpc.application.proxy=jdk rpc.protocol.type=toy +rpc.protocol.executor.server.threads=100 +rpc.protocol.executor.server.type=threadpool +#rpc.protocol.executor.client.threads=2 +rpc.protocol.executor.client.type=threadpool rpc.registry.address=127.0.0.1:2181 rpc.cluster.loadbalance=consistent_hash rpc.cluster.faulttolerance=failover diff --git a/sample-spring/sample-spring-client/src/main/java/com/sinjinsong/toy/sample/spring/client/ClientApplication.java b/sample-spring/sample-spring-client/src/main/java/com/sinjinsong/toy/sample/spring/client/ClientApplication.java index 1cfd1ce..cdbe94b 100644 --- a/sample-spring/sample-spring-client/src/main/java/com/sinjinsong/toy/sample/spring/client/ClientApplication.java +++ b/sample-spring/sample-spring-client/src/main/java/com/sinjinsong/toy/sample/spring/client/ClientApplication.java @@ -3,7 +3,6 @@ import com.sinjinsong.toy.sample.spring.client.call.AsyncCallService; import com.sinjinsong.toy.sample.spring.client.call.CallbackCallService; import com.sinjinsong.toy.sample.spring.client.call.SyncCallService; -import com.sinjinsong.toy.sample.spring.client.generic.GenericService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; @@ -22,9 +21,7 @@ public class ClientApplication implements CommandLineRunner { private AsyncCallService asyncCallService; @Autowired private CallbackCallService callbackCallService; - @Autowired - private GenericService genericService; - + public static void main(String[] args) throws Exception { SpringApplication app = new SpringApplication(ClientApplication.class); app.setWebEnvironment(false); @@ -35,7 +32,7 @@ public static void main(String[] args) throws Exception { public void run(String... args) throws Exception { // genericService.test(); // syncCallService.testOnceCall(); - syncCallService.concurrentTest(); +// syncCallService.concurrentTest(); // syncCallService.test(); // asyncCallService.testOnceCall(); // asyncCallService.test(); diff --git a/sample-spring/sample-spring-client/src/main/resources/application.properties b/sample-spring/sample-spring-client/src/main/resources/application.properties index c43aa3c..5ced627 100644 --- a/sample-spring/sample-spring-client/src/main/resources/application.properties +++ b/sample-spring/sample-spring-client/src/main/resources/application.properties @@ -2,6 +2,10 @@ rpc.application.name=app-1 rpc.application.serialize=jdk rpc.application.proxy=jdk rpc.protocol.type=toy +rpc.protocol.executor.server.threads=100 +rpc.protocol.executor.server.type=threadpool +#rpc.protocol.executor.client.threads=2 +rpc.protocol.executor.client.type=threadpool rpc.registry.address=127.0.0.1:2181 rpc.cluster.loadbalance=consistent_hash rpc.cluster.faulttolerance=failover diff --git a/sample-spring/sample-spring-injvm/src/main/resources/application.properties b/sample-spring/sample-spring-injvm/src/main/resources/application.properties index ff3eded..5c40ad7 100644 --- a/sample-spring/sample-spring-injvm/src/main/resources/application.properties +++ b/sample-spring/sample-spring-injvm/src/main/resources/application.properties @@ -2,5 +2,9 @@ rpc.application.name=app-3 rpc.application.serialize=protostuff rpc.application.proxy=jdk rpc.protocol.type=injvm +rpc.protocol.executor.server.threads=100 +rpc.protocol.executor.server.type=threadpool +#rpc.protocol.executor.client.threads=2 +rpc.protocol.executor.client.type=threadpool rpc.registry.address=127.0.0.1:2181 rpc.cluster.loadbalance=LEAST_ACTIVE \ No newline at end of file diff --git a/sample-spring/sample-spring-server/src/main/java/com/sinjinsong/toy/sample/spring/server/ServerApplication.java b/sample-spring/sample-spring-server/src/main/java/com/sinjinsong/toy/sample/spring/server/ServerApplication.java index d547cfd..c0c7c6c 100644 --- a/sample-spring/sample-spring-server/src/main/java/com/sinjinsong/toy/sample/spring/server/ServerApplication.java +++ b/sample-spring/sample-spring-server/src/main/java/com/sinjinsong/toy/sample/spring/server/ServerApplication.java @@ -8,7 +8,7 @@ import java.util.concurrent.CountDownLatch; /** - * @author songx + * @author sinjinsong * @date 2017/7/30 */ @SpringBootApplication diff --git a/sample-spring/sample-spring-server/src/main/resources/application.properties b/sample-spring/sample-spring-server/src/main/resources/application.properties index 7042ef4..f7edfa1 100644 --- a/sample-spring/sample-spring-server/src/main/resources/application.properties +++ b/sample-spring/sample-spring-server/src/main/resources/application.properties @@ -3,7 +3,9 @@ rpc.application.serialize=jdk rpc.application.proxy=jdk rpc.protocol.type=toy #rpc.protocol.port=8000 -rpc.protocol.executor.threads=100 -rpc.protocol.executor.type=threadpool +rpc.protocol.executor.server.threads=100 +rpc.protocol.executor.server.type=threadpool +#rpc.protocol.executor.client.threads=2 +rpc.protocol.executor.client.type=threadpool rpc.registry.address=127.0.0.1:2181 rpc.cluster.loadbalance=LEAST_ACTIVE \ No newline at end of file diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/cluster/ClusterInvoker.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/cluster/ClusterInvoker.java index 511b9c5..d9cbc24 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/cluster/ClusterInvoker.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/cluster/ClusterInvoker.java @@ -8,6 +8,7 @@ import com.sinjinsong.toy.protocol.api.InvokeParam; import com.sinjinsong.toy.protocol.api.Invoker; import com.sinjinsong.toy.protocol.api.support.AbstractRemoteProtocol; +import com.sinjinsong.toy.protocol.injvm.InJvmProtocol; import com.sinjinsong.toy.registry.api.ServiceURL; import com.sinjinsong.toy.transport.api.domain.RPCResponse; import lombok.extern.slf4j.Slf4j; @@ -34,28 +35,27 @@ public class ClusterInvoker implements Invoker { * key是address,value是一个invoker */ private Map> addressInvokers = new ConcurrentHashMap<>(); - private ClusterConfig clusterConfig; - private RegistryConfig registryConfig; - private ProtocolConfig protocolConfig; - private ApplicationConfig applicationConfig; + private GlobalConfig globalConfig; - public ClusterInvoker(Class interfaceClass, String interfaceName, ApplicationConfig applicationConfig, ClusterConfig clusterConfig, RegistryConfig registryConfig, ProtocolConfig protocolConfig) { + public ClusterInvoker(Class interfaceClass, String interfaceName, GlobalConfig globalConfig) { this.interfaceClass = interfaceClass; this.interfaceName = interfaceName; - this.clusterConfig = clusterConfig; - this.registryConfig = registryConfig; - this.protocolConfig = protocolConfig; - this.applicationConfig = applicationConfig; + this.globalConfig = globalConfig; init(); } + //TODO 这里写的比较僵硬,如果是injvm协议,就完全不考虑注册中心了 private void init() { - this.registryConfig.getRegistryInstance().discover(interfaceName, (newServiceURLs -> { - removeNotExisted(newServiceURLs); - }), (serviceURL -> { - addOrUpdate(serviceURL); - })); + if (globalConfig.getProtocol() instanceof InJvmProtocol) { + addOrUpdate(ServiceURL.DEFAULT_SERVICE_URL); + } else { + globalConfig.getServiceRegistry().discover(interfaceName, (newServiceURLs -> { + removeNotExisted(newServiceURLs); + }), (serviceURL -> { + addOrUpdate(serviceURL); + })); + } } /** @@ -75,17 +75,16 @@ private synchronized void addOrUpdate(ServiceURL serviceURL) { // 我们知道只有远程服务才有可能会更新 // 更新配置与invoker无关,只需要Protocol负责 //TODO refactor this - - if (protocolConfig.getProtocolInstance() instanceof AbstractRemoteProtocol) { - AbstractRemoteProtocol protocol = (AbstractRemoteProtocol) protocolConfig.getProtocolInstance(); - log.info("update config:{},当前interface为:{}", serviceURL,interfaceName); + if (globalConfig.getProtocol() instanceof AbstractRemoteProtocol) { + AbstractRemoteProtocol protocol = (AbstractRemoteProtocol) globalConfig.getProtocol(); + log.info("update config:{},当前interface为:{}", serviceURL, interfaceName); protocol.updateEndpointConfig(serviceURL); } } else { // 添加 // 需要修改 log.info("add invoker:{},serviceURL:{}", interfaceName, serviceURL); - Invoker invoker = protocolConfig.getProtocolInstance().refer(ReferenceConfig.getReferenceConfigByInterfaceName(interfaceName), serviceURL); + Invoker invoker = globalConfig.getProtocol().refer(ReferenceConfig.getReferenceConfigByInterfaceName(interfaceName), serviceURL); // refer拿到的是InvokerDelegate addressInvokers.put(serviceURL.getAddress(), invoker); } @@ -112,9 +111,9 @@ public synchronized void removeNotExisted(List newServiceURLs) { for (Iterator>> it = addressInvokers.entrySet().iterator(); it.hasNext(); ) { Map.Entry> curr = it.next(); if (!newAddressesMap.containsKey(curr.getKey())) { - log.info("remove address:{},当前interface为:{}", curr.getKey(),interfaceName); - if (protocolConfig.getProtocolInstance() instanceof AbstractRemoteProtocol) { - AbstractRemoteProtocol protocol = (AbstractRemoteProtocol) protocolConfig.getProtocolInstance(); + log.info("remove address:{},当前interface为:{}", curr.getKey(), interfaceName); + if (globalConfig.getProtocol() instanceof AbstractRemoteProtocol) { + AbstractRemoteProtocol protocol = (AbstractRemoteProtocol) globalConfig.getProtocol(); protocol.closeEndpoint(curr.getKey()); } it.remove(); @@ -154,7 +153,7 @@ private Invoker doSelect(List availableInvokers, InvokeParam invokePara throw new RPCException(ErrorEnum.NO_SERVER_AVAILABLE, "未找到可用服务器"); } } - invoker = clusterConfig.getLoadBalanceInstance().select(availableInvokers, InvokeParamUtil.extractRequestFromInvokeParam(invokeParam)); + invoker = globalConfig.getLoadBalancer().select(availableInvokers, InvokeParamUtil.extractRequestFromInvokeParam(invokeParam)); if (invoker.isAvailable()) { return invoker; } else { @@ -183,7 +182,7 @@ public RPCResponse invoke(InvokeParam invokeParam) throws RPCException { } catch (RPCException e) { // 重试后OK // 在这里再抛出异常,就没有返回值了 - return clusterConfig.getFaultToleranceHandlerInstance().handle(this, invokeParam, e); + return globalConfig.getFaultToleranceHandler().handle(this, invokeParam, e); } } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/cluster/support/AbstractLoadBalancer.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/cluster/support/AbstractLoadBalancer.java index 4cba406..6ff0c3f 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/cluster/support/AbstractLoadBalancer.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/cluster/support/AbstractLoadBalancer.java @@ -15,7 +15,7 @@ /** * @author sinjinsong * @date 2018/6/10 - * + *

* invoker是对应一个interface的一个address * endpoint是对应一个address * 多个invoker可能会共享同一个endpoint @@ -23,11 +23,7 @@ */ @Slf4j public abstract class AbstractLoadBalancer implements LoadBalancer { - private RegistryConfig registryConfig; - private ProtocolConfig protocolConfig; - private ApplicationConfig applicationConfig; - private ClusterConfig clusterConfig; - + private GlobalConfig globalConfig; /** * key是接口名,value的key是IP地址,value是Endpoint *

@@ -37,7 +33,6 @@ public abstract class AbstractLoadBalancer implements LoadBalancer { * key : BService, value: 192.168.1.1,Endpoint1 */ private Map interfaceInvokers = new ConcurrentHashMap<>(); - /** * 分配address的形式 * @@ -50,7 +45,7 @@ public Invoker referCluster(ReferenceConfig referenceConfig) { String interfaceName = referenceConfig.getInterfaceName(); ClusterInvoker clusterInvoker; if (!interfaceInvokers.containsKey(interfaceName)) { - clusterInvoker = new ClusterInvoker(referenceConfig.getInterfaceClass(), interfaceName, applicationConfig, clusterConfig, registryConfig, protocolConfig); + clusterInvoker = new ClusterInvoker(referenceConfig.getInterfaceClass(), interfaceName, globalConfig); interfaceInvokers.put(interfaceName, clusterInvoker); return clusterInvoker; } @@ -73,19 +68,23 @@ public Invoker select(List invokers, RPCRequest request) { protected abstract Invoker doSelect(List invokers, RPCRequest request); - public void setRegistryConfig(RegistryConfig registryConfig) { - this.registryConfig = registryConfig; - } - - public void setProtocolConfig(ProtocolConfig protocolConfig) { - this.protocolConfig = protocolConfig; - } - - public void setApplicationConfig(ApplicationConfig applicationConfig) { - this.applicationConfig = applicationConfig; - } - - public void setClusterConfig(ClusterConfig clusterConfig) { - this.clusterConfig = clusterConfig; + + public void updateGlobalConfig(GlobalConfig globalConfig) { + if(this.globalConfig == null) { + this.globalConfig = globalConfig; + }else { + if(globalConfig.getApplicationConfig() != null) { + this.globalConfig.setApplicationConfig(globalConfig.getApplicationConfig()); + } + if(globalConfig.getProtocolConfig() != null) { + this.globalConfig.setProtocolConfig(globalConfig.getProtocolConfig()); + } + if(globalConfig.getRegistryConfig() != null) { + this.globalConfig.setRegistryConfig(globalConfig.getRegistryConfig()); + } + if(globalConfig.getClusterConfig() != null) { + this.globalConfig.setClusterConfig(globalConfig.getClusterConfig()); + } + } } } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/config/ExecutorConfig.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/config/ExecutorConfig.java index 3d0f5a0..a7d2dea 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/config/ExecutorConfig.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/config/ExecutorConfig.java @@ -19,11 +19,16 @@ public class ExecutorConfig { private Integer threads; private String type; private TaskExecutor executorInstance; - + public int getThreads() { if (threads != null) { return threads; } return DEFAULT_THREADS; } + + + public void close() { + executorInstance.close(); + } } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/config/Executors.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/config/Executors.java new file mode 100644 index 0000000..b280254 --- /dev/null +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/config/Executors.java @@ -0,0 +1,26 @@ +package com.sinjinsong.toy.config; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author sinjinsong + * @date 2018/7/26 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class Executors { + private ExecutorConfig client; + private ExecutorConfig server; + + public void close() { + if(client != null) { + client.close(); + } + if(server != null) { + server.close(); + } + } +} diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/config/GlobalConfig.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/config/GlobalConfig.java new file mode 100644 index 0000000..d077a8a --- /dev/null +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/config/GlobalConfig.java @@ -0,0 +1,66 @@ +package com.sinjinsong.toy.config; + +import com.sinjinsong.toy.cluster.FaultToleranceHandler; +import com.sinjinsong.toy.cluster.LoadBalancer; +import com.sinjinsong.toy.executor.api.TaskExecutor; +import com.sinjinsong.toy.protocol.api.Protocol; +import com.sinjinsong.toy.proxy.api.RPCProxyFactory; +import com.sinjinsong.toy.registry.api.ServiceRegistry; +import com.sinjinsong.toy.serialize.api.Serializer; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author sinjinsong + * @date 2018/7/26 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@Builder +public class GlobalConfig { + private ApplicationConfig applicationConfig; + private ClusterConfig clusterConfig; + private RegistryConfig registryConfig; + private ProtocolConfig protocolConfig; + + public Serializer getSerializer() { + return applicationConfig.getSerializerInstance(); + } + + public RPCProxyFactory getProxyFactory() { + return applicationConfig.getProxyFactoryInstance(); + } + + + public LoadBalancer getLoadBalancer() { + return clusterConfig.getLoadBalanceInstance(); + } + + public FaultToleranceHandler getFaultToleranceHandler() { + return clusterConfig.getFaultToleranceHandlerInstance(); + } + + public ServiceRegistry getServiceRegistry() { + return registryConfig.getRegistryInstance(); + } + + + public Protocol getProtocol() { + return protocolConfig.getProtocolInstance(); + } + + public TaskExecutor getClientExecutor() { + return protocolConfig.getExecutor().getClient().getExecutorInstance(); + } + + public TaskExecutor getServerExecutor() { + return protocolConfig.getExecutor().getServer().getExecutorInstance(); + } + + public int getPort() { + return protocolConfig.getPort(); + } +} diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/config/ProtocolConfig.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/config/ProtocolConfig.java index 8eb0706..4ef638f 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/config/ProtocolConfig.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/config/ProtocolConfig.java @@ -23,8 +23,8 @@ public class ProtocolConfig { private Integer port; private Protocol protocolInstance; - private ExecutorConfig executor; - + private Executors executor; + public int getPort() { if (port != null) { return port; @@ -34,5 +34,6 @@ public int getPort() { public void close() { protocolInstance.close(); + executor.close(); } } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/executor/threadpool/ThreadPoolTaskExecutorImpl.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/executor/threadpool/ThreadPoolTaskExecutorImpl.java index 8b36c29..2938f94 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/executor/threadpool/ThreadPoolTaskExecutorImpl.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/executor/threadpool/ThreadPoolTaskExecutorImpl.java @@ -19,12 +19,12 @@ public void init(Integer threads) { threads, 0, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(100), + new LinkedBlockingDeque<>(), new ThreadFactory() { private AtomicInteger atomicInteger = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { - return new Thread(r,"biz-" + atomicInteger.getAndIncrement()); + return new Thread(r,"pool-" + atomicInteger.getAndIncrement()); } }, new ThreadPoolExecutor.CallerRunsPolicy() diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractInvoker.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractInvoker.java index 2d4e232..ea15b2c 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractInvoker.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractInvoker.java @@ -3,7 +3,7 @@ import com.sinjinsong.toy.common.enumeration.ErrorEnum; import com.sinjinsong.toy.common.exception.RPCException; import com.sinjinsong.toy.common.util.InvokeParamUtil; -import com.sinjinsong.toy.config.ProtocolConfig; +import com.sinjinsong.toy.config.GlobalConfig; import com.sinjinsong.toy.config.ReferenceConfig; import com.sinjinsong.toy.filter.Filter; import com.sinjinsong.toy.invocation.api.support.AbstractInvocation; @@ -31,7 +31,7 @@ public abstract class AbstractInvoker implements Invoker { private Class interfaceClass; private String interfaceName; - private ProtocolConfig protocolConfig; + private GlobalConfig globalConfig; @Override public RPCResponse invoke(InvokeParam invokeParam) throws RPCException { @@ -168,11 +168,11 @@ public boolean isAvailable() { return true; } - public ProtocolConfig getProtocolConfig() { - return protocolConfig; + protected GlobalConfig getGlobalConfig() { + return globalConfig; } - public void setProtocolConfig(ProtocolConfig protocolConfig) { - this.protocolConfig = protocolConfig; + public void setGlobalConfig(GlobalConfig globalConfig) { + this.globalConfig = globalConfig; } } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractProtocol.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractProtocol.java index ce08c1d..e1221b1 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractProtocol.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractProtocol.java @@ -17,35 +17,17 @@ @Slf4j public abstract class AbstractProtocol implements Protocol { private Map> exporters = new ConcurrentHashMap<>(); - private ProtocolConfig protocolConfig; - private ApplicationConfig applicationConfig; - private ClusterConfig clusterConfig; - private RegistryConfig registryConfig; + private GlobalConfig globalConfig; - public void init(ApplicationConfig applicationConfig, ClusterConfig clusterConfig, RegistryConfig registryConfig, ProtocolConfig protocolConfig) { - this.applicationConfig = applicationConfig; - this.protocolConfig = protocolConfig; - this.clusterConfig = clusterConfig; - this.registryConfig = registryConfig; + public void init(GlobalConfig globalConfig) { + this.globalConfig = globalConfig; } - protected ProtocolConfig getProtocolConfig() { - return protocolConfig; + protected GlobalConfig getGlobalConfig() { + return globalConfig; } - protected ApplicationConfig getApplicationConfig() { - return applicationConfig; - } - - protected ClusterConfig getClusterConfig() { - return clusterConfig; - } - - protected RegistryConfig getRegistryConfig() { - return registryConfig; - } - protected void putExporter(Class interfaceClass, Exporter exporter) { this.exporters.put(interfaceClass.getName(), exporter); } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractRemoteInvoker.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractRemoteInvoker.java index 3c4a67b..3e47cc6 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractRemoteInvoker.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractRemoteInvoker.java @@ -1,7 +1,7 @@ package com.sinjinsong.toy.protocol.api.support; import com.sinjinsong.toy.registry.api.ServiceURL; -import com.sinjinsong.toy.transport.api.Endpoint; +import com.sinjinsong.toy.transport.api.Client; import lombok.extern.slf4j.Slf4j; /** @@ -10,27 +10,27 @@ */ @Slf4j public abstract class AbstractRemoteInvoker extends AbstractInvoker { - private Endpoint endpoint; + private Client client; @Override public ServiceURL getServiceURL() { - return getEndpoint().getServiceURL(); + return getClient().getServiceURL(); } /** * 拿到一个invoker * @return */ - protected Endpoint getEndpoint() { - return endpoint; + protected Client getClient() { + return client; } @Override public boolean isAvailable() { - return getEndpoint().isAvailable(); + return getClient().isAvailable(); } - public void setEndpoint(Endpoint endpoint) { - this.endpoint = endpoint; + public void setClient(Client client) { + this.client = client; } } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractRemoteProtocol.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractRemoteProtocol.java index fe5b777..b6d8398 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractRemoteProtocol.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/api/support/AbstractRemoteProtocol.java @@ -3,7 +3,7 @@ import com.sinjinsong.toy.common.enumeration.ErrorEnum; import com.sinjinsong.toy.common.exception.RPCException; import com.sinjinsong.toy.registry.api.ServiceURL; -import com.sinjinsong.toy.transport.api.Endpoint; +import com.sinjinsong.toy.transport.api.Client; import com.sinjinsong.toy.transport.api.Server; import lombok.extern.slf4j.Slf4j; @@ -23,7 +23,7 @@ public abstract class AbstractRemoteProtocol extends AbstractProtocol { /** * key是address,value是连接到该address上的Endpoint */ - private Map clients = new ConcurrentHashMap<>(); + private Map clients = new ConcurrentHashMap<>(); private Map locks = new ConcurrentHashMap<>(); private Server server; @@ -34,17 +34,17 @@ public abstract class AbstractRemoteProtocol extends AbstractProtocol { * @param serviceURL * @return */ - public final Endpoint initEndpoint(ServiceURL serviceURL) { + public final Client initEndpoint(ServiceURL serviceURL) { String address = serviceURL.getAddress(); locks.putIfAbsent(address, new Object()); synchronized (locks.get(address)) { if (clients.containsKey(address)) { return clients.get(address); } - Endpoint endpoint = doInitEndpoint(serviceURL); - clients.put(address, endpoint); + Client client = doInitEndpoint(serviceURL); + clients.put(address, client); locks.remove(address); - return endpoint; + return client; } } @@ -66,16 +66,16 @@ public final void updateEndpointConfig(ServiceURL serviceURL) { * @param address */ public final void closeEndpoint(String address) { - Endpoint endpoint = clients.remove(address); - if (endpoint != null) { + Client client = clients.remove(address); + if (client != null) { log.info("首次关闭客户端:{}", address); - endpoint.close(); + client.close(); } else { log.info("重复关闭客户端:{}", address); } } - protected abstract Endpoint doInitEndpoint(ServiceURL serviceURL); + protected abstract Client doInitEndpoint(ServiceURL serviceURL); protected synchronized final void openServer() { if(server == null) { diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/http/HttpInvoker.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/http/HttpInvoker.java index 3cda7a8..89ccca7 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/http/HttpInvoker.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/http/HttpInvoker.java @@ -16,6 +16,6 @@ public class HttpInvoker extends AbstractRemoteInvoker { @Override protected Function> getProcessor() { - return rpcRequest -> getEndpoint().submit(rpcRequest); + return rpcRequest -> getClient().submit(rpcRequest); } } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/http/HttpProtocol.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/http/HttpProtocol.java index 4b3ab0e..6a60686 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/http/HttpProtocol.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/http/HttpProtocol.java @@ -8,9 +8,9 @@ import com.sinjinsong.toy.protocol.api.Invoker; import com.sinjinsong.toy.protocol.api.support.AbstractRemoteProtocol; import com.sinjinsong.toy.registry.api.ServiceURL; -import com.sinjinsong.toy.transport.api.Endpoint; +import com.sinjinsong.toy.transport.api.Client; import com.sinjinsong.toy.transport.api.Server; -import com.sinjinsong.toy.transport.http.client.HttpEndpoint; +import com.sinjinsong.toy.transport.http.client.HttpClient; import com.sinjinsong.toy.transport.http.server.HttpServer; import lombok.extern.slf4j.Slf4j; @@ -35,7 +35,7 @@ public Exporter export(Invoker invoker, ServiceConfig serviceConfig openServer(); // export try { - serviceConfig.getRegistryConfig().getRegistryInstance().register(InetAddress.getLocalHost().getHostAddress() + ":" + getProtocolConfig().getPort(), serviceConfig.getInterfaceName()); + serviceConfig.getRegistryConfig().getRegistryInstance().register(InetAddress.getLocalHost().getHostAddress() + ":" + getGlobalConfig().getPort(), serviceConfig.getInterfaceName()); } catch (UnknownHostException e) { throw new RPCException(e, ErrorEnum.READ_LOCALHOST_ERROR, "获取本地Host失败"); } @@ -47,15 +47,15 @@ public Invoker refer(ReferenceConfig referenceConfig, ServiceURL servi HttpInvoker invoker = new HttpInvoker<>(); invoker.setInterfaceClass(referenceConfig.getInterfaceClass()); invoker.setInterfaceName(referenceConfig.getInterfaceName()); - invoker.setEndpoint(initEndpoint(serviceURL)); - invoker.setProtocolConfig(getProtocolConfig()); + invoker.setClient(initEndpoint(serviceURL)); + invoker.setGlobalConfig(getGlobalConfig()); return invoker.buildFilterChain(referenceConfig.getFilters()); } @Override - protected Endpoint doInitEndpoint(ServiceURL serviceURL) { - HttpEndpoint httpEndpoint = new HttpEndpoint(); - httpEndpoint.init(getApplicationConfig(), serviceURL); + protected Client doInitEndpoint(ServiceURL serviceURL) { + HttpClient httpEndpoint = new HttpClient(); + httpEndpoint.init(getGlobalConfig(), serviceURL); return httpEndpoint; } @@ -63,7 +63,7 @@ protected Endpoint doInitEndpoint(ServiceURL serviceURL) { @Override protected Server doOpenServer() { HttpServer httpServer = new HttpServer(); - httpServer.init(getApplicationConfig(), getClusterConfig(), getRegistryConfig(), getProtocolConfig()); + httpServer.init(getGlobalConfig()); httpServer.run(); return httpServer; } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/injvm/InJvmInvoker.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/injvm/InJvmInvoker.java index f49e39d..9ad9f0a 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/injvm/InJvmInvoker.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/injvm/InJvmInvoker.java @@ -15,7 +15,7 @@ public class InJvmInvoker extends AbstractInvoker { @Override public RPCResponse invoke(InvokeParam invokeParam) throws RPCException { - Object serviceBean = getProtocolConfig().getProtocolInstance().referLocalService(invokeParam.getInterfaceName()).getRef(); + Object serviceBean = getGlobalConfig().getProtocol().referLocalService(invokeParam.getInterfaceName()).getRef(); Class serviceClass = serviceBean.getClass(); String methodName = invokeParam.getMethodName(); Class[] parameterTypes = invokeParam.getParameterTypes(); diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/injvm/InJvmProtocol.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/injvm/InJvmProtocol.java index acad046..097f6ee 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/injvm/InJvmProtocol.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/injvm/InJvmProtocol.java @@ -1,6 +1,5 @@ package com.sinjinsong.toy.protocol.injvm; -import com.sinjinsong.toy.common.enumeration.ErrorEnum; import com.sinjinsong.toy.common.exception.RPCException; import com.sinjinsong.toy.config.ReferenceConfig; import com.sinjinsong.toy.config.ServiceConfig; @@ -10,9 +9,6 @@ import com.sinjinsong.toy.registry.api.ServiceURL; import lombok.extern.slf4j.Slf4j; -import java.net.InetAddress; -import java.net.UnknownHostException; - /** * @author sinjinsong * @date 2018/7/18 @@ -27,13 +23,13 @@ public Exporter export(Invoker invoker, ServiceConfig serviceConfig exporter.setServiceConfig(serviceConfig); putExporter(invoker.getInterface(), exporter); // export - // injvm 不需要注册到注册中心 - try { - //TODO refactor this - serviceConfig.getRegistryConfig().getRegistryInstance().register(InetAddress.getLocalHost().getHostAddress() + ":" + getProtocolConfig().getPort(), serviceConfig.getInterfaceName()); - } catch (UnknownHostException e) { - throw new RPCException(e,ErrorEnum.READ_LOCALHOST_ERROR,"读取本地Host失败"); - } +// // injvm 不需要注册到注册中心 +// try { +// //TODO refactor this +// serviceConfig.getRegistryConfig().getRegistryInstance().register(InetAddress.getLocalHost().getHostAddress() + ":" + getGlobalConfig().getPort(), serviceConfig.getInterfaceName()); +// } catch (UnknownHostException e) { +// throw new RPCException(e,ErrorEnum.READ_LOCALHOST_ERROR,"读取本地Host失败"); +// } return exporter; } @@ -42,7 +38,7 @@ public Invoker refer(ReferenceConfig referenceConfig,ServiceURL servic InJvmInvoker invoker = new InJvmInvoker<>(); invoker.setInterfaceClass(referenceConfig.getInterfaceClass()); invoker.setInterfaceName(referenceConfig.getInterfaceName()); - invoker.setProtocolConfig(getProtocolConfig()); + invoker.setGlobalConfig(getGlobalConfig()); return invoker.buildFilterChain(referenceConfig.getFilters()); } } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/toy/ToyInvoker.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/toy/ToyInvoker.java index cf9d3a7..1e1a56d 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/toy/ToyInvoker.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/toy/ToyInvoker.java @@ -17,7 +17,7 @@ public class ToyInvoker extends AbstractRemoteInvoker { @Override protected Function> getProcessor() { - return rpcRequest -> getEndpoint().submit(rpcRequest); + return rpcRequest -> getClient().submit(rpcRequest); } } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/toy/ToyProtocol.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/toy/ToyProtocol.java index a6b1aa2..6420af9 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/toy/ToyProtocol.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/protocol/toy/ToyProtocol.java @@ -8,9 +8,9 @@ import com.sinjinsong.toy.protocol.api.Invoker; import com.sinjinsong.toy.protocol.api.support.AbstractRemoteProtocol; import com.sinjinsong.toy.registry.api.ServiceURL; -import com.sinjinsong.toy.transport.api.Endpoint; +import com.sinjinsong.toy.transport.api.Client; import com.sinjinsong.toy.transport.api.Server; -import com.sinjinsong.toy.transport.toy.client.ToyEndpoint; +import com.sinjinsong.toy.transport.toy.client.ToyClient; import com.sinjinsong.toy.transport.toy.server.ToyServer; import lombok.extern.slf4j.Slf4j; @@ -33,7 +33,7 @@ public Exporter export(Invoker invoker, ServiceConfig serviceConfig openServer(); // export try { - serviceConfig.getRegistryConfig().getRegistryInstance().register(InetAddress.getLocalHost().getHostAddress() + ":" + getProtocolConfig().getPort(), serviceConfig.getInterfaceName()); + serviceConfig.getRegistryConfig().getRegistryInstance().register(InetAddress.getLocalHost().getHostAddress() + ":" + getGlobalConfig().getPort(), serviceConfig.getInterfaceName()); } catch (UnknownHostException e) { throw new RPCException(e, ErrorEnum.READ_LOCALHOST_ERROR, "获取本地Host失败"); } @@ -45,22 +45,22 @@ public Invoker refer(ReferenceConfig referenceConfig, ServiceURL servi ToyInvoker invoker = new ToyInvoker<>(); invoker.setInterfaceClass(referenceConfig.getInterfaceClass()); invoker.setInterfaceName(referenceConfig.getInterfaceName()); - invoker.setProtocolConfig(getProtocolConfig()); - invoker.setEndpoint(initEndpoint(serviceURL)); + invoker.setGlobalConfig(getGlobalConfig()); + invoker.setClient(initEndpoint(serviceURL)); return invoker.buildFilterChain(referenceConfig.getFilters()); } @Override - protected Endpoint doInitEndpoint(ServiceURL serviceURL) { - ToyEndpoint toyEndpoint = new ToyEndpoint(); - toyEndpoint.init(getApplicationConfig(), serviceURL); + protected Client doInitEndpoint(ServiceURL serviceURL) { + ToyClient toyEndpoint = new ToyClient(); + toyEndpoint.init(getGlobalConfig(), serviceURL); return toyEndpoint; } @Override protected Server doOpenServer() { ToyServer toyServer = new ToyServer(); - toyServer.init(getApplicationConfig(), getClusterConfig(), getRegistryConfig(), getProtocolConfig()); + toyServer.init(getGlobalConfig()); toyServer.run(); return toyServer; } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/Endpoint.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/Client.java similarity index 96% rename from toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/Endpoint.java rename to toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/Client.java index ada7f66..5f41098 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/Endpoint.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/Client.java @@ -11,7 +11,7 @@ * @author sinjinsong * @date 2018/7/19 */ -public interface Endpoint { +public interface Client { Future submit(RPCRequest request); void close(); diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/Server.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/Server.java index cff669b..aa6db47 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/Server.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/Server.java @@ -16,13 +16,5 @@ public interface Server { void handleRPCRequest(RPCRequest request, ChannelHandlerContext ctx); - RegistryConfig getRegistryConfig(); - - ProtocolConfig getProtocolConfig(); - - ApplicationConfig getApplicationConfig(); - - ClusterConfig getClusterConfig(); - void close(); } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/AbstractClient.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/AbstractClient.java new file mode 100644 index 0000000..0b3828b --- /dev/null +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/AbstractClient.java @@ -0,0 +1,36 @@ +package com.sinjinsong.toy.transport.api.support; + +import com.sinjinsong.toy.config.GlobalConfig; +import com.sinjinsong.toy.registry.api.ServiceURL; +import com.sinjinsong.toy.transport.api.Client; + +/** + * @author sinjinsong + * @date 2018/7/19 + */ +public abstract class AbstractClient implements Client { + private ServiceURL serviceURL; + private GlobalConfig globalConfig; + + public void init(GlobalConfig globalConfig, ServiceURL serviceURL) { + this.serviceURL = serviceURL; + this.globalConfig = globalConfig; + // 初始化的时候建立连接,才能检测到服务器是否可用 + connect(); + } + + protected abstract void connect(); + + protected GlobalConfig getGlobalConfig() { + return globalConfig; + } + + public ServiceURL getServiceURL() { + return serviceURL; + } + + @Override + public void updateServiceConfig(ServiceURL serviceURL) { + this.serviceURL = serviceURL; + } +} diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/AbstractEndpoint.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/AbstractEndpoint.java deleted file mode 100644 index 99504e1..0000000 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/AbstractEndpoint.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.sinjinsong.toy.transport.api.support; - -import com.sinjinsong.toy.config.ApplicationConfig; -import com.sinjinsong.toy.registry.api.ServiceURL; -import com.sinjinsong.toy.transport.api.Endpoint; - -/** - * @author sinjinsong - * @date 2018/7/19 - */ -public abstract class AbstractEndpoint implements Endpoint { - private ServiceURL serviceURL; - private ApplicationConfig applicationConfig; - - //TODO 判断一下需不需要在这里就建立连接 - public void init(ApplicationConfig applicationConfig, ServiceURL serviceURL) { - this.serviceURL = serviceURL; - this.applicationConfig = applicationConfig; - } - - public ApplicationConfig getApplicationConfig() { - return applicationConfig; - } - - public ServiceURL getServiceURL() { - return serviceURL; - } - - @Override - public void updateServiceConfig(ServiceURL serviceURL) { - this.serviceURL = serviceURL; - } -} diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/AbstractServer.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/AbstractServer.java index 8190b03..a00eda2 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/AbstractServer.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/AbstractServer.java @@ -1,9 +1,6 @@ package com.sinjinsong.toy.transport.api.support; -import com.sinjinsong.toy.config.ApplicationConfig; -import com.sinjinsong.toy.config.ClusterConfig; -import com.sinjinsong.toy.config.ProtocolConfig; -import com.sinjinsong.toy.config.RegistryConfig; +import com.sinjinsong.toy.config.*; import com.sinjinsong.toy.transport.api.Server; /** @@ -11,39 +8,17 @@ * @date 2018/7/19 */ public abstract class AbstractServer implements Server { - private RegistryConfig registryConfig; - private ProtocolConfig protocolConfig; - private ApplicationConfig applicationConfig; - private ClusterConfig clusterConfig; + private GlobalConfig globalConfig; - public void init(ApplicationConfig applicationConfig, ClusterConfig clusterConfig, RegistryConfig registry, - ProtocolConfig protocolConfig) { - this.applicationConfig = applicationConfig; - this.clusterConfig = clusterConfig; - this.registryConfig = registry; - this.protocolConfig = protocolConfig; + public void init(GlobalConfig globalConfig) { + this.globalConfig = globalConfig; doInit(); } - protected abstract void doInit(); - - @Override - public RegistryConfig getRegistryConfig() { - return registryConfig; - } - - @Override - public ProtocolConfig getProtocolConfig() { - return protocolConfig; + protected GlobalConfig getGlobalConfig() { + return globalConfig; } - @Override - public ApplicationConfig getApplicationConfig() { - return applicationConfig; - } - - @Override - public ClusterConfig getClusterConfig() { - return clusterConfig; - } + protected abstract void doInit(); + } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/netty/AbstractNettyEndpoint.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/netty/AbstractNettyClient.java similarity index 80% rename from toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/netty/AbstractNettyEndpoint.java rename to toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/netty/AbstractNettyClient.java index 5ce6833..c73bb96 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/netty/AbstractNettyEndpoint.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/netty/AbstractNettyClient.java @@ -10,7 +10,7 @@ import com.sinjinsong.toy.transport.api.domain.Message; import com.sinjinsong.toy.transport.api.domain.RPCRequest; import com.sinjinsong.toy.transport.api.domain.RPCResponse; -import com.sinjinsong.toy.transport.api.support.AbstractEndpoint; +import com.sinjinsong.toy.transport.api.support.AbstractClient; import com.sinjinsong.toy.transport.api.support.RPCTaskRunner; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; @@ -26,7 +26,7 @@ * @date 2018/7/19 */ @Slf4j -public abstract class AbstractNettyEndpoint extends AbstractEndpoint { +public abstract class AbstractNettyClient extends AbstractClient { private Bootstrap bootstrap; private Channel futureChannel; private EventLoopGroup group; @@ -50,11 +50,12 @@ public abstract class AbstractNettyEndpoint extends AbstractEndpoint { @Override public boolean isAvailable() { - return !destroyed; + return initialized && !destroyed; } - - private synchronized void initClient() { - if(initialized) { + + @Override + protected synchronized void connect() { + if (initialized) { return; } this.converter = initConverter(); @@ -64,8 +65,15 @@ private synchronized void initClient() { .handler(initPipeline()) .option(ChannelOption.SO_KEEPALIVE, true); try { - this.futureChannel = connect(); + ChannelFuture future; + String address = getServiceURL().getAddress(); + String host = address.split(":")[0]; + Integer port = Integer.parseInt(address.split(":")[1]); + future = bootstrap.connect(host, port).sync(); + this.futureChannel = future.channel(); + log.info("客户端已连接至 {}", address); log.info("客户端初始化完毕"); + initialized = true; } catch (Exception e) { log.error("与服务器的连接出现故障"); e.printStackTrace(); @@ -73,26 +81,16 @@ private synchronized void initClient() { } } - private Channel connect() throws Exception { - ChannelFuture future; - String address = getServiceURL().getAddress(); - String host = address.split(":")[0]; - Integer port = Integer.parseInt(address.split(":")[1]); - future = bootstrap.connect(host, port).sync(); - log.info("客户端已连接至 {}", address); - return future.channel(); - } - /** * 连接失败或IO时失败均会调此方法处理异常 */ @Override public void handleException(Throwable throwable) { - throwable.printStackTrace(); log.info("连接失败策略为直接关闭,关闭客户端"); + log.error("",throwable); close(); - throw new RPCException(ErrorEnum.CONNECT_TO_SERVER_FAILURE,"连接失败,关闭客户端"); + throw new RPCException(ErrorEnum.CONNECT_TO_SERVER_FAILURE, "连接失败,关闭客户端"); } @Override @@ -101,7 +99,8 @@ public void handleCallbackRequest(RPCRequest request, ChannelHandlerContext ctx) ServiceConfig serviceConfig = RPCThreadSharedContext.getAndRemoveHandler( CallbackInvocation.generateCallbackHandlerKey(request) ); - new RPCTaskRunner(ctx, request, serviceConfig, converter).run(); + getGlobalConfig().getClientExecutor() + .submit(new RPCTaskRunner(ctx, request, serviceConfig, converter)); } @Override @@ -119,18 +118,18 @@ public void handleRPCResponse(RPCResponse response) { @Override public Future submit(RPCRequest request) { if (!initialized) { - initClient(); + connect(); initialized = true; } if (destroyed) { - throw new RPCException(ErrorEnum.SUBMIT_AFTER_ENDPOINT_CLOSED,"当前Endpoint: {} 关闭后仍在提交任务", getServiceURL().getAddress()); + throw new RPCException(ErrorEnum.SUBMIT_AFTER_ENDPOINT_CLOSED, "当前Endpoint: {} 关闭后仍在提交任务", getServiceURL().getAddress()); } - log.info("客户端发起请求: {},请求的服务器为: {}", request,getServiceURL().getAddress()); + log.info("客户端发起请求: {},请求的服务器为: {}", request, getServiceURL().getAddress()); CompletableFuture responseFuture = new CompletableFuture<>(); RPCThreadSharedContext.registerResponseFuture(request.getRequestId(), responseFuture); Object data = converter.convert2Object(Message.buildRequest(request)); this.futureChannel.writeAndFlush(data); - log.info("请求已发送至{}",getServiceURL().getAddress()); + log.info("请求已发送至{}", getServiceURL().getAddress()); return responseFuture; } @@ -154,5 +153,4 @@ public void close() { } } } - } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/netty/AbstractNettyServer.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/netty/AbstractNettyServer.java index 033db22..af603be 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/netty/AbstractNettyServer.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/api/support/netty/AbstractNettyServer.java @@ -80,7 +80,7 @@ public void run() { //绑定端口,开始监听 //注意这里可以绑定多个端口,每个端口都针对某一种类型的数据(控制消息,数据消息) String host = InetAddress.getLocalHost().getHostAddress(); - this.channelFuture = bootstrap.bind(host, getProtocolConfig().getPort()).sync(); + this.channelFuture = bootstrap.bind(host, getGlobalConfig().getPort()).sync(); //应用程序会一直等待,直到channel关闭 log.info("服务器启动,当前服务器类型为:{}",this.getClass().getSimpleName()); } catch (InterruptedException e) { @@ -92,7 +92,7 @@ public void run() { @Override public void close() { - getRegistryConfig().close(); + getGlobalConfig().getRegistryConfig().close(); if(workerGroup != null) { workerGroup.shutdownGracefully(); } @@ -106,7 +106,7 @@ public void close() { @Override public void handleRPCRequest(RPCRequest request, ChannelHandlerContext ctx) { - getProtocolConfig().getExecutor().getExecutorInstance().submit(new RPCTaskRunner(ctx, request, getProtocolConfig().getProtocolInstance().referLocalService(request.getInterfaceName()), serverMessageConverter)); + getGlobalConfig().getServerExecutor().submit(new RPCTaskRunner(ctx, request, getGlobalConfig().getProtocol().referLocalService(request.getInterfaceName()), serverMessageConverter)); } } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/http/client/HttpEndpoint.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/http/client/HttpClient.java similarity index 81% rename from toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/http/client/HttpEndpoint.java rename to toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/http/client/HttpClient.java index b5f5bd4..0b83478 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/http/client/HttpEndpoint.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/http/client/HttpClient.java @@ -1,6 +1,6 @@ package com.sinjinsong.toy.transport.http.client; -import com.sinjinsong.toy.transport.api.support.netty.AbstractNettyEndpoint; +import com.sinjinsong.toy.transport.api.support.netty.AbstractNettyClient; import com.sinjinsong.toy.transport.http.conveter.HttpClientMessageConverter; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; @@ -14,11 +14,11 @@ * @date 2018/7/19 */ @Slf4j -public class HttpEndpoint extends AbstractNettyEndpoint { +public class HttpClient extends AbstractNettyClient { @Override protected ChannelInitializer initPipeline() { - log.info("HttpEndpoint initPipeline..."); + log.info("HttpClient initPipeline..."); return new ChannelInitializer() { @Override public void initChannel(SocketChannel channel) throws Exception { @@ -28,14 +28,14 @@ public void initChannel(SocketChannel channel) throws Exception { .addLast("HttpRequestEncoder", new HttpRequestEncoder()) .addLast("HttpResponseDecoder", new HttpResponseDecoder()) .addLast("HttpObjectAggregator",new HttpObjectAggregator(10*1024*1024)) - .addLast("HttpClientHandler", new HttpClientHandler(HttpEndpoint.this, HttpClientMessageConverter.getInstance(getApplicationConfig().getSerializerInstance()))); + .addLast("HttpClientHandler", new HttpClientHandler(HttpClient.this, HttpClientMessageConverter.getInstance(getGlobalConfig().getSerializer()))); } }; } @Override protected HttpClientMessageConverter initConverter() { - return HttpClientMessageConverter.getInstance(getApplicationConfig().getSerializerInstance()); + return HttpClientMessageConverter.getInstance(getGlobalConfig().getSerializer()); } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/http/client/HttpClientHandler.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/http/client/HttpClientHandler.java index e04f97c..29736d3 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/http/client/HttpClientHandler.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/http/client/HttpClientHandler.java @@ -1,6 +1,6 @@ package com.sinjinsong.toy.transport.http.client; -import com.sinjinsong.toy.transport.api.Endpoint; +import com.sinjinsong.toy.transport.api.Client; import com.sinjinsong.toy.transport.api.converter.ClientMessageConverter; import com.sinjinsong.toy.transport.api.domain.Message; import io.netty.channel.ChannelHandler; @@ -17,27 +17,27 @@ @AllArgsConstructor @ChannelHandler.Sharable public class HttpClientHandler extends ChannelInboundHandlerAdapter { - private Endpoint endpoint; + private Client client; private ClientMessageConverter converter; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - log.info("客户端与服务器{}通道已开启...", endpoint.getServiceURL().getAddress()); + log.info("客户端与服务器{}通道已开启...", client.getServiceURL().getAddress()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Message message = converter.convertResponse2Message(msg); - log.info("接收到服务器 {} 响应: {}", endpoint.getServiceURL().getAddress(), message.getResponse()); + log.info("接收到服务器 {} 响应: {}", client.getServiceURL().getAddress(), message.getResponse()); if (message.getType() == Message.RESPONSE) { - endpoint.handleRPCResponse(message.getResponse()); + client.handleRPCResponse(message.getResponse()); } else if (message.getType() == Message.REQUEST) { - endpoint.handleCallbackRequest(message.getRequest(), ctx); + client.handleCallbackRequest(message.getRequest(), ctx); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - endpoint.handleException(cause); + client.handleException(cause); } } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/http/server/HttpServer.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/http/server/HttpServer.java index 46e2786..512c648 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/http/server/HttpServer.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/http/server/HttpServer.java @@ -35,13 +35,13 @@ protected void initChannel(SocketChannel ch) throws Exception { .addLast("HttpRequestDecoder", new HttpRequestDecoder()) // 接收请求时,作为一个decoder,将http request转为full http request .addLast("HttpObjectAggregator",new HttpObjectAggregator(10*1024*1024)) - .addLast("HttpServerHandler", new HttpServerHandler(HttpServer.this,HttpServerMessageConverter.getInstance(getApplicationConfig().getSerializerInstance()))); + .addLast("HttpServerHandler", new HttpServerHandler(HttpServer.this,HttpServerMessageConverter.getInstance(getGlobalConfig().getSerializer()))); } }; } @Override protected ServerMessageConverter initConverter() { - return HttpServerMessageConverter.getInstance(getApplicationConfig().getSerializerInstance()); + return HttpServerMessageConverter.getInstance(getGlobalConfig().getSerializer()); } } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/toy/client/ToyEndpoint.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/toy/client/ToyClient.java similarity index 89% rename from toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/toy/client/ToyEndpoint.java rename to toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/toy/client/ToyClient.java index 354bb51..75996c1 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/toy/client/ToyEndpoint.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/toy/client/ToyClient.java @@ -2,7 +2,7 @@ import com.sinjinsong.toy.transport.api.constant.FrameConstant; import com.sinjinsong.toy.transport.api.converter.ClientMessageConverter; -import com.sinjinsong.toy.transport.api.support.netty.AbstractNettyEndpoint; +import com.sinjinsong.toy.transport.api.support.netty.AbstractNettyClient; import com.sinjinsong.toy.transport.toy.codec.ToyDecoder; import com.sinjinsong.toy.transport.toy.codec.ToyEncoder; import io.netty.channel.ChannelInitializer; @@ -20,11 +20,11 @@ * 每个服务器的每个接口对应一个Endpoint */ @Slf4j -public class ToyEndpoint extends AbstractNettyEndpoint { +public class ToyClient extends AbstractNettyClient { @Override protected ChannelInitializer initPipeline() { - log.info("ToyEndpoint initPipeline..."); + log.info("ToyClient initPipeline..."); return new ChannelInitializer() { @Override public void initChannel(SocketChannel channel) throws Exception { @@ -33,13 +33,13 @@ public void initChannel(SocketChannel channel) throws Exception { // ByteBuf -> Message .addLast("LengthFieldPrepender", new LengthFieldPrepender(FrameConstant.LENGTH_FIELD_LENGTH, FrameConstant.LENGTH_ADJUSTMENT)) // Message -> ByteBuf - .addLast("ToyEncoder", new ToyEncoder(getApplicationConfig().getSerializerInstance())) + .addLast("ToyEncoder", new ToyEncoder(getGlobalConfig().getSerializer())) // ByteBuf -> Message .addLast("LengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(FrameConstant.MAX_FRAME_LENGTH, FrameConstant.LENGTH_FIELD_OFFSET, FrameConstant.LENGTH_FIELD_LENGTH, FrameConstant.LENGTH_ADJUSTMENT, FrameConstant.INITIAL_BYTES_TO_STRIP)) // Message -> Message - .addLast("ToyDecoder", new ToyDecoder(getApplicationConfig().getSerializerInstance())) + .addLast("ToyDecoder", new ToyDecoder(getGlobalConfig().getSerializer())) - .addLast("ToyClientHandler", new ToyClientHandler(ToyEndpoint.this)); + .addLast("ToyClientHandler", new ToyClientHandler(ToyClient.this)); } }; } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/toy/client/ToyClientHandler.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/toy/client/ToyClientHandler.java index 913032d..3a12b55 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/toy/client/ToyClientHandler.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/toy/client/ToyClientHandler.java @@ -1,6 +1,6 @@ package com.sinjinsong.toy.transport.toy.client; -import com.sinjinsong.toy.transport.api.Endpoint; +import com.sinjinsong.toy.transport.api.Client; import com.sinjinsong.toy.transport.api.domain.Message; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -17,19 +17,19 @@ @ChannelHandler.Sharable @AllArgsConstructor public class ToyClientHandler extends SimpleChannelInboundHandler { - private Endpoint endpoint; + private Client client; @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.info("客户端捕获到异常"); cause.printStackTrace(); - log.info("与服务器{} 的连接断开", endpoint.getServiceURL().getAddress()); - endpoint.handleException(cause); + log.info("与服务器{} 的连接断开", client.getServiceURL().getAddress()); + client.handleException(cause); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - log.info("客户端与服务器{}通道已开启...", endpoint.getServiceURL().getAddress()); + log.info("客户端与服务器{}通道已开启...", client.getServiceURL().getAddress()); } /** @@ -42,7 +42,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { - log.info("超过指定时间未发送数据,客户端主动发送心跳信息至{}", endpoint.getServiceURL().getAddress()); + log.info("超过指定时间未发送数据,客户端主动发送心跳信息至{}", client.getServiceURL().getAddress()); ctx.writeAndFlush(Message.PING_MSG); } else { super.userEventTriggered(ctx, evt); @@ -51,14 +51,14 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc @Override protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception { - log.info("接收到服务器 {} 响应: {}", endpoint.getServiceURL().getAddress(), message); + log.info("接收到服务器 {} 响应: {}", client.getServiceURL().getAddress(), message); //服务器不会PING客户端 if (message.getType() == Message.PONG) { log.info("收到服务器的PONG心跳响应"); } else if (message.getType() == Message.RESPONSE) { - endpoint.handleRPCResponse(message.getResponse()); + client.handleRPCResponse(message.getResponse()); } else if (message.getType() == Message.REQUEST) { - endpoint.handleCallbackRequest(message.getRequest(), ctx); + client.handleCallbackRequest(message.getRequest(), ctx); } } } diff --git a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/toy/server/ToyServer.java b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/toy/server/ToyServer.java index 420f181..f4f0e0d 100644 --- a/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/toy/server/ToyServer.java +++ b/toy-rpc-core/src/main/java/com/sinjinsong/toy/transport/toy/server/ToyServer.java @@ -33,11 +33,11 @@ protected void initChannel(SocketChannel ch) throws Exception { // ByteBuf -> Message .addLast("LengthFieldPrepender", new LengthFieldPrepender(FrameConstant.LENGTH_FIELD_LENGTH, FrameConstant.LENGTH_ADJUSTMENT)) // Message -> ByteBuf - .addLast("ToyEncoder", new ToyEncoder(getApplicationConfig().getSerializerInstance())) + .addLast("ToyEncoder", new ToyEncoder(getGlobalConfig().getSerializer())) // ByteBuf -> Message .addLast("LengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(FrameConstant.MAX_FRAME_LENGTH, FrameConstant.LENGTH_FIELD_OFFSET, FrameConstant.LENGTH_FIELD_LENGTH, FrameConstant.LENGTH_ADJUSTMENT, FrameConstant.INITIAL_BYTES_TO_STRIP)) // Message -> Message - .addLast("ToyDecoder", new ToyDecoder(getApplicationConfig().getSerializerInstance())) + .addLast("ToyDecoder", new ToyDecoder(getGlobalConfig().getSerializer())) .addLast("ToyServerHandler", new ToyServerHandler(ToyServer.this)); } }; diff --git a/toy-rpc-spring-boot-starter/src/main/java/com/sinjinsong/toy/autoconfig/RPCAutoConfiguration.java b/toy-rpc-spring-boot-starter/src/main/java/com/sinjinsong/toy/autoconfig/RPCAutoConfiguration.java index 1b8a059..4ae64c6 100644 --- a/toy-rpc-spring-boot-starter/src/main/java/com/sinjinsong/toy/autoconfig/RPCAutoConfiguration.java +++ b/toy-rpc-spring-boot-starter/src/main/java/com/sinjinsong/toy/autoconfig/RPCAutoConfiguration.java @@ -40,7 +40,7 @@ public class RPCAutoConfiguration implements InitializingBean, ApplicationContex private ApplicationContext ctx; private ExtensionLoader extensionLoader; - + @Bean(initMethod = "init", destroyMethod = "close") public RegistryConfig registryConfig() { RegistryConfig registryConfig = properties.getRegistry(); @@ -69,38 +69,56 @@ public ApplicationConfig applicationConfig() { return application; } + @Bean(destroyMethod = "close") - public ProtocolConfig protocolConfig(ClusterConfig clusterConfig) { + public ProtocolConfig protocolConfig(ApplicationConfig applicationConfig, RegistryConfig registryConfig, ClusterConfig clusterConfig) { ProtocolConfig protocolConfig = properties.getProtocol(); if (protocolConfig == null) { throw new RPCException(ErrorEnum.APP_CONFIG_FILE_ERROR, "必须配置protocolConfig"); } AbstractProtocol protocol = extensionLoader.load(AbstractProtocol.class, ProtocolType.class, protocolConfig.getType()); - protocol.init(ctx.getBean(ApplicationConfig.class),clusterConfig, ctx.getBean(RegistryConfig.class), protocolConfig); + protocol.init(GlobalConfig.builder() + .applicationConfig(applicationConfig) + .protocolConfig(protocolConfig) + .clusterConfig(clusterConfig) + .registryConfig(registryConfig) + .build() + ); protocolConfig.setProtocolInstance(protocol); - ((AbstractLoadBalancer)clusterConfig.getLoadBalanceInstance()).setProtocolConfig(protocolConfig); - ExecutorConfig executorConfig = protocolConfig.getExecutor(); - if (executorConfig != null) { - TaskExecutor executor = extensionLoader.load(TaskExecutor.class, ExecutorType.class, executorConfig.getType()); - executor.init(executorConfig.getThreads()); - executorConfig.setExecutorInstance(executor); + ((AbstractLoadBalancer) clusterConfig.getLoadBalanceInstance()).updateGlobalConfig(GlobalConfig.builder().protocolConfig(protocolConfig).build()); + Executors executors = protocolConfig.getExecutor(); + if (executors != null) { + ExecutorConfig serverExecutor = executors.getServer(); + if (serverExecutor != null) { + TaskExecutor executor = extensionLoader.load(TaskExecutor.class, ExecutorType.class, serverExecutor.getType()); + executor.init(serverExecutor.getThreads()); + serverExecutor.setExecutorInstance(executor); + } + ExecutorConfig clientExecutor = executors.getClient(); + if (clientExecutor != null) { + TaskExecutor executor = extensionLoader.load(TaskExecutor.class, ExecutorType.class, clientExecutor.getType()); + executor.init(clientExecutor.getThreads()); + clientExecutor.setExecutorInstance(executor); + } } log.info("{}", protocolConfig); return protocolConfig; } @Bean - public ClusterConfig clusterconfig(RegistryConfig registryConfig,ApplicationConfig applicationConfig) { + public ClusterConfig clusterconfig(RegistryConfig registryConfig, ApplicationConfig applicationConfig) { ClusterConfig clusterConfig = properties.getCluster(); if (clusterConfig == null) { throw new RPCException(ErrorEnum.APP_CONFIG_FILE_ERROR, "必须配置clusterConfig"); } AbstractLoadBalancer loadBalancer = extensionLoader.load(AbstractLoadBalancer.class, LoadBalanceType.class, clusterConfig.getLoadbalance()); - loadBalancer.setRegistryConfig(registryConfig); -// loadBalancer.setProtocolConfig(protocolConfig); - loadBalancer.setClusterConfig(clusterConfig); - loadBalancer.setApplicationConfig(applicationConfig); + loadBalancer.updateGlobalConfig(GlobalConfig.builder() + .applicationConfig(applicationConfig) + .registryConfig(registryConfig) + .clusterConfig(clusterConfig) + .build()); + if (clusterConfig.getFaulttolerance() != null) { clusterConfig.setFaultToleranceHandlerInstance(extensionLoader.load(FaultToleranceHandler.class, FaultToleranceType.class, clusterConfig.getFaulttolerance())); } else { @@ -109,7 +127,7 @@ public ClusterConfig clusterconfig(RegistryConfig registryConfig,ApplicationConf } clusterConfig.setLoadBalanceInstance(loadBalancer); log.info("{}", clusterConfig); - + // 注册Filter if (loadBalancer instanceof LeastActiveLoadBalancer) { extensionLoader.register(Filter.class, "activeLimit", new ActiveLimitFilter());