Skip to content

Commit

Permalink
sessionNode renew add weight
Browse files Browse the repository at this point in the history
  • Loading branch information
huanglongchao committed Mar 12, 2024
1 parent 1e02cc2 commit 9c603a9
Show file tree
Hide file tree
Showing 13 changed files with 110 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,20 @@ public class SessionNode extends AbstractNode {

private final ProcessId processId;

// session weight for client conn load balance
private int weight;

/**
* constructor
*
* @param nodeUrl nodeUrl
* @param regionId regionId
* @param processId processId
*/
public SessionNode(URL nodeUrl, String regionId, ProcessId processId) {
public SessionNode(URL nodeUrl, String regionId, ProcessId processId, int weight) {
super(null, nodeUrl, regionId);
this.processId = processId;
this.weight = weight;
}

@Override
Expand All @@ -61,6 +65,10 @@ public ProcessId getProcessId() {
return processId;
}

public int getWeight() {
return weight;
}

/**
* Hash code int.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ public void testDataNode() {
public void testSessionNode() {
ProcessId processId1 = new ProcessId("test", 1, 2, 3);
ProcessId processId2 = new ProcessId("test1", 1, 2, 3);
SessionNode node1 = new SessionNode(url1, region, processId1);
SessionNode node2 = new SessionNode(url2, region, processId2);
SessionNode node3 = new SessionNode(url1, region, processId2);
SessionNode node1 = new SessionNode(url1, region, processId1, 0);
SessionNode node2 = new SessionNode(url2, region, processId2, 0);
SessionNode node3 = new SessionNode(url1, region, processId2, 0);
Assert.assertEquals(node1, node3);
Assert.assertEquals(node1.hashCode(), node3.hashCode());
Assert.assertEquals(node1.toString(), node3.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public void test() {
ProcessId processId1 = new ProcessId("test", 1, 2, 3);
final String dataId = "testDataId";
DataCenterNodes request = new DataCenterNodes(Node.NodeType.CLIENT, 10, dataId);
SessionNode sessionNode = new SessionNode(new URL("192.168.1.1", 8888), "testZone", processId1);
SessionNode sessionNode =
new SessionNode(new URL("192.168.1.1", 8888), "testZone", processId1, 0);
request.setNodes(Collections.singletonMap("testKey", sessionNode));
Assert.assertEquals(request.getDataCenterId(), dataId);
Assert.assertEquals(request.getVersion(), 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ public boolean renew(SessionNode renewal, int duration) {
notifyObservers(new NodeModified<>(lease.getRenewal(), renewal));
return false;
} else {
// replace the session node, as it has changed weight already
if(renewal.getProcessId() != null
&& lease != null
&& lease.getRenewal() != null
&& renewal.getWeight() != lease.getRenewal().getWeight()){
lease.setRenewal(renewal);
}
return super.renew(renewal, duration);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void afterDefaultSessionManagerTest() throws Exception {
public void testGetEpoch() throws TimeoutException, InterruptedException {
Assert.assertEquals(0, sessionManager.getEpoch());
sessionManager.renew(
new SessionNode(randomURL(randomIp()), getDc(), ServerEnv.PROCESS_ID), 1000);
new SessionNode(randomURL(randomIp()), getDc(), ServerEnv.PROCESS_ID, 0), 1000);
waitConditionUntilTimeOut(() -> sessionManager.getEpoch() > 0, 100);
Assert.assertNotEquals(0, sessionManager.getEpoch());
}
Expand All @@ -82,7 +82,8 @@ public void testRenew() throws Exception {
String ip = randomIp();
long timestamp = System.currentTimeMillis();
SessionNode sessionNode =
new SessionNode(randomURL(ip), getDc(), new ProcessId(ip, timestamp, 1, random.nextInt()));
new SessionNode(
randomURL(ip), getDc(), new ProcessId(ip, timestamp, 1, random.nextInt()), 0);
NotifyObserversCounter counter = new NotifyObserversCounter();
sessionManager.addObserver(counter);

Expand All @@ -100,7 +101,8 @@ public void testRenew() throws Exception {
new SessionNode(
sessionNode.getNodeUrl(),
getDc(),
new ProcessId(sessionNode.getIp(), timestamp, 2, random.nextInt()));
new ProcessId(sessionNode.getIp(), timestamp, 2, random.nextInt()),
0);
Assert.assertFalse(sessionManager.renew(sessionNode2, 1));
Assert.assertEquals(2, counter.getCounter());
}
Expand All @@ -109,7 +111,7 @@ public void testRenew() throws Exception {
public void testSessionServerManagerRefreshEpochOnlyOnceWhenNewRegistered()
throws TimeoutException, InterruptedException {
makeMetaLeader();
SessionNode node = new SessionNode(randomURL(randomIp()), getDc(), ServerEnv.PROCESS_ID);
SessionNode node = new SessionNode(randomURL(randomIp()), getDc(), ServerEnv.PROCESS_ID, 0);
sessionManager.renew(node, 1000);
Assert.assertEquals(1, sessionManager.getSessionServerMetaInfo().getClusterMembers().size());
}
Expand All @@ -130,7 +132,7 @@ public void testCancel() {
protected List<SessionNode> randomSessionNodes(int num) {
List<SessionNode> result = Lists.newArrayList();
for (int i = 0; i < num; i++) {
result.add(new SessionNode(randomURL(randomIp()), getDc(), ServerEnv.PROCESS_ID));
result.add(new SessionNode(randomURL(randomIp()), getDc(), ServerEnv.PROCESS_ID, 0));
}
return result;
}
Expand All @@ -141,7 +143,8 @@ public void testOnHeartbeat() {
for (SessionNode sessionNode : sessionNodes) {
sessionManager.renew(sessionNode, 1000);
}
SessionNode sessionNode = new SessionNode(new URL(randomIp()), getDc(), ServerEnv.PROCESS_ID);
SessionNode sessionNode =
new SessionNode(new URL(randomIp()), getDc(), ServerEnv.PROCESS_ID, 0);
sessionManager.renew(sessionNode, 1000);
SlotTable slotTable = randomSlotTable();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public void testGetSessionServers() {
new VersionedList<>(
DatumVersionUtil.nextId(),
Lists.newArrayList(
new SessionNode(randomURL(), getDc(), ServerEnv.PROCESS_ID),
new SessionNode(randomURL(), getDc(), ServerEnv.PROCESS_ID))));
new SessionNode(randomURL(), getDc(), ServerEnv.PROCESS_ID, 0),
new SessionNode(randomURL(), getDc(), ServerEnv.PROCESS_ID, 0))));
Assert.assertEquals(
2,
metaServer.getSessionServerManager().getSessionServerMetaInfo().getClusterMembers().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
new VersionedList<>(
DatumVersionUtil.nextId(),
Lists.newArrayList(
new SessionNode(randomURL(ip1), getDc(), ServerEnv.PROCESS_ID),
new SessionNode(randomURL(ip2), getDc(), ServerEnv.PROCESS_ID),
new SessionNode(randomURL(randomIp()), getDc(), ServerEnv.PROCESS_ID))));
new SessionNode(randomURL(ip1), getDc(), ServerEnv.PROCESS_ID, 0),
new SessionNode(randomURL(ip2), getDc(), ServerEnv.PROCESS_ID, 0),
new SessionNode(randomURL(randomIp()), getDc(), ServerEnv.PROCESS_ID, 0))));
notifier.notifyProvideDataChange(
new ProvideDataChangeEvent(ValueConstants.BLACK_LIST_DATA_ID, System.currentTimeMillis()));
Thread.sleep(100);
Expand All @@ -114,9 +114,9 @@ public void testBoltResponsePositive() throws InterruptedException, RequestExcep
new VersionedList<>(
DatumVersionUtil.nextId(),
Lists.newArrayList(
new SessionNode(randomURL(ip1), getDc(), ServerEnv.PROCESS_ID),
new SessionNode(randomURL(ip2), getDc(), ServerEnv.PROCESS_ID),
new SessionNode(randomURL(randomIp()), getDc(), ServerEnv.PROCESS_ID))));
new SessionNode(randomURL(ip1), getDc(), ServerEnv.PROCESS_ID, 0),
new SessionNode(randomURL(ip2), getDc(), ServerEnv.PROCESS_ID, 0),
new SessionNode(randomURL(randomIp()), getDc(), ServerEnv.PROCESS_ID, 0))));
Client client2 = spy(getRpcClient(scheduled, 10, "Response"));
SessionNodeExchanger otherNodeExchanger = mock(SessionNodeExchanger.class);
when(otherNodeExchanger.request(any(Request.class)))
Expand Down Expand Up @@ -154,9 +154,9 @@ public void testBroadcastInvoke() throws Exception {
new VersionedList<>(
DatumVersionUtil.nextId(),
Lists.newArrayList(
new SessionNode(randomURL(ip1), getDc(), ServerEnv.PROCESS_ID),
new SessionNode(randomURL(ip2), getDc(), ServerEnv.PROCESS_ID),
new SessionNode(randomURL(randomIp()), getDc(), ServerEnv.PROCESS_ID))));
new SessionNode(randomURL(ip1), getDc(), ServerEnv.PROCESS_ID, 0),
new SessionNode(randomURL(ip2), getDc(), ServerEnv.PROCESS_ID, 0),
new SessionNode(randomURL(randomIp()), getDc(), ServerEnv.PROCESS_ID, 0))));

SessionNodeExchanger otherNodeExchanger = mock(SessionNodeExchanger.class);
when(otherNodeExchanger.request(any(Request.class))).thenReturn(Object::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.alipay.sofa.registry.common.model.slot.SlotConfig;
import com.alipay.sofa.registry.common.model.slot.SlotTable;
import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.remoting.Server;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig;
import com.alipay.sofa.registry.server.session.multi.cluster.DataCenterMetadataCache;
import com.alipay.sofa.registry.server.session.remoting.DataNodeExchanger;
Expand Down Expand Up @@ -51,6 +53,8 @@ public class MetaServerServiceImpl extends AbstractMetaServerService<BaseHeartBe

@Autowired private DataNodeNotifyExchanger dataNodeNotifyExchanger;

@Autowired private Exchange boltExchange;

@Override
protected long getCurrentSlotTableEpoch() {
return slotTableCache.getEpoch(sessionServerConfig.getSessionServerDataCenter());
Expand Down Expand Up @@ -104,7 +108,18 @@ protected String cell() {

private Node createNode() {
return new SessionNode(
new URL(ServerEnv.IP), sessionServerConfig.getSessionServerRegion(), ServerEnv.PROCESS_ID);
new URL(ServerEnv.IP),
sessionServerConfig.getSessionServerRegion(),
ServerEnv.PROCESS_ID,
getWeight());
}

private int getWeight(){
Server server = boltExchange.getServer(sessionServerConfig.getServerPort());
if(null == server) {
return 0;
}
return server.getChannels().size();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
*/
package com.alipay.sofa.registry.server.session.resource;

import com.alipay.sofa.registry.common.model.metaserver.nodes.SessionNode;
import com.alipay.sofa.registry.common.model.slot.Slot;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig;
import com.alipay.sofa.registry.server.session.slot.SlotTableCache;
import com.alipay.sofa.registry.server.shared.meta.MetaServerService;
import com.alipay.sofa.registry.util.ParaCheckUtil;
import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import javax.ws.rs.*;
Expand Down Expand Up @@ -66,6 +68,20 @@ public String getSessionServerList(@QueryParam("zone") String zone) {
return Joiner.on(";").join(getSessionServerListJson(zone));
}

@GET
@Path("queryWithConnNum")
@Produces(MediaType.TEXT_PLAIN)
public String getSessionServerListWithConnNum(@QueryParam("zone") String zone) {
if (StringUtils.isBlank(zone)) {
zone = sessionServerConfig.getSessionServerRegion();
}

if (StringUtils.isNotBlank(zone)) {
zone = zone.toUpperCase();
}
return Joiner.on(";").join(getSessionServersWithConnNum(zone));
}

@GET
@Path("connectionNum")
@Produces(MediaType.TEXT_PLAIN)
Expand Down Expand Up @@ -101,6 +117,21 @@ private List<String> getSessionServers(String zone) {
return serverList;
}

private List<String> getSessionServersWithConnNum(String zone) {
List<SessionNode> serverList = metaNodeService.getSessionNodeWithConnNumList(zone);
List<String> serverWithConnNumList = new ArrayList<>();
serverList.forEach(
item -> {
serverWithConnNumList.add(
item.getNodeUrl().getIpAddress()
+ ":"
+ sessionServerConfig.getServerPort()
+ ":"
+ item.getWeight());
});
return serverWithConnNumList;
}

@GET
@Path("slot")
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,19 @@ public List<String> getSessionServerList(String zonename) {
return serverList;
}

public List<SessionNode> getSessionNodeWithConnNumList(String zonename) {
List<SessionNode> serverList = new ArrayList<>();
for (SessionNode sessionNode : getSessionNodes().values()) {
if (StringUtils.isBlank(zonename) || zonename.equals(sessionNode.getRegionId())) {
URL url = sessionNode.getNodeUrl();
if (url != null) {
serverList.add(sessionNode);
}
}
}
return serverList;
}

@Override
public long getSessionServerEpoch() {
return state.sessionServerEpoch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alipay.sofa.registry.common.model.metaserver.FetchSystemPropertyResult;
import com.alipay.sofa.registry.common.model.metaserver.ProvideData;
import com.alipay.sofa.registry.common.model.metaserver.SlotTableChangeEvent;
import com.alipay.sofa.registry.common.model.metaserver.nodes.SessionNode;
import com.alipay.sofa.registry.common.model.slot.SlotTableStatusResponse;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -72,6 +73,8 @@ public interface MetaServerService {
*/
List<String> getSessionServerList(String zonename);

List<SessionNode> getSessionNodeWithConnNumList(String zonename);

Set<ProcessId> getSessionProcessIds();
/**
* Gets get data server list.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ public void testHandleHeartbeatResp() {
new VersionedList(
1,
Lists.newArrayList(
new SessionNode(new URL("192.168.1.2"), "zoneA", ServerEnv.PROCESS_ID),
new SessionNode(new URL("192.168.1.2"), "zoneA", ServerEnv.PROCESS_ID, 0),
new SessionNode(
new URL("192.168.1.3"), "zoneB", new ProcessId("test", 1, 1, 1)))),
new URL("192.168.1.3"), "zoneB", new ProcessId("test", 1, 1, 1), 0))),
"test",
100,
Collections.emptyMap());
Expand Down Expand Up @@ -235,9 +235,9 @@ public void testRenewNode() {
new VersionedList(
1,
Lists.newArrayList(
new SessionNode(new URL("192.168.1.2"), "zoneA", ServerEnv.PROCESS_ID),
new SessionNode(new URL("192.168.1.2"), "zoneA", ServerEnv.PROCESS_ID, 0),
new SessionNode(
new URL("192.168.1.3"), "zoneB", new ProcessId("test", 1, 1, 1)))),
new URL("192.168.1.3"), "zoneB", new ProcessId("test", 1, 1, 1), 0))),
"test",
100,
Collections.emptyMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public class NodeUtilsTest {
@Test
public void test() {
Assert.assertTrue(NodeUtils.transferNodeToIpList(Collections.EMPTY_LIST).isEmpty());
SessionNode node1 = new SessionNode(new URL("xx", 12), "test", null);
SessionNode node2 = new SessionNode(new URL("xyz", 34), "test", null);
SessionNode node1 = new SessionNode(new URL("xx", 12), "test", null, 0);
SessionNode node2 = new SessionNode(new URL("xyz", 34), "test", null, 0);
List<String> list = NodeUtils.transferNodeToIpList(Lists.newArrayList(node1, node2));
Assert.assertEquals(list.get(0), "xx");
Assert.assertEquals(list.get(1), "xyz");
Expand Down

0 comments on commit 9c603a9

Please sign in to comment.