From e5f3231af558978016897f17fec47552709bc83a Mon Sep 17 00:00:00 2001 From: wangweicugw Date: Thu, 7 Mar 2024 11:03:55 +0800 Subject: [PATCH] fix getAllCells when initial --- .../discovery/TopologyWatcherManager.java | 95 +++------------ .../com/jd/jdbc/evalengine/EvalEngine.java | 12 +- .../jd/jdbc/monitor/TopoServerCollector.java | 42 +++++++ src/main/java/com/jd/jdbc/topo/Topo.java | 10 +- .../java/com/jd/jdbc/topo/TopoServer.java | 102 +++++++++++++--- .../com/jd/jdbc/util/ScheduledManager.java | 58 +++++++++ .../java/com/jd/jdbc/vitess/VitessDriver.java | 13 +-- .../jd/jdbc/discovery/EtcdTopoServerTest.java | 9 +- .../jd/jdbc/discovery/HealthCheckTest.java | 17 +-- .../com/jd/jdbc/topo/MemoryTopoFactory.java | 3 + .../com/jd/jdbc/topo/MemoryTopoServer.java | 9 +- .../topo/etcd2topo/Etcd2TopoServerTest.java | 7 +- .../jd/jdbc/topo/etcd2topo/ServerTest.java | 7 +- .../jd/jdbc/vitess/KeyspaceNotExistTest.java | 7 +- .../java/com/jd/jdbc/vitess/TypeTest.java | 110 +++++++++++++++++- .../jdbc/vitess/VitessJdbcUrlParserTest.java | 10 +- 16 files changed, 366 insertions(+), 145 deletions(-) create mode 100644 src/main/java/com/jd/jdbc/monitor/TopoServerCollector.java create mode 100644 src/main/java/com/jd/jdbc/util/ScheduledManager.java diff --git a/src/main/java/com/jd/jdbc/discovery/TopologyWatcherManager.java b/src/main/java/com/jd/jdbc/discovery/TopologyWatcherManager.java index 6501578..f70f115 100644 --- a/src/main/java/com/jd/jdbc/discovery/TopologyWatcherManager.java +++ b/src/main/java/com/jd/jdbc/discovery/TopologyWatcherManager.java @@ -18,20 +18,11 @@ package com.jd.jdbc.discovery; -import com.jd.jdbc.common.util.CollectionUtils; import com.jd.jdbc.context.IContext; -import com.jd.jdbc.sqlparser.support.logging.Log; -import com.jd.jdbc.sqlparser.support.logging.LogFactory; -import com.jd.jdbc.topo.TopoException; import com.jd.jdbc.topo.TopoServer; -import com.jd.jdbc.util.threadpool.VtThreadFactoryBuilder; -import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -43,36 +34,30 @@ public enum TopologyWatcherManager { private Map cellTopologyWatcherMap = null; - private Map> globalKeyspacesMap = null; - private final Lock lock = new ReentrantLock(); - private static final Log LOGGER = LogFactory.getLog(TopologyWatcherManager.class); - - private ScheduledThreadPoolExecutor scheduledExecutor; - TopologyWatcherManager() { cellTopologyWatcherMap = new ConcurrentHashMap<>(16); - globalKeyspacesMap = new ConcurrentHashMap<>(16); - - scheduledExecutor = new ScheduledThreadPoolExecutor(1, new VtThreadFactoryBuilder.DefaultThreadFactory("reload-cell-schedule", true)); - scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - scheduledExecutor.setRemoveOnCancelPolicy(true); } - public void startWatch(IContext ctx, TopoServer topoServer, String cell, String tabletKeyspace, TimeUnit timeUnit) { + public void startWatch(IContext ctx, TopoServer topoServer, String cell, String tabletKeyspace) { lock.lock(); try { - String serverAddress = topoServer.getServerAddress(); - if (!globalKeyspacesMap.containsKey(serverAddress)) { - globalKeyspacesMap.put(serverAddress, new HashSet<>()); - - startTickerReloadCell(ctx, topoServer, timeUnit); + if (!cellTopologyWatcherMap.containsKey(cell)) { + TopologyWatcher topologyWatcher = new TopologyWatcher(topoServer, cell, tabletKeyspace); + topologyWatcher.start(ctx); + cellTopologyWatcherMap.put(cell, topologyWatcher); } - globalKeyspacesMap.get(serverAddress).add(tabletKeyspace); + } finally { + lock.unlock(); + } + } + public void startWatch(IContext ctx, TopoServer topoServer, String cell, Set keyspaces) { + lock.lock(); + try { if (!cellTopologyWatcherMap.containsKey(cell)) { - TopologyWatcher topologyWatcher = new TopologyWatcher(topoServer, cell, tabletKeyspace); + TopologyWatcher topologyWatcher = new TopologyWatcher(topoServer, cell, keyspaces); topologyWatcher.start(ctx); cellTopologyWatcherMap.put(cell, topologyWatcher); } @@ -89,68 +74,14 @@ public void watch(IContext ctx, String cell, String tabletKeyspace) { } public void close() { - closeScheduledExecutor(); - for (Map.Entry entry : cellTopologyWatcherMap.entrySet()) { TopologyWatcher topologyWatcher = entry.getValue(); topologyWatcher.close(); } cellTopologyWatcherMap.clear(); - globalKeyspacesMap.clear(); - } - - public void resetScheduledExecutor() { - closeScheduledExecutor(); - - scheduledExecutor = new ScheduledThreadPoolExecutor(1, new VtThreadFactoryBuilder.DefaultThreadFactory("reload-cell-schedule", true)); - scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - scheduledExecutor.setRemoveOnCancelPolicy(true); - } - - public void closeScheduledExecutor() { - scheduledExecutor.shutdownNow(); - try { - int tryAgain = 3; - while (tryAgain > 0 && !scheduledExecutor.awaitTermination(1, TimeUnit.SECONDS)) { - tryAgain--; - } - } catch (InterruptedException e) { - // We're shutting down anyway, so just ignore. - } } public boolean isWatching(String cell) { return cellTopologyWatcherMap.containsKey(cell); } - - public void startTickerReloadCell(IContext globalContext, TopoServer topoServer, TimeUnit timeUnit) { - scheduledExecutor.scheduleWithFixedDelay(() -> { - try { - tickerUpdateCells(globalContext, topoServer); - } catch (Throwable e) { - LOGGER.error("tickerUpdateCells error: " + e); - } - }, 5, 10, timeUnit); - } - - private void tickerUpdateCells(IContext globalContext, TopoServer topoServer) throws TopoException { - String serverAddress = topoServer.getServerAddress(); - Set keyspaceSet = globalKeyspacesMap.get(serverAddress); - if (CollectionUtils.isEmpty(keyspaceSet)) { - throw new RuntimeException("not found keyspace in " + serverAddress + " of TopologyWatcherManager.globalKeyspacesMap ."); - } - List allCells = topoServer.getAllCells(globalContext); - for (String cell : allCells) { - if (!isWatching(cell)) { - lock.lock(); - try { - TopologyWatcher topologyWatcher = new TopologyWatcher(topoServer, cell, keyspaceSet); - topologyWatcher.start(globalContext); - cellTopologyWatcherMap.put(cell, topologyWatcher); - } finally { - lock.unlock(); - } - } - } - } } diff --git a/src/main/java/com/jd/jdbc/evalengine/EvalEngine.java b/src/main/java/com/jd/jdbc/evalengine/EvalEngine.java index ef8f8b7..30586ad 100644 --- a/src/main/java/com/jd/jdbc/evalengine/EvalEngine.java +++ b/src/main/java/com/jd/jdbc/evalengine/EvalEngine.java @@ -159,6 +159,12 @@ public static VtResultValue nullSafeAdd(VtResultValue value1, VtResultValue valu BigDecimal bResult = b1.add(b2); return VtResultValue.newVtResultValue(Query.Type.DECIMAL, bResult); } + if (Query.Type.FLOAT64.equals(resultType)) { + BigDecimal f1 = (BigDecimal) ResultSetUtil.convertValue(value1, BigDecimal.class); + BigDecimal f2 = (BigDecimal) ResultSetUtil.convertValue(value2, BigDecimal.class); + BigDecimal fResult = f1.add(f2); + return VtResultValue.newVtResultValue(Query.Type.FLOAT64, fResult); + } throw new SQLException("nullSafeAdd error"); } @@ -861,7 +867,7 @@ public String string() { public static class AnyExpr implements Expr { - private SQLExpr expr; + private final SQLExpr expr; public AnyExpr(SQLExpr expr) { this.expr = expr; @@ -907,7 +913,7 @@ public void output(StringBuilder builder, boolean wrap, Map tupleExpr; + private final List tupleExpr; @Override public EvalResult evaluate(ExpressionEnv env) throws SQLException { diff --git a/src/main/java/com/jd/jdbc/monitor/TopoServerCollector.java b/src/main/java/com/jd/jdbc/monitor/TopoServerCollector.java new file mode 100644 index 0000000..7a4dccb --- /dev/null +++ b/src/main/java/com/jd/jdbc/monitor/TopoServerCollector.java @@ -0,0 +1,42 @@ +/* +Copyright 2024 JD Project Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.jd.jdbc.monitor; + +import io.prometheus.client.Counter; + +public class TopoServerCollector { + + private static final Counter EXEC_COUNTER = Counter.build() + .name("TopoServer_get_all_cell_counter") + .labelNames("Address") + .help("TopoServer get all cells counter") + .register(MonitorServer.getCollectorRegistry()); + + private static final Counter NEW_CELLS_COUNTER = Counter.build() + .name("TopoServer_new_cell_counter") + .labelNames("Address") + .help("TopoServer add new cell counter") + .register(MonitorServer.getCollectorRegistry()); + + public static Counter getExecCounterCounter() { + return EXEC_COUNTER; + } + + public static Counter geCellsCounter() { + return NEW_CELLS_COUNTER; + } +} diff --git a/src/main/java/com/jd/jdbc/topo/Topo.java b/src/main/java/com/jd/jdbc/topo/Topo.java index bf56533..83302b3 100644 --- a/src/main/java/com/jd/jdbc/topo/Topo.java +++ b/src/main/java/com/jd/jdbc/topo/Topo.java @@ -18,6 +18,7 @@ package com.jd.jdbc.topo; +import com.jd.jdbc.context.IContext; import static com.jd.jdbc.topo.TopoExceptionCode.NO_IMPLEMENTATION; import static com.jd.jdbc.topo.TopoServer.CELLS_PATH; import static com.jd.jdbc.topo.TopoServer.CELL_INFO_FILE; @@ -34,6 +35,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class Topo { @@ -45,7 +47,7 @@ public class Topo { public static String TOPO_GLOBAL_ROOT = "/vitess/global"; - public static TopoServer getTopoServer(TopoServerImplementType topoServerImplementType, String topoServerAddress) throws TopoException { + public static TopoServer getTopoServer(IContext ctx, TopoServerImplementType topoServerImplementType, String topoServerAddress) throws TopoException { synchronized (Topo.class) { registerFactory(topoServerImplementType); @@ -58,6 +60,7 @@ public static TopoServer getTopoServer(TopoServerImplementType topoServerImpleme throw new TopoException(Vtrpc.Code.UNKNOWN, ""); } topoServers.put(topoServerAddress, topoServer); + topoServer.startTickerReloadCell(ctx); return topoServer; } } @@ -124,7 +127,10 @@ protected static TopoServer newWithFactory(TopoFactory topoFactory, String serve topoServer.globalCell = conn; topoServer.globalReadOnlyCell = connReadOnly; topoServer.topoFactory = topoFactory; - topoServer.cells = new HashMap<>(16); + topoServer.cellsTopoConnMap = new HashMap<>(16); + topoServer.keyspaces = ConcurrentHashMap.newKeySet(); + topoServer.cells = ConcurrentHashMap.newKeySet(); + topoServer.localCell = ""; topoServer.serverAddress = serverAddress; return topoServer; } diff --git a/src/main/java/com/jd/jdbc/topo/TopoServer.java b/src/main/java/com/jd/jdbc/topo/TopoServer.java index 3e147c4..1588903 100644 --- a/src/main/java/com/jd/jdbc/topo/TopoServer.java +++ b/src/main/java/com/jd/jdbc/topo/TopoServer.java @@ -20,9 +20,13 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.jd.jdbc.context.IContext; +import com.jd.jdbc.discovery.TopologyWatcherManager; import com.jd.jdbc.key.CurrentShard; +import com.jd.jdbc.monitor.TopoServerCollector; import com.jd.jdbc.sqlparser.support.logging.Log; import com.jd.jdbc.sqlparser.support.logging.LogFactory; +import com.jd.jdbc.srvtopo.ResilientServer; +import com.jd.jdbc.srvtopo.SrvTopoServer; import static com.jd.jdbc.topo.Topo.pathForCellInfo; import static com.jd.jdbc.topo.Topo.pathForSrvKeyspaceFile; import static com.jd.jdbc.topo.Topo.pathForTabletAlias; @@ -31,16 +35,20 @@ import com.jd.jdbc.topo.TopoConnection.DirEntry; import static com.jd.jdbc.topo.TopoExceptionCode.NO_NODE; import com.jd.jdbc.topo.topoproto.TopoProto; +import com.jd.jdbc.util.ScheduledManager; import io.vitess.proto.Topodata; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import lombok.Setter; import vschema.Vschema; public class TopoServer implements Resource, TopoCellInfo, TopoSrvKeyspace, TopoTablet, TopoVschema { @@ -75,15 +83,25 @@ public class TopoServer implements Resource, TopoCellInfo, TopoSrvKeyspace, Topo ReentrantLock lock; - Map cells; + Map cellsTopoConnMap; + + Set cells; + + String localCell; + + Set keyspaces; String serverAddress; + @Setter + private ScheduledManager scheduledManager; + /** * */ TopoServer() { lock = new ReentrantLock(true); + scheduledManager = new ScheduledManager("reload-cell-schedule", 1, TimeUnit.MINUTES); } /** @@ -100,7 +118,7 @@ public TopoConnection connForCell(IContext ctx, String cell) throws TopoExceptio this.lock.lock(); TopoConnection topoConnection; try { - topoConnection = this.cells.get(cell); + topoConnection = this.cellsTopoConnMap.get(cell); } finally { this.lock.unlock(); } @@ -112,14 +130,14 @@ public TopoConnection connForCell(IContext ctx, String cell) throws TopoExceptio this.lock.lock(); try { - topoConnection = this.cells.get(cell); + topoConnection = this.cellsTopoConnMap.get(cell); if (topoConnection != null) { return topoConnection; } String serverAddr = cellInfo.getServerAddress(); topoConnection = this.topoFactory.create(cell, serverAddr, cellInfo.getRoot()); topoConnection = new TopoStatsConnection(cell, topoConnection); - this.cells.put(cell, topoConnection); + this.cellsTopoConnMap.put(cell, topoConnection); return topoConnection; } catch (Exception e) { if (TopoException.isErrType(e, NO_NODE)) { @@ -138,15 +156,19 @@ public TopoConnection connForCell(IContext ctx, String cell) throws TopoExceptio @Override public void close() { this.globalCell.close(); + scheduledManager.close(); + if (this.globalReadOnlyCell != this.globalCell) { this.globalReadOnlyCell.close(); } this.globalCell = null; this.globalReadOnlyCell = null; + this.keyspaces.clear(); + this.cells.clear(); lock.lock(); try { - this.cells.forEach((s, topoConnection) -> topoConnection.close()); - this.cells = new HashMap<>(16); + this.cellsTopoConnMap.forEach((s, topoConnection) -> topoConnection.close()); + this.cellsTopoConnMap = new HashMap<>(16); } finally { lock.unlock(); } @@ -198,22 +220,46 @@ public List getAllCells(IContext ctx) throws TopoException { return cells; } - public String getLocalCell(IContext globalContext, TopoServer topoServer, List cells, String defaultKeyspace) throws TopoException { - String localCell = cells.get(0); + public Set getCells(IContext ctx) throws TopoException { + if (this.cells.isEmpty()) { + this.cells.addAll(this.getAllCells(ctx)); + } + return this.cells; + } + + public String getLocalCell(IContext globalContext, SrvTopoServer srvTopoServer, Set cells, String defaultKeyspace) throws TopoException { + if (this.localCell.isEmpty()) { + return resetLocalCell(globalContext, cells, defaultKeyspace); + } + + String currentLocalCell = this.localCell; + ResilientServer.GetSrvKeyspaceResponse srvKeyspace = srvTopoServer.getSrvKeyspace(globalContext, currentLocalCell, defaultKeyspace); + Exception err = srvKeyspace.getException(); + if (err != null) { + return resetLocalCell(globalContext, cells, defaultKeyspace); + } + + return currentLocalCell; + } + + private String resetLocalCell(IContext globalContext, Set cells, String defaultKeyspace) throws TopoException { + String errMessage = ""; for (String cell : cells) { try { - Topodata.SrvKeyspace getSrvKeyspace = topoServer.getSrvKeyspace(globalContext, cell, defaultKeyspace); + Topodata.SrvKeyspace getSrvKeyspace = this.getSrvKeyspace(globalContext, cell, defaultKeyspace); if (getSrvKeyspace != null) { - localCell = cell; - return localCell; + this.localCell = cell; + return cell; } } catch (TopoException e) { - if (e.getCode() != TopoExceptionCode.NO_NODE) { + if (e.getCode() != NO_NODE) { throw TopoException.wrap(e.getMessage()); } + errMessage = e.getMessage(); } } - return localCell; + + throw TopoException.wrap(NO_NODE, errMessage + " OR invalid local cell: " + cells); } /** @@ -343,7 +389,6 @@ public List getTabletAliasByCell(IContext ctx, String cell } catch (TopoException e) { if (TopoException.isErrType(e, NO_NODE)) { log.debug("failed to get tablet in cell " + cell + " . error: " + e); - return null; } throw e; } @@ -390,4 +435,31 @@ public Vschema.Keyspace getVschema(IContext ctx, String keyspaceName) throws Top public String getServerAddress() { return this.serverAddress; } -} \ No newline at end of file + + public void addKeyspace(String ks) { + this.keyspaces.add(ks); + } + + public void startTickerReloadCell(IContext globalContext) { + scheduledManager.getScheduledExecutor().scheduleWithFixedDelay(() -> { + try { + tickerUpdateCells(globalContext); + } catch (Throwable e) { + log.error("tickerUpdateCells error: " + e); + } + }, 5, 10, scheduledManager.getTimeUnit()); + } + + private void tickerUpdateCells(IContext globalContext) throws TopoException { + List allCells = this.getAllCells(globalContext); + for (String cell : allCells) { + if (!this.cells.contains(cell)) { + this.cells.add(cell); + TopologyWatcherManager.INSTANCE.startWatch(globalContext, this, cell, this.keyspaces); + + TopoServerCollector.geCellsCounter().labels(this.serverAddress).inc(); + } + } + TopoServerCollector.getExecCounterCounter().labels(this.serverAddress).inc(); + } +} diff --git a/src/main/java/com/jd/jdbc/util/ScheduledManager.java b/src/main/java/com/jd/jdbc/util/ScheduledManager.java new file mode 100644 index 0000000..eb44b23 --- /dev/null +++ b/src/main/java/com/jd/jdbc/util/ScheduledManager.java @@ -0,0 +1,58 @@ +/* +Copyright 2024 JD Project Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.jd.jdbc.util; + +import com.jd.jdbc.util.threadpool.VtThreadFactoryBuilder; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class ScheduledManager { + + private ScheduledThreadPoolExecutor scheduledExecutor; + + private TimeUnit timeUnit; + + public ScheduledManager() { + scheduledExecutor = new ScheduledThreadPoolExecutor(1, new VtThreadFactoryBuilder.DefaultThreadFactory("default-schedule", true)); + scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + scheduledExecutor.setRemoveOnCancelPolicy(true); + timeUnit = TimeUnit.MINUTES; + } + + public ScheduledManager(String threadName, int poolSize, TimeUnit timeInternal) { + scheduledExecutor = new ScheduledThreadPoolExecutor(poolSize, new VtThreadFactoryBuilder.DefaultThreadFactory(threadName, true)); + scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + scheduledExecutor.setRemoveOnCancelPolicy(true); + timeUnit = timeInternal; + } + + public void close() { + scheduledExecutor.shutdownNow(); + try { + int tryAgain = 3; + while (tryAgain > 0 && !scheduledExecutor.awaitTermination(1, TimeUnit.SECONDS)) { + tryAgain--; + } + } catch (InterruptedException e) { + // We're shutting down anyway, so just ignore. + } + } +} diff --git a/src/main/java/com/jd/jdbc/vitess/VitessDriver.java b/src/main/java/com/jd/jdbc/vitess/VitessDriver.java index a587a8f..4de6078 100755 --- a/src/main/java/com/jd/jdbc/vitess/VitessDriver.java +++ b/src/main/java/com/jd/jdbc/vitess/VitessDriver.java @@ -55,8 +55,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Properties; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; +import java.util.Set; import java.util.logging.Logger; public class VitessDriver implements java.sql.Driver { @@ -80,8 +79,6 @@ public class VitessDriver implements java.sql.Driver { new BinaryHash(); } - private final ReentrantLock lock = new ReentrantLock(); - public Connection initConnect(String url, Properties info, boolean initOnly) throws SQLException { if (log.isDebugEnabled()) { log.debug("initConnect,url=" + url); @@ -100,13 +97,13 @@ public Connection initConnect(String url, Properties info, boolean initOnly) thr if (!prop.containsKey(Constant.DRIVER_PROPERTY_ROLE_KEY)) { prop.put(Constant.DRIVER_PROPERTY_ROLE_KEY, role); } - TopoServer topoServer = Topo.getTopoServer(Topo.TopoServerImplementType.TOPO_IMPLEMENTATION_ETCD2, "http://" + prop.getProperty("host") + ":" + prop.getProperty("port")); + TopoServer topoServer = Topo.getTopoServer(globalContext, Topo.TopoServerImplementType.TOPO_IMPLEMENTATION_ETCD2, "http://" + prop.getProperty("host") + ":" + prop.getProperty("port")); ResilientServer resilientServer = SrvTopo.newResilientServer(topoServer, "ResilientSrvTopoServer"); - List cells = topoServer.getAllCells(globalContext); + Set cells = topoServer.getCells(globalContext); for (String cell : cells) { - TopologyWatcherManager.INSTANCE.startWatch(globalContext, topoServer, cell, tabletKeyspace, TimeUnit.MINUTES); + TopologyWatcherManager.INSTANCE.startWatch(globalContext, topoServer, cell, tabletKeyspace); } TabletGateway tabletGateway = TabletGateway.build(resilientServer); @@ -115,7 +112,7 @@ public Connection initConnect(String url, Properties info, boolean initOnly) thr TopologyWatcherManager.INSTANCE.watch(globalContext, cell, tabletKeyspace); } - String localCell = topoServer.getLocalCell(globalContext, topoServer, cells, tabletKeyspace); + String localCell = topoServer.getLocalCell(globalContext, resilientServer, cells, tabletKeyspace); boolean masterFlag = role.equalsIgnoreCase(Constant.DRIVER_PROPERTY_ROLE_RW); List tabletTypes = masterFlag diff --git a/src/test/java/com/jd/jdbc/discovery/EtcdTopoServerTest.java b/src/test/java/com/jd/jdbc/discovery/EtcdTopoServerTest.java index 0cb1c13..e6f4b44 100644 --- a/src/test/java/com/jd/jdbc/discovery/EtcdTopoServerTest.java +++ b/src/test/java/com/jd/jdbc/discovery/EtcdTopoServerTest.java @@ -19,6 +19,7 @@ package com.jd.jdbc.discovery; import com.jd.jdbc.common.Constant; +import com.jd.jdbc.context.IContext; import com.jd.jdbc.context.VtContext; import com.jd.jdbc.monitor.SrvKeyspaceCollector; import com.jd.jdbc.topo.Topo; @@ -60,6 +61,8 @@ @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class EtcdTopoServerTest extends TestSuite { + private final IContext globalContext = VtContext.withCancel(VtContext.background()); + private static final ExecutorService executorService = getThreadPool(10, 10); private TopoServer topoServer; @@ -76,9 +79,9 @@ public class EtcdTopoServerTest extends TestSuite { private String keyspacePrefix; - private String counter = "watch_SrvKeyspace_counter_total"; + private final String counter = "watch_SrvKeyspace_counter_total"; - private String errorCounter = "watch_SrvKeyspace_error_counter_total"; + private final String errorCounter = "watch_SrvKeyspace_error_counter_total"; @AfterClass public static void afterClass() { @@ -92,7 +95,7 @@ public void init() throws TopoException, SQLException { keyspace = prop.getProperty(Constant.DRIVER_PROPERTY_SCHEMA); String topoServerAddress = "http://" + prop.getProperty("host") + ":" + prop.getProperty("port"); - topoServer = Topo.getTopoServer(Topo.TopoServerImplementType.TOPO_IMPLEMENTATION_ETCD2, topoServerAddress); + topoServer = Topo.getTopoServer(globalContext, Topo.TopoServerImplementType.TOPO_IMPLEMENTATION_ETCD2, topoServerAddress); cells = topoServer.getAllCells(VtContext.withCancel(VtContext.background())); cell = cells.get(0); keyspacePrefix = "testkeyspace"; diff --git a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java index e7cd644..f08aeae 100644 --- a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java +++ b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java @@ -44,7 +44,6 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import lombok.AllArgsConstructor; import lombok.ToString; @@ -617,10 +616,10 @@ public void testGetHealthyTablet() { @Test public void testPrimaryInOtherCell() throws TopoException, InterruptedException { - TopologyWatcherManager.INSTANCE.resetScheduledExecutor(); - TopoServer topoServer = MemoryTopoFactory.newServerAndFactory("cell1", "cell2").getTopoServer(); startWatchTopo("k", topoServer, "cell1", "cell2"); + topoServer.addKeyspace("k"); + topoServer.startTickerReloadCell(globalContext); printComment("10. HealthCheck Test Primary in other cell"); printComment("a. Get Health"); @@ -640,16 +639,18 @@ public void testPrimaryInOtherCell() throws TopoException, InterruptedException Assert.assertEquals(1, hcList.size()); MockTablet.closeQueryService(mockTablet); + TopologyWatcherManager.INSTANCE.close(); + topoServer.close(); printOk(); } @Test - public void testReplicaInOtherCell() throws TopoException, InterruptedException { - TopologyWatcherManager.INSTANCE.resetScheduledExecutor(); - + public void testReplicaInOtherCell() throws TopoException { TopoServer topoServer = MemoryTopoFactory.newServerAndFactory("cell1", "cell2").getTopoServer(); startWatchTopo("k", topoServer, "cell1", "cell2"); + topoServer.addKeyspace("k"); + topoServer.startTickerReloadCell(globalContext); printComment("11. HealthCheck Test Primary in other cell"); printComment("a. Get Health"); @@ -683,6 +684,8 @@ public void testReplicaInOtherCell() throws TopoException, InterruptedException Assert.assertEquals(2, hcList.size()); MockTablet.closeQueryService(mockTablet, mockTablet1); + TopologyWatcherManager.INSTANCE.close(); + topoServer.close(); printOk(); } @@ -1145,7 +1148,7 @@ private List mockGetHealthyTabletStats(Map allCells = topoServer.getAllCells(ctx); } // createTablet creates a new tablet and all associated paths for the diff --git a/src/test/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServerTest.java b/src/test/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServerTest.java index 59da55f..f432581 100644 --- a/src/test/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServerTest.java +++ b/src/test/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServerTest.java @@ -18,6 +18,7 @@ package com.jd.jdbc.topo.etcd2topo; +import com.jd.jdbc.context.IContext; import com.jd.jdbc.context.VtContext; import com.jd.jdbc.topo.Topo; import com.jd.jdbc.topo.TopoServer; @@ -33,6 +34,8 @@ public class Etcd2TopoServerTest extends TestSuite { + private final IContext globalContext = VtContext.withCancel(VtContext.background()); + @Test @Ignore public void testTopoExecuteTimeout() throws Exception { @@ -42,7 +45,7 @@ public void testTopoExecuteTimeout() throws Exception { String connectionUrl = getConnectionUrl(Driver.of(TestSuiteShardSpec.TWO_SHARDS)); Properties prop = VitessJdbcUrlParser.parse(connectionUrl, null); String topoServerAddress = "http://" + prop.getProperty("host") + ":" + prop.getProperty("port"); - TopoServer topoServer = Topo.getTopoServer(Topo.TopoServerImplementType.TOPO_IMPLEMENTATION_ETCD2, topoServerAddress); + TopoServer topoServer = Topo.getTopoServer(globalContext, Topo.TopoServerImplementType.TOPO_IMPLEMENTATION_ETCD2, topoServerAddress); List cells = topoServer.getAllCells(VtContext.withCancel(VtContext.background())); String cell = cells.get(0); topoServer.connForCell(null, cell); @@ -58,7 +61,7 @@ public void testTopoExecuteTimeout2() throws Exception { String connectionUrl = getConnectionUrl(Driver.of(TestSuiteShardSpec.TWO_SHARDS)); Properties prop = VitessJdbcUrlParser.parse(connectionUrl, null); String topoServerAddress = "http://" + prop.getProperty("host") + ":" + prop.getProperty("port"); - TopoServer topoServer = Topo.getTopoServer(Topo.TopoServerImplementType.TOPO_IMPLEMENTATION_ETCD2, topoServerAddress); + TopoServer topoServer = Topo.getTopoServer(globalContext, Topo.TopoServerImplementType.TOPO_IMPLEMENTATION_ETCD2, topoServerAddress); List cells = topoServer.getAllCells(VtContext.withCancel(VtContext.background())); String cell = cells.get(0); topoServer.connForCell(null, cell); diff --git a/src/test/java/com/jd/jdbc/topo/etcd2topo/ServerTest.java b/src/test/java/com/jd/jdbc/topo/etcd2topo/ServerTest.java index e780201..3dd9cb9 100644 --- a/src/test/java/com/jd/jdbc/topo/etcd2topo/ServerTest.java +++ b/src/test/java/com/jd/jdbc/topo/etcd2topo/ServerTest.java @@ -20,6 +20,7 @@ import com.google.common.collect.Lists; import com.jd.jdbc.common.util.CollectionUtils; +import com.jd.jdbc.context.IContext; import com.jd.jdbc.context.VtContext; import com.jd.jdbc.srvtopo.ResilientServer; import com.jd.jdbc.srvtopo.SrvTopo; @@ -79,6 +80,8 @@ public class ServerTest extends TestSuite { private static final ExecutorService executorService = getThreadPool(10, 10); + private static final IContext globalContext = VtContext.withCancel(VtContext.background()); + @AfterClass public static void afterClass() { if (executorService != null) { @@ -97,7 +100,7 @@ public static TopoServer open() throws TopoException { if (StringUtil.isNullOrEmpty(TOPO_GLOBAL_ROOT)) { throw TopoException.wrap("topo_global_root must be non-empty"); } - return Topo.getTopoServer(TOPO_IMPLEMENTATION, TOPO_GLOBAL_SERVER_ADDRESS); + return Topo.getTopoServer(globalContext, TOPO_IMPLEMENTATION, TOPO_GLOBAL_SERVER_ADDRESS); } @BeforeClass @@ -141,7 +144,7 @@ public void case02_testServer() { @Test public void case03_testServer() { try { - TopoServer topoServer = Topo.getTopoServer(Topo.TopoServerImplementType.TOPO_IMPLEMENTATION_ETCD2, TOPO_GLOBAL_PROXY_ADDRESS); + TopoServer topoServer = Topo.getTopoServer(globalContext, Topo.TopoServerImplementType.TOPO_IMPLEMENTATION_ETCD2, TOPO_GLOBAL_PROXY_ADDRESS); Assert.assertNotNull(topoServer); this.commonTestServer(topoServer); } catch (TopoException e) { diff --git a/src/test/java/com/jd/jdbc/vitess/KeyspaceNotExistTest.java b/src/test/java/com/jd/jdbc/vitess/KeyspaceNotExistTest.java index cfd8031..02156dc 100644 --- a/src/test/java/com/jd/jdbc/vitess/KeyspaceNotExistTest.java +++ b/src/test/java/com/jd/jdbc/vitess/KeyspaceNotExistTest.java @@ -19,7 +19,6 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; -import java.sql.SQLSyntaxErrorException; import java.sql.Statement; import org.junit.Rule; import org.junit.Test; @@ -37,10 +36,10 @@ public void case05() throws SQLException { String connectionUrl = getConnectionUrl(Driver.of(TestSuiteShardSpec.TWO_SHARDS)); String keyspaceNotExist = keyspace + "not_exist"; connectionUrl = connectionUrl.replaceAll(keyspace, keyspaceNotExist); - thrown.expect(SQLSyntaxErrorException.class); - thrown.expectMessage("Unknown database '" + keyspaceNotExist + "'"); + thrown.expectMessage("node doesn't exist"); + thrown.expectMessage("/keyspaces/" + keyspaceNotExist + "/SrvKeyspace"); try (Connection connection = DriverManager.getConnection(connectionUrl); - Statement stmt = connection.createStatement();) { + Statement stmt = connection.createStatement()) { stmt.executeQuery("select 1"); } } diff --git a/src/test/java/com/jd/jdbc/vitess/TypeTest.java b/src/test/java/com/jd/jdbc/vitess/TypeTest.java index 7866c16..d21f5c1 100644 --- a/src/test/java/com/jd/jdbc/vitess/TypeTest.java +++ b/src/test/java/com/jd/jdbc/vitess/TypeTest.java @@ -41,15 +41,14 @@ import org.apache.commons.lang3.RandomUtils; import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import testsuite.TestSuite; - import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; +import testsuite.TestSuite; import static testsuite.internal.TestSuiteShardSpec.TWO_SHARDS; /* @@ -532,4 +531,107 @@ private void cleanBitTinyint() throws SQLException { stmt.executeUpdate("delete from type_bit_tinyint"); } } + + @Test + public void sumFields() throws SQLException { + String sql = "select sum(f_key), sum(f_tinyint), sum(f_u_tinyint),sum(f_smallint),sum(f_u_smallint),sum(f_mediumint),sum(f_u_mediumint),sum(f_int),sum(f_u_int),sum(f_bigint),sum(f_u_bigint)" + + ",sum(f_float),sum(f_u_float),sum(f_double),sum(f_u_double),sum(f_decimal),sum(f_u_decimal),sum(f_bit),sum(f_date)," + + "sum(f_time),sum(f_datetime),sum(f_timestamp),sum(f_year),sum(f_boolean),sum(f_varchar),sum(f_text),sum(f_ttext)" + + ",sum(f_mtext),sum(f_ltext),sum(f_varbinary),sum(f_blob),sum(f_mblob),sum(f_tblob),sum(f_lblob),sum(f_binary)," + + "sum(f_enum),sum(f_set),sum(f_json) from type_test "; + stmtSelect = driverConnection.prepareStatement(sql); + ResultSet rs = stmtSelect.executeQuery(); + while (rs.next()) { + System.out.println(rs.getMetaData().getColumnTypeName(1)); + System.out.println(rs.getMetaData().getColumnTypeName(2)); + System.out.println(rs.getMetaData().getColumnTypeName(3)); + System.out.println(rs.getMetaData().getColumnTypeName(4)); + System.out.println(rs.getMetaData().getColumnTypeName(5)); + System.out.println(rs.getMetaData().getColumnTypeName(6)); + System.out.println(rs.getMetaData().getColumnTypeName(7)); + System.out.println(rs.getMetaData().getColumnTypeName(8)); + System.out.println(rs.getMetaData().getColumnTypeName(9)); + System.out.println(rs.getMetaData().getColumnTypeName(10)); + System.out.println(rs.getMetaData().getColumnTypeName(11)); + System.out.println(rs.getMetaData().getColumnTypeName(12)); + System.out.println(rs.getMetaData().getColumnTypeName(13)); + System.out.println(rs.getMetaData().getColumnTypeName(14)); + System.out.println(rs.getMetaData().getColumnTypeName(15)); + System.out.println(rs.getMetaData().getColumnTypeName(16)); + System.out.println(rs.getMetaData().getColumnTypeName(17)); + System.out.println(rs.getMetaData().getColumnTypeName(18)); + System.out.println(rs.getMetaData().getColumnTypeName(19)); + System.out.println(rs.getMetaData().getColumnTypeName(20)); + System.out.println(rs.getMetaData().getColumnTypeName(21)); + System.out.println(rs.getMetaData().getColumnTypeName(22)); + System.out.println(rs.getMetaData().getColumnTypeName(23)); + System.out.println(rs.getMetaData().getColumnTypeName(24)); + System.out.println(rs.getMetaData().getColumnTypeName(25)); + System.out.println(rs.getMetaData().getColumnTypeName(26)); + System.out.println(rs.getMetaData().getColumnTypeName(27)); + System.out.println(rs.getMetaData().getColumnTypeName(28)); + System.out.println(rs.getMetaData().getColumnTypeName(29)); + System.out.println(rs.getMetaData().getColumnTypeName(30)); + System.out.println(rs.getMetaData().getColumnTypeName(31)); + System.out.println(rs.getMetaData().getColumnTypeName(32)); + System.out.println(rs.getMetaData().getColumnTypeName(33)); + System.out.println(rs.getMetaData().getColumnTypeName(34)); + System.out.println(rs.getMetaData().getColumnTypeName(35)); + System.out.println(rs.getMetaData().getColumnTypeName(36)); + System.out.println(rs.getMetaData().getColumnTypeName(37)); + System.out.println(rs.getMetaData().getColumnTypeName(38)); + } + } + + @Test + public void countFields() throws SQLException { + String sql = + "select count(f_key), count(f_tinyint), count(f_u_tinyint),count(f_smallint),count(f_u_smallint),count(f_mediumint),count(f_u_mediumint),count(f_int),count(f_u_int),count(f_bigint),count(f_u_bigint)" + + ",count(f_float),count(f_u_float),count(f_double),count(f_u_double),count(f_decimal),count(f_u_decimal),count(f_bit),count(f_date)," + + "count(f_time),count(f_datetime),count(f_timestamp),count(f_year),count(f_boolean),count(f_varchar),count(f_text),count(f_ttext)" + + ",count(f_mtext),count(f_ltext),count(f_varbinary),count(f_blob),count(f_mblob),count(f_tblob),count(f_lblob),count(f_binary)," + + "count(f_enum),count(f_set),count(f_json) from type_test "; + stmtSelect = driverConnection.prepareStatement(sql); + ResultSet rs = stmtSelect.executeQuery(); + while (rs.next()) { + System.out.println(rs.getMetaData().getColumnTypeName(1)); + System.out.println(rs.getMetaData().getColumnTypeName(2)); + System.out.println(rs.getMetaData().getColumnTypeName(3)); + System.out.println(rs.getMetaData().getColumnTypeName(4)); + System.out.println(rs.getMetaData().getColumnTypeName(5)); + System.out.println(rs.getMetaData().getColumnTypeName(6)); + System.out.println(rs.getMetaData().getColumnTypeName(7)); + System.out.println(rs.getMetaData().getColumnTypeName(8)); + System.out.println(rs.getMetaData().getColumnTypeName(9)); + System.out.println(rs.getMetaData().getColumnTypeName(10)); + System.out.println(rs.getMetaData().getColumnTypeName(11)); + System.out.println(rs.getMetaData().getColumnTypeName(12)); + System.out.println(rs.getMetaData().getColumnTypeName(13)); + System.out.println(rs.getMetaData().getColumnTypeName(14)); + System.out.println(rs.getMetaData().getColumnTypeName(15)); + System.out.println(rs.getMetaData().getColumnTypeName(16)); + System.out.println(rs.getMetaData().getColumnTypeName(17)); + System.out.println(rs.getMetaData().getColumnTypeName(18)); + System.out.println(rs.getMetaData().getColumnTypeName(19)); + System.out.println(rs.getMetaData().getColumnTypeName(20)); + System.out.println(rs.getMetaData().getColumnTypeName(21)); + System.out.println(rs.getMetaData().getColumnTypeName(22)); + System.out.println(rs.getMetaData().getColumnTypeName(23)); + System.out.println(rs.getMetaData().getColumnTypeName(24)); + System.out.println(rs.getMetaData().getColumnTypeName(25)); + System.out.println(rs.getMetaData().getColumnTypeName(26)); + System.out.println(rs.getMetaData().getColumnTypeName(27)); + System.out.println(rs.getMetaData().getColumnTypeName(28)); + System.out.println(rs.getMetaData().getColumnTypeName(29)); + System.out.println(rs.getMetaData().getColumnTypeName(30)); + System.out.println(rs.getMetaData().getColumnTypeName(31)); + System.out.println(rs.getMetaData().getColumnTypeName(32)); + System.out.println(rs.getMetaData().getColumnTypeName(33)); + System.out.println(rs.getMetaData().getColumnTypeName(34)); + System.out.println(rs.getMetaData().getColumnTypeName(35)); + System.out.println(rs.getMetaData().getColumnTypeName(36)); + System.out.println(rs.getMetaData().getColumnTypeName(37)); + System.out.println(rs.getMetaData().getColumnTypeName(38)); + } + } } \ No newline at end of file diff --git a/src/test/java/com/jd/jdbc/vitess/VitessJdbcUrlParserTest.java b/src/test/java/com/jd/jdbc/vitess/VitessJdbcUrlParserTest.java index 8b4056f..d268f01 100644 --- a/src/test/java/com/jd/jdbc/vitess/VitessJdbcUrlParserTest.java +++ b/src/test/java/com/jd/jdbc/vitess/VitessJdbcUrlParserTest.java @@ -16,8 +16,8 @@ package com.jd.jdbc.vitess; -import com.jd.jdbc.discovery.TopologyWatcherManager; import com.jd.jdbc.sqlparser.utils.StringUtils; +import static com.jd.jdbc.vitess.VitessJdbcUrlParser.JDBC_VITESS_PREFIX; import java.net.URI; import java.net.URISyntaxException; import java.sql.DriverManager; @@ -28,13 +28,11 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import testsuite.TestSuite; -import testsuite.internal.environment.TestSuiteEnv; - -import static com.jd.jdbc.vitess.VitessJdbcUrlParser.JDBC_VITESS_PREFIX; import static testsuite.internal.TestSuiteShardSpec.TWO_SHARDS; +import testsuite.internal.environment.TestSuiteEnv; public class VitessJdbcUrlParserTest extends TestSuite { - private TestSuiteEnv env = Driver.of(TWO_SHARDS); + private final TestSuiteEnv env = Driver.of(TWO_SHARDS); private String schema = getKeyspace(env); @@ -53,8 +51,6 @@ public class VitessJdbcUrlParserTest extends TestSuite { private VitessConnection conn; private void init() throws SQLException { - TopologyWatcherManager.INSTANCE.resetScheduledExecutor(); - String connecturlionUrl = getConnectionUrl(env); URI uri = null;