Skip to content

Commit

Permalink
resupport same target have multi connection in transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
junwen12221 committed Jun 18, 2021
1 parent 1d4c9e1 commit 98e0020
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 37 deletions.
6 changes: 5 additions & 1 deletion example/src/test/java/io/mycat/sql/UserCaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public void case4() throws Exception {
System.out.println(explain1);
executeQuery(mycatConnection,sql1);

Assert.assertTrue(explain1.contains("MycatView(distribution=[[cloud.log]], conditions=[=($3, CAST(?0):TIMESTAMP(0) NOT NULL)])"));
Assert.assertTrue(explain1.contains("MycatView(distribution=[[cloud.log]]"));

String sql2 = "SELECT log.id ,user.id,service.`id` FROM (SELECT log.`id` ,log.`service_id`,log.`submit_time`,log.`user_id` FROM LOG WHERE log.submit_time = '2021-5-31' ORDER BY log.submit_time DESC LIMIT 0,20) AS `log` INNER JOIN `user` ON log.user_id = user.id INNER JOIN `service` ON service.id = log.service_id ORDER BY log.submit_time DESC LIMIT 0,20;";

Expand All @@ -258,6 +258,10 @@ public void case4() throws Exception {
executeQuery(mycatConnection,sql2);
Assert.assertTrue(explain2.contains("WHERE (`submit_time` = ?) ORDER BY (`submit_time` IS NULL) DESC, `submit_time` DESC LIMIT 20 OFFSET 0)"));

//test transaction
mycatConnection.setAutoCommit(false);
executeQuery(mycatConnection,sql2);
mycatConnection.setAutoCommit(true);
}
}
}
97 changes: 61 additions & 36 deletions hbt/src/main/java/io/mycat/AsyncMycatDataContextImpl.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.mycat;

import cn.mycat.vertx.xa.MySQLManager;
import cn.mycat.vertx.xa.XaSqlConnection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import hu.akarnokd.rxjava3.operators.Flowables;
Expand Down Expand Up @@ -36,7 +38,8 @@
import java.util.stream.Collectors;

public abstract class AsyncMycatDataContextImpl extends NewMycatDataContextImpl {
final Map<String, Future<SqlConnection>> usedConnnectionMap = new HashMap<>();
final Map<String, Future<SqlConnection>> transactionConnnectionMap = new HashMap<>();// int transaction
final List<Future<SqlConnection>> connnectionFutureCollection = new LinkedList<>();//not int transaction
final Map<String, List<Observable<Object[]>>> shareObservable = new HashMap<>();

public AsyncMycatDataContextImpl(MycatDataContext dataContext,
Expand All @@ -45,52 +48,74 @@ public AsyncMycatDataContextImpl(MycatDataContext dataContext,
super(dataContext, context, params);
}

Future<SqlConnection> getConnection(String key) {
XaSqlConnection transactionSession = (XaSqlConnection) context.getTransactionSession();
if (context.isInTransaction()) {
return transactionConnnectionMap
.computeIfAbsent(key, s -> transactionSession.getConnection(key));
}
MySQLManager mySQLManager = MetaClusterCurrent.wrapper(MySQLManager.class);
Future<SqlConnection> connection = mySQLManager.getConnection(key);
connnectionFutureCollection.add(connection);
return connection;
}

void recycleConnection(String key, Future<SqlConnection> connectionFuture) {
XaSqlConnection transactionSession = (XaSqlConnection) context.getTransactionSession();
if (context.isInTransaction()) {
transactionConnnectionMap.put(key, connectionFuture);
return;
}
connectionFuture = connectionFuture.flatMap(c -> c.close().mapEmpty());
transactionSession.addCloseFuture(connectionFuture.mapEmpty());
connnectionFutureCollection.add(connectionFuture);
}

@NotNull
public List<Observable<Object[]>> getObservables(ImmutableMultimap<String, SqlString> expand, MycatRowMetaData calciteRowMetaData) {
public synchronized List<Observable<Object[]>> getObservables(ImmutableMultimap<String, SqlString> expand, MycatRowMetaData calciteRowMetaData) {
LinkedList<Observable<Object[]>> observables = new LinkedList<>();
XaSqlConnection transactionSession = (XaSqlConnection) context.getTransactionSession();
for (Map.Entry<String, SqlString> entry : expand.entries()) {
String key = context.resolveDatasourceTargetName(entry.getKey());
SqlString sqlString = entry.getValue();
Observable<Object[]> observable = Observable.create(emitter -> {
synchronized (usedConnnectionMap) {
Future<SqlConnection> sessionConnection = usedConnnectionMap
.computeIfAbsent(key, s -> transactionSession.getConnection(key));
PromiseInternal<SqlConnection> promise = VertxUtil.newPromise();
Observable<Object[]> innerObservable = Objects.requireNonNull(VertxExecuter.runQuery(sessionConnection,
sqlString.getSql(),
MycatPreparedStatementUtil.extractParams(params, sqlString.getDynamicParameters()), calciteRowMetaData));
innerObservable.subscribe(objects -> {
emitter.onNext((objects));
},
throwable -> {
sessionConnection.onSuccess(c -> {
promise.fail(throwable);
Future<SqlConnection> sessionConnection = getConnection(key);
PromiseInternal<SqlConnection> promise = VertxUtil.newPromise();
Observable<Object[]> innerObservable = Objects.requireNonNull(VertxExecuter.runQuery(sessionConnection,
sqlString.getSql(),
MycatPreparedStatementUtil.extractParams(params, sqlString.getDynamicParameters()), calciteRowMetaData));
innerObservable.subscribe(objects -> {
emitter.onNext((objects));
},
throwable -> {
sessionConnection.onSuccess(c -> {
//close connection?
promise.fail(throwable);
})
.onFailure(t -> promise.fail(t));
}, () -> {
sessionConnection.onSuccess(c -> {
promise.tryComplete(c);
}).onFailure(t -> promise.fail(t));
;
});
recycleConnection(key,
promise.future()
.onSuccess(c -> {
emitter.onComplete();
})
.onFailure(t -> promise.fail(t));
}, () -> {
sessionConnection.onSuccess(c -> {
promise.tryComplete(c);
}).onFailure(t -> promise.fail(t));
;
});
usedConnnectionMap.put(key,
promise.future()
.onSuccess(c -> {
emitter.onComplete();
})
.onFailure(t -> {
emitter.onError(t);
}));
}
.onFailure(t -> {
emitter.onError(t);
}));
});
observables.add(observable);
}
return observables;
}

public CompositeFuture endFuture() {
return CompositeFuture.all(new ArrayList<>(usedConnnectionMap.values()));
return CompositeFuture.all((List) ImmutableList.builder()
.addAll(transactionConnnectionMap.values())
.addAll(connnectionFutureCollection).build());
}

public abstract List<Observable<Object[]>> getObservableList(String node);
Expand Down Expand Up @@ -119,7 +144,7 @@ public Observable<Object[]> getObservable(String node, Function1 function1, Comp
public static final class SqlMycatDataContextImpl extends AsyncMycatDataContextImpl {

private DrdsSqlWithParams drdsSqlWithParams;
private ConcurrentMap<String,List<Map<String, Partition>>> cache = new ConcurrentHashMap<>();
private ConcurrentMap<String, List<Map<String, Partition>>> cache = new ConcurrentHashMap<>();


public SqlMycatDataContextImpl(MycatDataContext dataContext, CodeExecuterContext context, DrdsSqlWithParams drdsSqlWithParams) {
Expand All @@ -146,9 +171,9 @@ public List<Observable<Object[]>> getObservableList(String node) {

public Optional<List<Map<String, Partition>>> getPartition(String node) {
MycatRelDatasourceSourceInfo mycatRelDatasourceSourceInfo = this.codeExecuterContext.getRelContext().get(node);
if (mycatRelDatasourceSourceInfo==null)return Optional.empty();
if (mycatRelDatasourceSourceInfo == null) return Optional.empty();
MycatView view = mycatRelDatasourceSourceInfo.getRelNode();
return Optional.ofNullable(cache.computeIfAbsent(node, s -> getSqlMap(codeExecuterContext, view, drdsSqlWithParams, drdsSqlWithParams.getHintDataNodeFilter())));
return Optional.ofNullable(cache.computeIfAbsent(node, s -> getSqlMap(codeExecuterContext, view, drdsSqlWithParams, drdsSqlWithParams.getHintDataNodeFilter())));
}


Expand Down
3 changes: 3 additions & 0 deletions hbt/src/main/java/io/mycat/calcite/logical/MycatView.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ public RelWriter explainTerms(RelWriter pw) {
if (condition != null) {
writer.item("conditions", condition);
}
if (isMergeSort()) {
writer.item("mergeSort", isMergeSort());
}
return writer;
}

Expand Down

0 comments on commit 98e0020

Please sign in to comment.