From 8b85d2ce049ef1108168fbf253d09476f201b048 Mon Sep 17 00:00:00 2001 From: zhangzhiyong Date: Mon, 11 Nov 2024 12:05:53 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=20feat:=20uds=20=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=8D=8F=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/xiaomi/data/push/common/NetUtils.java | 8 +- .../com/xiaomi/data/push/uds/UdsClient.java | 24 +++--- .../com/xiaomi/data/push/uds/UdsServer.java | 27 ++++++- .../push/uds/handler/UdsServerHandler.java | 4 +- .../processor/sever/MockStreamProcessor.java | 76 ------------------- 5 files changed, 45 insertions(+), 94 deletions(-) delete mode 100644 jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/processor/sever/MockStreamProcessor.java diff --git a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/common/NetUtils.java b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/common/NetUtils.java index d0a289181..f5111615e 100644 --- a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/common/NetUtils.java +++ b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/common/NetUtils.java @@ -13,10 +13,10 @@ */ public class NetUtils { - public static EventLoopGroup getEventLoopGroup() { -// if (CommonUtils.isMac() && CommonUtils.isArch64()) { -// return new NioEventLoopGroup(); -// } + public static EventLoopGroup getEventLoopGroup(boolean remote) { + if (remote) { + return new NioEventLoopGroup(); + } if (CommonUtils.isWindows()) { return new NioEventLoopGroup(); } diff --git a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsClient.java b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsClient.java index 7a25002e7..55ec1dbf6 100644 --- a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsClient.java +++ b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsClient.java @@ -84,7 +84,7 @@ public UdsClient(String id) { private EventLoopGroup getEventLoopGroup() { - return NetUtils.getEventLoopGroup(); + return NetUtils.getEventLoopGroup(this.remote); } @@ -135,10 +135,6 @@ public void call(Object msg) { Send.send(this.channel, command); } - - /** - * 发送OpenAI流式请求 - */ public void stream(UdsCommand command, ClientStreamCallback callback) { Map attachments = command.getAttachments(); // 注册回调 @@ -169,13 +165,23 @@ public UdsCommand call(UdsCommand req) { } log.debug("start send,id:{}", id); Send.send(channel, req); + + wheelTimer.newTimeout(() -> { + log.warn("check async udsClient time out auto close:{},{}", req.getId(), req.getTimeout()); + HashMap map = reqMap.remove(req.getId()); + if (null != map) { + CompletableFuture f = (CompletableFuture) map.get("future"); + if (null != f && !f.isDone()) { + future.completeExceptionally( + new TimeoutException("Request timeout: " + req.getTimeout()) + ); + } + } + }, req.getTimeout() + 350); + //异步还是同步 if (req.isAsync()) { req.setCompletableFuture(future); - wheelTimer.newTimeout(() -> { - log.warn("check async udsClient time out auto close:{},{}", req.getId(), req.getTimeout()); - reqMap.remove(req.getId()); - }, req.getTimeout() + 350); return req; } return (UdsCommand) future.get(req.getTimeout(), TimeUnit.MILLISECONDS); diff --git a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsServer.java b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsServer.java index 2f3082769..463ecc3cf 100644 --- a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsServer.java +++ b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsServer.java @@ -18,6 +18,7 @@ import com.google.common.base.Stopwatch; import com.xiaomi.data.push.common.*; +import com.xiaomi.data.push.uds.WheelTimer.UdsWheelTimer; import com.xiaomi.data.push.uds.context.TraceContext; import com.xiaomi.data.push.uds.context.TraceEvent; import com.xiaomi.data.push.uds.context.UdsServerContext; @@ -31,6 +32,8 @@ import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; import lombok.Setter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -42,7 +45,6 @@ import java.nio.file.Paths; import java.util.Optional; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; /** @@ -58,6 +60,10 @@ public class UdsServer implements IServer { private ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor(); + // 创建时间轮,可以根据需要调整tick时间和轮子大小 + private UdsWheelTimer wheelTimer = new UdsWheelTimer(); + + /** * 是否使用remote模式(标准tcp) */ @@ -97,8 +103,8 @@ public void start(String path) { this.path = path; delPath(); boolean mac = CommonUtils.isMac(); - EventLoopGroup bossGroup = NetUtils.getEventLoopGroup(); - EventLoopGroup workerGroup = NetUtils.getEventLoopGroup(); + EventLoopGroup bossGroup = NetUtils.getEventLoopGroup(this.remote); + EventLoopGroup workerGroup = NetUtils.getEventLoopGroup(this.remote); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) @@ -187,9 +193,24 @@ public UdsCommand call(UdsCommand req) { TraceContext context = new TraceContext(); context.enter(); long id = req.getId(); + + // 创建超时任务 + Timeout timeout = wheelTimer.newTimeout(() -> { + CompletableFuture future = reqMap.remove(id); + if (future != null && !future.isDone()) { + future.completeExceptionally( + new TimeoutException("Request timeout: " + req.getTimeout()) + ); + } + }, req.getTimeout()); + try { String app = req.getApp(); CompletableFuture future = new CompletableFuture<>(); + + // 添加完成时取消定时任务的回调 + future.whenComplete((k, v) -> timeout.cancel()); + reqMap.put(id, future); Channel channel = UdsServerContext.ins().channel(app); if (null == channel || !channel.isOpen()) { diff --git a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/UdsServerHandler.java b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/UdsServerHandler.java index a66e0333c..7f8dfe999 100644 --- a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/UdsServerHandler.java +++ b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/UdsServerHandler.java @@ -34,8 +34,8 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; /** * @author goodjava@qq.com diff --git a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/processor/sever/MockStreamProcessor.java b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/processor/sever/MockStreamProcessor.java deleted file mode 100644 index a69ee22ea..000000000 --- a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/processor/sever/MockStreamProcessor.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.xiaomi.data.push.uds.processor.sever; - -import com.xiaomi.data.push.uds.handler.MessageTypes; -import com.xiaomi.data.push.uds.po.UdsCommand; -import com.xiaomi.data.push.uds.processor.StreamCallback; -import com.xiaomi.data.push.uds.processor.UdsProcessor; - -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @author goodjava@qq.com - * @date 2024/11/7 11:53 - */ -public class MockStreamProcessor implements UdsProcessor { - - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - - private final Map> activeStreams = new ConcurrentHashMap<>(); - - @Override - public boolean isStreamProcessor() { - return true; - } - - @Override - public String cmd() { - return "stream"; - } - - @Override - public UdsCommand processRequest(UdsCommand udsCommand) { - return null; - } - - - @Override - public void processStream(UdsCommand request, StreamCallback callback) { - - String streamId = request.getAttachments().getOrDefault( - MessageTypes.STREAM_ID_KEY, - UUID.randomUUID().toString() - ); - - AtomicInteger counter = new AtomicInteger(0); - - ScheduledFuture future = scheduler.scheduleAtFixedRate(() -> { - try { - int currentCount = counter.incrementAndGet(); - - if (currentCount <= 10) { - callback.onContent(String.valueOf(currentCount)); - - if (currentCount == 10) { - callback.onComplete(); - ScheduledFuture scheduledFuture = activeStreams.remove(streamId); - if (scheduledFuture != null) { - scheduledFuture.cancel(false); - } - } - } - } catch (Exception e) { - callback.onError(e); - ScheduledFuture scheduledFuture = activeStreams.remove(streamId); - if (scheduledFuture != null) { - scheduledFuture.cancel(false); - } - } - }, 0, 1, TimeUnit.SECONDS); - - activeStreams.put(streamId, future); - - } -} From 6d19be804ed64075bdcc52856023212ef48d1911 Mon Sep 17 00:00:00 2001 From: zhangzhiyong Date: Mon, 11 Nov 2024 19:53:19 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=20feat:=20uds=20=E8=B6=85=E6=97=B6?= =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/com/xiaomi/data/push/uds/UdsClient.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsClient.java b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsClient.java index 55ec1dbf6..50177d278 100644 --- a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsClient.java +++ b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsClient.java @@ -33,6 +33,7 @@ import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.util.Timeout; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -166,7 +167,7 @@ public UdsCommand call(UdsCommand req) { log.debug("start send,id:{}", id); Send.send(channel, req); - wheelTimer.newTimeout(() -> { + Timeout timeout = wheelTimer.newTimeout(() -> { log.warn("check async udsClient time out auto close:{},{}", req.getId(), req.getTimeout()); HashMap map = reqMap.remove(req.getId()); if (null != map) { @@ -177,7 +178,10 @@ public UdsCommand call(UdsCommand req) { ); } } - }, req.getTimeout() + 350); + }, req.getTimeout() + 200); + + // 添加完成时取消定时任务的回调 + future.whenComplete((k, v) -> timeout.cancel()); //异步还是同步 if (req.isAsync()) {