diff --git a/example/src/test/java/io/mycat/sql/UserCaseTest.java b/example/src/test/java/io/mycat/sql/UserCaseTest.java index 120a96308..aaecd026a 100644 --- a/example/src/test/java/io/mycat/sql/UserCaseTest.java +++ b/example/src/test/java/io/mycat/sql/UserCaseTest.java @@ -249,7 +249,7 @@ public void case4() throws Exception { System.out.println(explain1); executeQuery(mycatConnection,sql1); - Assert.assertTrue(explain1.contains("MycatView(distribution=[[cloud.log]], conditions=[=($3, CAST(?0):TIMESTAMP(0) NOT NULL)])")); + Assert.assertTrue(explain1.contains("MycatView(distribution=[[cloud.log]]")); String sql2 = "SELECT log.id ,user.id,service.`id` FROM (SELECT log.`id` ,log.`service_id`,log.`submit_time`,log.`user_id` FROM LOG WHERE log.submit_time = '2021-5-31' ORDER BY log.submit_time DESC LIMIT 0,20) AS `log` INNER JOIN `user` ON log.user_id = user.id INNER JOIN `service` ON service.id = log.service_id ORDER BY log.submit_time DESC LIMIT 0,20;"; @@ -258,6 +258,10 @@ public void case4() throws Exception { executeQuery(mycatConnection,sql2); Assert.assertTrue(explain2.contains("WHERE (`submit_time` = ?) ORDER BY (`submit_time` IS NULL) DESC, `submit_time` DESC LIMIT 20 OFFSET 0)")); + //test transaction + mycatConnection.setAutoCommit(false); + executeQuery(mycatConnection,sql2); + mycatConnection.setAutoCommit(true); } } } diff --git a/hbt/src/main/java/io/mycat/AsyncMycatDataContextImpl.java b/hbt/src/main/java/io/mycat/AsyncMycatDataContextImpl.java index 5ea733910..4a4f8e068 100644 --- a/hbt/src/main/java/io/mycat/AsyncMycatDataContextImpl.java +++ b/hbt/src/main/java/io/mycat/AsyncMycatDataContextImpl.java @@ -1,6 +1,8 @@ package io.mycat; +import cn.mycat.vertx.xa.MySQLManager; import cn.mycat.vertx.xa.XaSqlConnection; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; import hu.akarnokd.rxjava3.operators.Flowables; @@ -36,7 +38,8 @@ import java.util.stream.Collectors; public abstract class AsyncMycatDataContextImpl extends NewMycatDataContextImpl { - final Map> usedConnnectionMap = new HashMap<>(); + final Map> transactionConnnectionMap = new HashMap<>();// int transaction + final List> connnectionFutureCollection = new LinkedList<>();//not int transaction final Map>> shareObservable = new HashMap<>(); public AsyncMycatDataContextImpl(MycatDataContext dataContext, @@ -45,44 +48,64 @@ public AsyncMycatDataContextImpl(MycatDataContext dataContext, super(dataContext, context, params); } + Future getConnection(String key) { + XaSqlConnection transactionSession = (XaSqlConnection) context.getTransactionSession(); + if (context.isInTransaction()) { + return transactionConnnectionMap + .computeIfAbsent(key, s -> transactionSession.getConnection(key)); + } + MySQLManager mySQLManager = MetaClusterCurrent.wrapper(MySQLManager.class); + Future connection = mySQLManager.getConnection(key); + connnectionFutureCollection.add(connection); + return connection; + } + + void recycleConnection(String key, Future connectionFuture) { + XaSqlConnection transactionSession = (XaSqlConnection) context.getTransactionSession(); + if (context.isInTransaction()) { + transactionConnnectionMap.put(key, connectionFuture); + return; + } + connectionFuture = connectionFuture.flatMap(c -> c.close().mapEmpty()); + transactionSession.addCloseFuture(connectionFuture.mapEmpty()); + connnectionFutureCollection.add(connectionFuture); + } + @NotNull - public List> getObservables(ImmutableMultimap expand, MycatRowMetaData calciteRowMetaData) { + public synchronized List> getObservables(ImmutableMultimap expand, MycatRowMetaData calciteRowMetaData) { LinkedList> observables = new LinkedList<>(); - XaSqlConnection transactionSession = (XaSqlConnection) context.getTransactionSession(); for (Map.Entry entry : expand.entries()) { String key = context.resolveDatasourceTargetName(entry.getKey()); SqlString sqlString = entry.getValue(); Observable observable = Observable.create(emitter -> { - synchronized (usedConnnectionMap) { - Future sessionConnection = usedConnnectionMap - .computeIfAbsent(key, s -> transactionSession.getConnection(key)); - PromiseInternal promise = VertxUtil.newPromise(); - Observable innerObservable = Objects.requireNonNull(VertxExecuter.runQuery(sessionConnection, - sqlString.getSql(), - MycatPreparedStatementUtil.extractParams(params, sqlString.getDynamicParameters()), calciteRowMetaData)); - innerObservable.subscribe(objects -> { - emitter.onNext((objects)); - }, - throwable -> { - sessionConnection.onSuccess(c -> { - promise.fail(throwable); + Future sessionConnection = getConnection(key); + PromiseInternal promise = VertxUtil.newPromise(); + Observable innerObservable = Objects.requireNonNull(VertxExecuter.runQuery(sessionConnection, + sqlString.getSql(), + MycatPreparedStatementUtil.extractParams(params, sqlString.getDynamicParameters()), calciteRowMetaData)); + innerObservable.subscribe(objects -> { + emitter.onNext((objects)); + }, + throwable -> { + sessionConnection.onSuccess(c -> { + //close connection? + promise.fail(throwable); + }) + .onFailure(t -> promise.fail(t)); + }, () -> { + sessionConnection.onSuccess(c -> { + promise.tryComplete(c); + }).onFailure(t -> promise.fail(t)); + ; + }); + recycleConnection(key, + promise.future() + .onSuccess(c -> { + emitter.onComplete(); }) - .onFailure(t -> promise.fail(t)); - }, () -> { - sessionConnection.onSuccess(c -> { - promise.tryComplete(c); - }).onFailure(t -> promise.fail(t)); - ; - }); - usedConnnectionMap.put(key, - promise.future() - .onSuccess(c -> { - emitter.onComplete(); - }) - .onFailure(t -> { - emitter.onError(t); - })); - } + .onFailure(t -> { + emitter.onError(t); + })); }); observables.add(observable); } @@ -90,7 +113,9 @@ public List> getObservables(ImmutableMultimap(usedConnnectionMap.values())); + return CompositeFuture.all((List) ImmutableList.builder() + .addAll(transactionConnnectionMap.values()) + .addAll(connnectionFutureCollection).build()); } public abstract List> getObservableList(String node); @@ -119,7 +144,7 @@ public Observable getObservable(String node, Function1 function1, Comp public static final class SqlMycatDataContextImpl extends AsyncMycatDataContextImpl { private DrdsSqlWithParams drdsSqlWithParams; - private ConcurrentMap>> cache = new ConcurrentHashMap<>(); + private ConcurrentMap>> cache = new ConcurrentHashMap<>(); public SqlMycatDataContextImpl(MycatDataContext dataContext, CodeExecuterContext context, DrdsSqlWithParams drdsSqlWithParams) { @@ -146,9 +171,9 @@ public List> getObservableList(String node) { public Optional>> getPartition(String node) { MycatRelDatasourceSourceInfo mycatRelDatasourceSourceInfo = this.codeExecuterContext.getRelContext().get(node); - if (mycatRelDatasourceSourceInfo==null)return Optional.empty(); + if (mycatRelDatasourceSourceInfo == null) return Optional.empty(); MycatView view = mycatRelDatasourceSourceInfo.getRelNode(); - return Optional.ofNullable(cache.computeIfAbsent(node, s -> getSqlMap(codeExecuterContext, view, drdsSqlWithParams, drdsSqlWithParams.getHintDataNodeFilter()))); + return Optional.ofNullable(cache.computeIfAbsent(node, s -> getSqlMap(codeExecuterContext, view, drdsSqlWithParams, drdsSqlWithParams.getHintDataNodeFilter()))); } diff --git a/hbt/src/main/java/io/mycat/calcite/logical/MycatView.java b/hbt/src/main/java/io/mycat/calcite/logical/MycatView.java index 810918a18..d8b664f93 100644 --- a/hbt/src/main/java/io/mycat/calcite/logical/MycatView.java +++ b/hbt/src/main/java/io/mycat/calcite/logical/MycatView.java @@ -221,6 +221,9 @@ public RelWriter explainTerms(RelWriter pw) { if (condition != null) { writer.item("conditions", condition); } + if (isMergeSort()) { + writer.item("mergeSort", isMergeSort()); + } return writer; }