From a5b553012e997e7c2afbc7855eca1f39e34288c4 Mon Sep 17 00:00:00 2001 From: MelonHell Date: Sat, 16 Dec 2023 14:50:33 +0300 Subject: [PATCH] rename async to immediate and replace synchronized with ReentrantLock --- .../net/minestom/server/entity/Player.java | 12 ++--- .../network/player/FakePlayerConnection.java | 2 +- .../network/player/PlayerConnection.java | 10 ++-- .../player/PlayerSocketConnection.java | 51 +++++++++++-------- 4 files changed, 42 insertions(+), 33 deletions(-) diff --git a/src/main/java/net/minestom/server/entity/Player.java b/src/main/java/net/minestom/server/entity/Player.java index 8e0fcab8a93..ed69cb62386 100644 --- a/src/main/java/net/minestom/server/entity/Player.java +++ b/src/main/java/net/minestom/server/entity/Player.java @@ -1398,18 +1398,18 @@ public void sendPackets(@NotNull Collection packets) { } @ApiStatus.Experimental - public void sendPacketAsync(@NotNull SendablePacket packet) { - this.playerConnection.sendPacketAsync(packet); + public void sendPacketImmediate(@NotNull SendablePacket packet) { + this.playerConnection.sendPacketImmediate(packet); } @ApiStatus.Experimental - public void sendPacketsAsync(@NotNull SendablePacket... packets) { - this.playerConnection.sendPacketsAsync(packets); + public void sendPacketsImmediate(@NotNull SendablePacket... packets) { + this.playerConnection.sendPacketsImmediate(packets); } @ApiStatus.Experimental - public void sendPacketsAsync(@NotNull Collection packets) { - this.playerConnection.sendPacketsAsync(packets); + public void sendPacketsImmediate(@NotNull Collection packets) { + this.playerConnection.sendPacketsImmediate(packets); } /** diff --git a/src/main/java/net/minestom/server/network/player/FakePlayerConnection.java b/src/main/java/net/minestom/server/network/player/FakePlayerConnection.java index 7586e0b34bb..a0566f892db 100644 --- a/src/main/java/net/minestom/server/network/player/FakePlayerConnection.java +++ b/src/main/java/net/minestom/server/network/player/FakePlayerConnection.java @@ -21,7 +21,7 @@ public void sendPacket(@NotNull SendablePacket packet) { } @Override - public void sendPacketAsync(@NotNull SendablePacket packet) { + public void sendPacketImmediate(@NotNull SendablePacket packet) { sendPacket(packet); } diff --git a/src/main/java/net/minestom/server/network/player/PlayerConnection.java b/src/main/java/net/minestom/server/network/player/PlayerConnection.java index 5d2a24c1db0..d474b821fae 100644 --- a/src/main/java/net/minestom/server/network/player/PlayerConnection.java +++ b/src/main/java/net/minestom/server/network/player/PlayerConnection.java @@ -57,7 +57,7 @@ public PlayerConnection() { * @param packet the packet to send */ @ApiStatus.Experimental - public abstract void sendPacketAsync(@NotNull SendablePacket packet); + public abstract void sendPacketImmediate(@NotNull SendablePacket packet); @ApiStatus.Experimental public void sendPackets(@NotNull Collection packets) { @@ -70,13 +70,13 @@ public void sendPackets(@NotNull SendablePacket... packets) { } @ApiStatus.Experimental - public void sendPacketsAsync(@NotNull Collection packets) { - packets.forEach(this::sendPacketAsync); + public void sendPacketsImmediate(@NotNull Collection packets) { + packets.forEach(this::sendPacketImmediate); } @ApiStatus.Experimental - public void sendPacketsAsync(@NotNull SendablePacket... packets) { - sendPacketsAsync(List.of(packets)); + public void sendPacketsImmediate(@NotNull SendablePacket... packets) { + sendPacketsImmediate(List.of(packets)); } /** diff --git a/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java b/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java index 5e495782d88..4e76e3fcc37 100644 --- a/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java +++ b/src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java @@ -36,6 +36,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import java.util.zip.DataFormatException; /** @@ -73,6 +74,7 @@ public class PlayerSocketConnection extends PlayerConnection { private final List waitingBuffers = new ArrayList<>(); private final AtomicReference tickBuffer = new AtomicReference<>(POOL.get()); private BinaryBuffer cacheBuffer; + private ReentrantLock writeLock = new ReentrantLock(); private final ListenerHandle outgoing = EventDispatcher.getHandle(PlayerPacketOutEvent.class); @@ -161,7 +163,7 @@ public void sendPacket(@NotNull SendablePacket packet) { } @Override - public void sendPacketAsync(@NotNull SendablePacket packet) { + public void sendPacketImmediate(@NotNull SendablePacket packet) { writePacket(packet, this.compressed, true); } @@ -338,7 +340,7 @@ public void setNonce(byte[] nonce) { this.nonce = nonce; } - private void writePacket(SendablePacket packet, boolean compressed, boolean async) { + private void writePacket(SendablePacket packet, boolean compressed, boolean immediate) { if (!channel.isConnected()) return; final Player player = getPlayer(); // Outgoing event @@ -353,22 +355,22 @@ private void writePacket(SendablePacket packet, boolean compressed, boolean asyn // so a state change inside one of them will never actually be triggered. Currently, cached // packets are never used for packets that change state, so this is not a problem. if (packet instanceof ServerPacket serverPacket) { - writeServerPacket(serverPacket, compressed, async); + writeServerPacket(serverPacket, compressed, immediate); } else if (packet instanceof FramedPacket framedPacket) { var buffer = framedPacket.body(); - writeBuffer(buffer, 0, buffer.limit(), async); + writeBuffer(buffer, 0, buffer.limit(), immediate); } else if (packet instanceof CachedPacket cachedPacket) { var buffer = cachedPacket.body(getServerState()); - if (buffer != null) writeBuffer(buffer, buffer.position(), buffer.remaining(), async); - else writeServerPacket(cachedPacket.packet(getServerState()), compressed, async); + if (buffer != null) writeBuffer(buffer, buffer.position(), buffer.remaining(), immediate); + else writeServerPacket(cachedPacket.packet(getServerState()), compressed, immediate); } else if (packet instanceof LazyPacket lazyPacket) { - writeServerPacket(lazyPacket.packet(), compressed, async); + writeServerPacket(lazyPacket.packet(), compressed, immediate); } else { throw new RuntimeException("Unknown packet type: " + packet.getClass().getName()); } } - private void writeServerPacket(ServerPacket serverPacket, boolean compressed, boolean async) { + private void writeServerPacket(ServerPacket serverPacket, boolean compressed, boolean immediate) { final Player player = getPlayer(); if (player != null) { if (MinestomAdventure.AUTOMATIC_COMPONENT_TRANSLATION && serverPacket instanceof ComponentHoldingServerPacket) { @@ -379,7 +381,7 @@ private void writeServerPacket(ServerPacket serverPacket, boolean compressed, bo try (var hold = ObjectPool.PACKET_POOL.hold()) { var state = getServerState(); var buffer = PacketUtils.createFramedPacket(state, hold.get(), serverPacket, compressed); - writeBuffer(buffer, 0, buffer.limit(), async); + writeBuffer(buffer, 0, buffer.limit(), immediate); // If this packet has a state change, apply it. var nextState = serverPacket.nextState(); @@ -389,7 +391,7 @@ private void writeServerPacket(ServerPacket serverPacket, boolean compressed, bo } } - private void writeBuffer(@NotNull ByteBuffer buffer, int index, int length, boolean async) { + private void writeBuffer(@NotNull ByteBuffer buffer, int index, int length, boolean immediate) { // Encrypt data final EncryptionContext encryptionContext = this.encryptionContext; if (encryptionContext != null) { // Encryption support @@ -397,19 +399,19 @@ private void writeBuffer(@NotNull ByteBuffer buffer, int index, int length, bool ByteBuffer output = hold.get(); try { length = encryptionContext.encrypt().update(buffer.slice(index, length), output); - writeBuffer0(output, 0, length, async); + writeBuffer0(output, 0, length, immediate); } catch (ShortBufferException e) { MinecraftServer.getExceptionManager().handleException(e); } return; } } - writeBuffer0(buffer, index, length, async); + writeBuffer0(buffer, index, length, immediate); } - private void writeBuffer0(@NotNull ByteBuffer buffer, int index, int length, boolean async) { - if (async) { - writeBufferAsync0(buffer, index, length); + private void writeBuffer0(@NotNull ByteBuffer buffer, int index, int length, boolean immediate) { + if (immediate) { + writeBufferImmediate0(buffer, index, length); } else { writeBufferSync0(buffer, index, length); } @@ -434,14 +436,15 @@ private void writeBufferSync0(@NotNull ByteBuffer buffer, int index, int length) } } - private void writeBufferAsync0(@NotNull ByteBuffer buffer, int index, int length) { + private void writeBufferImmediate0(@NotNull ByteBuffer buffer, int index, int length) { final SocketChannel channel = this.channel; if (!channel.isConnected()) return; + writeLock.lock(); try { - synchronized (channel) { - channel.write(buffer.slice(index, length)); - } + channel.write(buffer.slice(index, length)); } catch (IOException ignore) { + } finally { + writeLock.unlock(); } } @@ -453,16 +456,22 @@ public void flushSync() throws IOException { BinaryBuffer localBuffer = tickBuffer.getPlain(); if (localBuffer == null) return; // Socket is closed - synchronized (channel) { + writeLock.lock(); + try { localBuffer.writeChannel(channel); + } finally { + writeLock.unlock(); } } else { // Write as much as possible from the waiting list Iterator iterator = waitingBuffers.iterator(); while (iterator.hasNext()) { BinaryBuffer waitingBuffer = iterator.next(); - synchronized (channel) { + writeLock.lock(); + try { if (!waitingBuffer.writeChannel(channel)) break; + } finally { + writeLock.unlock(); } iterator.remove(); POOL.add(waitingBuffer);