From d03710068a4889bbe34292870e3c801e62e9db97 Mon Sep 17 00:00:00 2001 From: MelonHell Date: Sat, 16 Dec 2023 00:24:56 +0300 Subject: [PATCH] little refactoring --- .../network/player/FakePlayerConnection.java | 6 +- .../player/PlayerSocketConnection.java | 144 +++++------------- .../EntityTrackerIntegrationTest.java | 5 - .../net/minestom/testing/TestConnection.java | 4 - .../minestom/testing/TestConnectionImpl.java | 43 +----- 5 files changed, 51 insertions(+), 151 deletions(-) diff --git a/src/main/java/net/minestom/server/network/player/FakePlayerConnection.java b/src/main/java/net/minestom/server/network/player/FakePlayerConnection.java index ec5bf1ddefd..7586e0b34bb 100644 --- a/src/main/java/net/minestom/server/network/player/FakePlayerConnection.java +++ b/src/main/java/net/minestom/server/network/player/FakePlayerConnection.java @@ -21,10 +21,8 @@ public void sendPacket(@NotNull SendablePacket packet) { } @Override - public void sendPacketAsync(@NotNull SendablePacket packet) { // I do not know what else can be done with the packages here - FakePlayerController controller = getFakePlayer().getController(); - final ServerPacket serverPacket = SendablePacket.extractServerPacket(getServerState(), packet); - controller.consumePacket(serverPacket); + public void sendPacketAsync(@NotNull SendablePacket packet) { + sendPacket(packet); } @NotNull diff --git a/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java b/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java index f2b329f2f40..5e495782d88 100644 --- a/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java +++ b/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java @@ -51,7 +51,6 @@ public class PlayerSocketConnection extends PlayerConnection { private final Worker worker; private final MessagePassingQueue workerQueue; private final SocketChannel channel; - private final Object channelWriteMonitor = new Object(); private SocketAddress remoteAddress; private volatile boolean compressed = false; @@ -158,7 +157,12 @@ public void startCompression() { @Override public void sendPacket(@NotNull SendablePacket packet) { final boolean compressed = this.compressed; - this.workerQueue.relaxedOffer(() -> writePacketSync(packet, compressed)); + this.workerQueue.relaxedOffer(() -> writePacket(packet, compressed, false)); + } + + @Override + public void sendPacketAsync(@NotNull SendablePacket packet) { + writePacket(packet, this.compressed, true); } @Override @@ -166,19 +170,13 @@ public void sendPackets(@NotNull Collection packets) { final List packetsCopy = List.copyOf(packets); final boolean compressed = this.compressed; this.workerQueue.relaxedOffer(() -> { - for (SendablePacket packet : packetsCopy) writePacketSync(packet, compressed); + for (SendablePacket packet : packetsCopy) writePacket(packet, compressed, false); }); } - @Override - public void sendPacketAsync(@NotNull SendablePacket packet) { - final boolean compressed = this.compressed; - writePacketAsync(packet, compressed); - } - @ApiStatus.Internal public void write(@NotNull ByteBuffer buffer, int index, int length) { - this.workerQueue.relaxedOffer(() -> writeBufferSync(buffer, index, length)); + this.workerQueue.relaxedOffer(() -> writeBuffer(buffer, index, length, false)); } @ApiStatus.Internal @@ -207,9 +205,7 @@ public void setRemoteAddress(@NotNull SocketAddress remoteAddress) { public void disconnect() { super.disconnect(); this.workerQueue.relaxedOffer(() -> { - synchronized (channelWriteMonitor) { - this.worker.disconnect(this, channel); - } + this.worker.disconnect(this, channel); final BinaryBuffer tick = tickBuffer.getAndSet(null); if (tick != null) POOL.add(tick); for (BinaryBuffer buffer : waitingBuffers) POOL.add(buffer); @@ -342,82 +338,48 @@ public void setNonce(byte[] nonce) { this.nonce = nonce; } - /** - * Send Server Packet Event. - * - * @return {@link PlayerPacketOutEvent::isCancelled} state - **/ - private boolean sendEvent(SendablePacket packet) { + private void writePacket(SendablePacket packet, boolean compressed, boolean async) { + if (!channel.isConnected()) return; final Player player = getPlayer(); + // Outgoing event if (player != null && outgoing.hasListener()) { final ServerPacket serverPacket = SendablePacket.extractServerPacket(getServerState(), packet); PlayerPacketOutEvent event = new PlayerPacketOutEvent(player, serverPacket); outgoing.call(event); - return event.isCancelled(); + if (event.isCancelled()) return; } - return false; - } - - private void writePacketSync(SendablePacket packet, boolean compressed) { - if (!channel.isConnected()) return; - if (sendEvent(packet)) return; // Write packet // WARNING: A cached or framed packet will currently never go through writeServerPacketSync, // so a state change inside one of them will never actually be triggered. Currently, cached // packets are never used for packets that change state, so this is not a problem. if (packet instanceof ServerPacket serverPacket) { - writeServerPacketSync(serverPacket, compressed); + writeServerPacket(serverPacket, compressed, async); } else if (packet instanceof FramedPacket framedPacket) { var buffer = framedPacket.body(); - writeBufferSync(buffer, 0, buffer.limit()); + writeBuffer(buffer, 0, buffer.limit(), async); } else if (packet instanceof CachedPacket cachedPacket) { var buffer = cachedPacket.body(getServerState()); - if (buffer != null) writeBufferSync(buffer, buffer.position(), buffer.remaining()); - else writeServerPacketSync(cachedPacket.packet(getServerState()), compressed); + if (buffer != null) writeBuffer(buffer, buffer.position(), buffer.remaining(), async); + else writeServerPacket(cachedPacket.packet(getServerState()), compressed, async); } else if (packet instanceof LazyPacket lazyPacket) { - writeServerPacketSync(lazyPacket.packet(), compressed); + writeServerPacket(lazyPacket.packet(), compressed, async); } else { throw new RuntimeException("Unknown packet type: " + packet.getClass().getName()); } } - private void writePacketAsync(SendablePacket packet, boolean compressed) { - if (!channel.isConnected()) return; - if (sendEvent(packet)) return; - // Write packet async - if (packet instanceof ServerPacket serverPacket) { - writeServerPacketAsync(serverPacket, compressed); - } else if (packet instanceof FramedPacket framedPacket) { - var buffer = framedPacket.body(); - writeBufferAsync(buffer, 0, buffer.limit()); - } else if (packet instanceof CachedPacket cachedPacket) { - var buffer = cachedPacket.body(getServerState()); - if (buffer != null) writeBufferAsync(buffer, buffer.position(), buffer.remaining()); - else writeServerPacketAsync(cachedPacket.packet(getServerState()), compressed); - } else if (packet instanceof LazyPacket lazyPacket) { - writeServerPacketAsync(lazyPacket.packet(), compressed); - } else { - throw new RuntimeException("Unknown packet type: " + packet.getClass().getName()); - } - } - - private ServerPacket translateServerPacket(ServerPacket serverPacket) { + private void writeServerPacket(ServerPacket serverPacket, boolean compressed, boolean async) { final Player player = getPlayer(); if (player != null) { if (MinestomAdventure.AUTOMATIC_COMPONENT_TRANSLATION && serverPacket instanceof ComponentHoldingServerPacket) { - return ((ComponentHoldingServerPacket) serverPacket).copyWithOperator(component -> + serverPacket = ((ComponentHoldingServerPacket) serverPacket).copyWithOperator(component -> MinestomAdventure.COMPONENT_TRANSLATOR.apply(component, Objects.requireNonNullElseGet(player.getLocale(), MinestomAdventure::getDefaultLocale))); } } - return serverPacket; - } - - private void writeServerPacketSync(ServerPacket serverPacket, boolean compressed) { - serverPacket = translateServerPacket(serverPacket); try (var hold = ObjectPool.PACKET_POOL.hold()) { var state = getServerState(); var buffer = PacketUtils.createFramedPacket(state, hold.get(), serverPacket, compressed); - writeBufferSync(buffer, 0, buffer.limit()); + writeBuffer(buffer, 0, buffer.limit(), async); // If this packet has a state change, apply it. var nextState = serverPacket.nextState(); @@ -427,16 +389,7 @@ private void writeServerPacketSync(ServerPacket serverPacket, boolean compressed } } - private void writeServerPacketAsync(ServerPacket serverPacket, boolean compressed) { - serverPacket = translateServerPacket(serverPacket); - try (var hold = ObjectPool.PACKET_POOL.hold()) { - var state = getServerState(); - var buffer = PacketUtils.createFramedPacket(state, hold.get(), serverPacket, compressed); - writeBufferAsync(buffer, 0, buffer.limit()); - } - } - - private void writeBufferSync(@NotNull ByteBuffer buffer, int index, int length) { + private void writeBuffer(@NotNull ByteBuffer buffer, int index, int length, boolean async) { // Encrypt data final EncryptionContext encryptionContext = this.encryptionContext; if (encryptionContext != null) { // Encryption support @@ -444,46 +397,22 @@ private void writeBufferSync(@NotNull ByteBuffer buffer, int index, int length) ByteBuffer output = hold.get(); try { length = encryptionContext.encrypt().update(buffer.slice(index, length), output); - writeBufferSync0(output, 0, length); + writeBuffer0(output, 0, length, async); } catch (ShortBufferException e) { MinecraftServer.getExceptionManager().handleException(e); } return; } } - writeBufferSync0(buffer, index, length); + writeBuffer0(buffer, index, length, async); } - private void writeBufferAsync(@NotNull ByteBuffer buffer, int index, int length) { - final SocketChannel channel = this.channel; - if (!channel.isConnected()) return; - // Encrypt data - final EncryptionContext encryptionContext = this.encryptionContext; - if (encryptionContext != null) { // Encryption support - try (var hold = ObjectPool.PACKET_POOL.hold()) { - ByteBuffer output = hold.get(); - try { - length = encryptionContext.encrypt().update(buffer.slice(index, length), output); - try { - synchronized (channelWriteMonitor) { - channel.write(output.slice(0, length)); - } - } catch (IOException ignore) { - } - //Let catch this exceptions in flushSync. I'm not sure that disabling the client here would be correct. - } catch (ShortBufferException e) { - MinecraftServer.getExceptionManager().handleException(e); - } - return; - } - } - try { - synchronized (channelWriteMonitor) { - channel.write(buffer.slice(index, length)); - } - } catch (IOException ignore) { + private void writeBuffer0(@NotNull ByteBuffer buffer, int index, int length, boolean async) { + if (async) { + writeBufferAsync0(buffer, index, length); + } else { + writeBufferSync0(buffer, index, length); } - //Let catch this exceptions in flushSync. I'm not sure that disabling the client here would be correct. } private void writeBufferSync0(@NotNull ByteBuffer buffer, int index, int length) { @@ -505,6 +434,17 @@ private void writeBufferSync0(@NotNull ByteBuffer buffer, int index, int length) } } + private void writeBufferAsync0(@NotNull ByteBuffer buffer, int index, int length) { + final SocketChannel channel = this.channel; + if (!channel.isConnected()) return; + try { + synchronized (channel) { + channel.write(buffer.slice(index, length)); + } + } catch (IOException ignore) { + } + } + public void flushSync() throws IOException { final SocketChannel channel = this.channel; final List waitingBuffers = this.waitingBuffers; @@ -513,7 +453,7 @@ public void flushSync() throws IOException { BinaryBuffer localBuffer = tickBuffer.getPlain(); if (localBuffer == null) return; // Socket is closed - synchronized (channelWriteMonitor) { + synchronized (channel) { localBuffer.writeChannel(channel); } } else { @@ -521,7 +461,7 @@ public void flushSync() throws IOException { Iterator iterator = waitingBuffers.iterator(); while (iterator.hasNext()) { BinaryBuffer waitingBuffer = iterator.next(); - synchronized (channelWriteMonitor) { + synchronized (channel) { if (!waitingBuffer.writeChannel(channel)) break; } iterator.remove(); diff --git a/src/test/java/net/minestom/server/instance/EntityTrackerIntegrationTest.java b/src/test/java/net/minestom/server/instance/EntityTrackerIntegrationTest.java index 3d9278089c5..23c78ab95cf 100644 --- a/src/test/java/net/minestom/server/instance/EntityTrackerIntegrationTest.java +++ b/src/test/java/net/minestom/server/instance/EntityTrackerIntegrationTest.java @@ -136,11 +136,6 @@ public void sendPacket(@NotNull SendablePacket packet) { // nothing } - @Override - public void sendPacketAsync(@NotNull SendablePacket packet) { - // also nothing - } - @Override public @NotNull SocketAddress getRemoteAddress() { return null; diff --git a/testing/src/main/java/net/minestom/testing/TestConnection.java b/testing/src/main/java/net/minestom/testing/TestConnection.java index 6beac8e9b7a..c25b74be0e6 100644 --- a/testing/src/main/java/net/minestom/testing/TestConnection.java +++ b/testing/src/main/java/net/minestom/testing/TestConnection.java @@ -13,10 +13,6 @@ public interface TestConnection { @NotNull Collector trackIncoming(@NotNull Class type); - @NotNull Collector trackIncomingAsync(@NotNull Class type); - - @NotNull Collector trackIncomingSync(@NotNull Class type); - default @NotNull Collector trackIncoming() { return trackIncoming(ServerPacket.class); } diff --git a/testing/src/main/java/net/minestom/testing/TestConnectionImpl.java b/testing/src/main/java/net/minestom/testing/TestConnectionImpl.java index f0133675e49..6fe1846f8b2 100644 --- a/testing/src/main/java/net/minestom/testing/TestConnectionImpl.java +++ b/testing/src/main/java/net/minestom/testing/TestConnectionImpl.java @@ -25,9 +25,7 @@ final class TestConnectionImpl implements TestConnection { private final ServerProcess process; private final PlayerConnectionImpl playerConnection = new PlayerConnectionImpl(); - private final List> incomingTrackersSync = new CopyOnWriteArrayList<>(); - - private final List> incomingTrackersAsync = new CopyOnWriteArrayList<>(); + private final List> incomingTrackers = new CopyOnWriteArrayList<>(); TestConnectionImpl(Env env) { this.env = env; @@ -51,23 +49,8 @@ final class TestConnectionImpl implements TestConnection { @Override public @NotNull Collector trackIncoming(@NotNull Class type) { - var tracker = new IncomingCollector<>(type, incomingTrackersSync, incomingTrackersAsync); - this.incomingTrackersSync.add(IncomingCollector.class.cast(tracker)); - this.incomingTrackersAsync.add(IncomingCollector.class.cast(tracker)); - return tracker; - } - - @Override - public @NotNull Collector trackIncomingSync(@NotNull Class type) { - var tracker = new IncomingCollector<>(type, incomingTrackersSync); - this.incomingTrackersSync.add(IncomingCollector.class.cast(tracker)); - return tracker; - } - - @Override - public @NotNull Collector trackIncomingAsync(@NotNull Class type) { - var tracker = new IncomingCollector<>(type, incomingTrackersAsync); - this.incomingTrackersAsync.add(IncomingCollector.class.cast(tracker)); + var tracker = new IncomingCollector<>(type); + this.incomingTrackers.add(IncomingCollector.class.cast(tracker)); return tracker; } @@ -75,15 +58,7 @@ final class PlayerConnectionImpl extends PlayerConnection { @Override public void sendPacket(@NotNull SendablePacket packet) { final var serverPacket = this.extractPacket(packet); - for (var tracker : incomingTrackersSync) { - if (tracker.type.isAssignableFrom(serverPacket.getClass())) tracker.packets.add(serverPacket); - } - } - - @Override - public void sendPacketAsync(@NotNull SendablePacket packet) { - final var serverPacket = this.extractPacket(packet); - for (var tracker : incomingTrackersAsync) { + for (var tracker : incomingTrackers) { if (tracker.type.isAssignableFrom(serverPacket.getClass())) tracker.packets.add(serverPacket); } } @@ -113,21 +88,17 @@ public void disconnect() { } } - static final class IncomingCollector implements Collector { + final class IncomingCollector implements Collector { private final Class type; private final List packets = new CopyOnWriteArrayList<>(); - private final List>[] incomingTrackers; - @SafeVarargs - public IncomingCollector(Class type, List>... incomingTrackers) { - this.incomingTrackers = incomingTrackers; + public IncomingCollector(Class type) { this.type = type; } @Override public @NotNull List collect() { - for (List> incomingTracker : incomingTrackers) - incomingTracker.remove(this); + incomingTrackers.remove(this); return List.copyOf(packets); } }