Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): enhance transaction in graph server #2686

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.hugegraph.auth.HugePermission;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.core.GraphManager;
import org.apache.hugegraph.pd.client.KvClient;
import org.apache.hugegraph.pd.grpc.kv.WatchResponse;
import org.apache.hugegraph.type.define.GraphMode;
import org.apache.hugegraph.type.define.GraphReadMode;
import org.apache.hugegraph.util.E;
Expand Down Expand Up @@ -66,6 +68,8 @@ public class GraphsAPI extends API {
private static final String CONFIRM_CLEAR = "I'm sure to delete all data";
private static final String CONFIRM_DROP = "I'm sure to drop the graph";

private KvClient<WatchResponse> client;

@GET
@Timed
@Produces(APPLICATION_JSON_WITH_CHARSET)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.hugegraph.variables.CheckList;
import org.apache.hugegraph.HugeFactory;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.auth.AuthManager;
Expand All @@ -53,6 +54,10 @@
import org.apache.hugegraph.masterelection.StandardRoleListener;
import org.apache.hugegraph.metrics.MetricsUtil;
import org.apache.hugegraph.metrics.ServerReporter;
import org.apache.hugegraph.pd.client.KvClient;
import org.apache.hugegraph.pd.client.PDConfig;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.grpc.kv.WatchResponse;
import org.apache.hugegraph.rpc.RpcClientProvider;
import org.apache.hugegraph.rpc.RpcConsumerConfig;
import org.apache.hugegraph.rpc.RpcProviderConfig;
Expand All @@ -66,12 +71,14 @@
import org.apache.hugegraph.util.ConfigUtil;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Events;
import org.apache.hugegraph.util.JsonUtil;
import org.apache.hugegraph.util.Log;
import org.apache.tinkerpop.gremlin.server.auth.AuthenticationException;
import org.apache.tinkerpop.gremlin.server.util.MetricManager;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.junit.Before;
import org.slf4j.Logger;

import com.alipay.sofa.rpc.config.ServerConfig;
Expand All @@ -94,6 +101,15 @@ public final class GraphManager {
private final HugeConfig conf;
private final EventHub eventHub;

private final String preFix = "graph_creat_tx";

private KvClient<WatchResponse> client;

@Before
public void setUp() {
this.client = new KvClient<>(PDConfig.of("localhost:8686"));
}

public GraphManager(HugeConfig conf, EventHub hub) {
this.graphsDir = conf.get(ServerOptions.GRAPHS);
this.graphs = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -172,6 +188,8 @@ public HugeGraph cloneGraph(String name, String newName, String configText) {
}

public HugeGraph createGraph(String name, String configText) {


E.checkArgument(this.conf.get(ServerOptions.ENABLE_DYNAMIC_CREATE_DROP),
"Not allowed to create graph '%s' dynamically, " +
"please set `enable_dynamic_create_drop` to true.",
Expand All @@ -181,9 +199,34 @@ public HugeGraph createGraph(String name, String configText) {
E.checkArgument(!this.graphs().contains(name),
"The graph name '%s' has existed", name);

CheckList checkList = new CheckList(name, configText);


PropertiesConfiguration propConfig = ConfigUtil.buildConfig(configText);
HugeConfig config = new HugeConfig(propConfig);
this.checkOptions(config);

checkList.setConfig(config);
checkList.setStage("config");

String json = JsonUtil.toJson(checkList);

try {
client.put(preFix + name, json);
}

catch (PDException e) {
throw new RuntimeException(e);
}

checkList.setStage("finish");
try {
client.put(preFix + name, json);
}

catch (PDException e) {
throw new RuntimeException(e);
}

return this.createGraph(config, name);
}
Expand Down Expand Up @@ -581,7 +624,7 @@ private void notifyAndWaitEvent(String event, HugeGraph graph) {
}
}

private HugeGraph createGraph(HugeConfig config, String name) {
private HugeGraph createGraph(HugeConfig config, String name){
HugeGraph graph = null;
try {
// Create graph instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hugegraph.config.TypedOption;
import org.apache.hugegraph.masterelection.GlobalMasterInfo;
import org.apache.hugegraph.masterelection.RoleElectionStateMachine;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.rpc.RpcServiceConfig4Client;
import org.apache.hugegraph.rpc.RpcServiceConfig4Server;
import org.apache.hugegraph.schema.EdgeLabel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@
import org.apache.hugegraph.masterelection.StandardClusterRoleStore;
import org.apache.hugegraph.masterelection.StandardRoleElectionStateMachine;
import org.apache.hugegraph.meta.MetaManager;
import org.apache.hugegraph.pd.client.KvClient;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.grpc.kv.KResponse;
import org.apache.hugegraph.pd.grpc.kv.WatchResponse;
import org.apache.hugegraph.perf.PerfUtil.Watched;
import org.apache.hugegraph.rpc.RpcServiceConfig4Client;
import org.apache.hugegraph.rpc.RpcServiceConfig4Server;
Expand All @@ -97,8 +101,10 @@
import org.apache.hugegraph.util.DateUtil;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Events;
import org.apache.hugegraph.util.JsonUtil;
import org.apache.hugegraph.util.LockUtil;
import org.apache.hugegraph.util.Log;
import org.apache.hugegraph.variables.CheckList;
import org.apache.hugegraph.variables.HugeVariables;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.structure.Edge;
Expand Down Expand Up @@ -180,6 +186,8 @@ public class StandardHugeGraph implements HugeGraph {

private final String schedulerType;

private KvClient<WatchResponse> client;

public StandardHugeGraph(HugeConfig config) {
this.params = new StandardHugeGraphParams();
this.configuration = config;
Expand Down Expand Up @@ -1003,14 +1011,32 @@ public synchronized void close() throws Exception {
}

@Override
public void create(String configPath, GlobalMasterInfo nodeInfo) {
this.initBackend();
this.serverStarted(nodeInfo);
public void create(String configPath, GlobalMasterInfo nodeInfo){
//CheckList checkList = new CheckList();
KResponse result = null;
try {
result = client.get(this.name);
String json = result.getValue();
CheckList checkList = JsonUtil.fromJson(json, CheckList.class);

this.initBackend();
checkList.setInitBackended(true);
checkList.setStage("initBackend");
client.put(name, JsonUtil.toJson(checkList));
this.serverStarted(nodeInfo);
checkList.setServerStarted(true);
checkList.setStage("setServerStarted");
client.put(name, JsonUtil.toJson(checkList));


// Write config to disk file
String confPath = ConfigUtil.writeToFile(configPath, this.name(),
this.configuration());
this.configuration.file(confPath);
} catch (Exception e) {

}

// Write config to disk file
String confPath = ConfigUtil.writeToFile(configPath, this.name(),
this.configuration());
this.configuration.file(confPath);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.apache.hugegraph.variables;

import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.masterelection.GlobalMasterInfo;

import java.io.Serializable;

public class CheckList implements Serializable {

private String name;
private String configText;

private HugeConfig config;

private boolean initBackended;

private boolean serverStarted;

private String stage;

private String configPath;

private GlobalMasterInfo nodeInfo;

boolean toCheck;
String context;
private boolean isBuild;

public void setBuild(boolean build) {
isBuild = build;
}

public CheckList(String name, String context) {
this.name = name;
this.context = context;
}

public HugeConfig getConfig() {
return config;
}

public void setConfig(HugeConfig config) {
this.config = config;
}

public boolean isInitBackended() {
return initBackended;
}

public void setInitBackended(boolean initBackended) {
this.initBackended = initBackended;
}

public boolean isServerStarted() {
return serverStarted;
}

public void setServerStarted(boolean serverStarted) {
this.serverStarted = serverStarted;
}

public String getStage() {
return stage;
}

public void setStage(String stage) {
this.stage = stage;
}

public String getConfigPath() {
return configPath;
}

public void setConfigPath(String configPath) {
this.configPath = configPath;
}

public GlobalMasterInfo getNodeInfo() {
return nodeInfo;
}

public void setNodeInfo(GlobalMasterInfo nodeInfo) {
this.nodeInfo = nodeInfo;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package org.apache.hugegraph;

import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.core.GraphManager;
import org.apache.hugegraph.masterelection.GlobalMasterInfo;
import org.apache.hugegraph.pd.client.KvClient;
import org.apache.hugegraph.pd.common.PDException;
import org.apache.hugegraph.pd.grpc.kv.ScanPrefixResponse;
import org.apache.hugegraph.pd.grpc.kv.WatchResponse;
import org.apache.hugegraph.util.ConfigUtil;
import org.apache.hugegraph.util.Events;
import org.apache.hugegraph.util.JsonUtil;
import org.apache.hugegraph.variables.CheckList;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;



public class TxScanner {
private final String prefix = "graph_creat_tx";

private KvClient<WatchResponse> client;

public TxScanner(KvClient<WatchResponse> client) {
}


public void scan() {
try {
ScanPrefixResponse response = this.client.scanPrefix(prefix);
for(String key : response.getKvsMap().keySet()) {
String value = response.getKvsMap().get(key);
CheckList checkList = JsonUtil.fromJson(value, CheckList.class);
switch (checkList.getStage()) {
case "config": {
configContinue(checkList);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add some comments that we don't expect break here and explain why

case "initBackend" : {
HugeConfig config = checkList.getConfig();
HugeGraph graph = (HugeGraph) GraphFactory.open(config);
GlobalMasterInfo globalMasterInfo = checkList.getNodeInfo();
graph.serverStarted(globalMasterInfo);
// Write config to disk file
String confPath = ConfigUtil.writeToFile(checkList.getConfigPath(), graph.name(),
(HugeConfig)graph.configuration());
}
case "setServerStarted" : {
HugeConfig config = checkList.getConfig();
HugeGraph graph = (HugeGraph) GraphFactory.open(config);
String confPath = ConfigUtil.writeToFile(checkList.getConfigPath(), graph.name(),
(HugeConfig)graph.configuration());
}
case "finish" : {
client.delete(prefix + checkList.getName());
}
}
}
} catch (PDException e) {
throw new RuntimeException(e);
}

}

private void configContinue(CheckList checkList) {
HugeConfig config = checkList.getConfig();
HugeGraph graph = (HugeGraph) GraphFactory.open(config);
try {
// Create graph instance
graph = (HugeGraph) GraphFactory.open(config);
String configPath = checkList.getConfigPath();
GlobalMasterInfo globalMasterInfo = checkList.getNodeInfo();
// Init graph and start it
graph.create(configPath, globalMasterInfo);
} catch (Throwable e) {
throw e;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't need to catch if we just re-throw here?

}

}

}
Loading