Skip to content
This repository has been archived by the owner on Feb 24, 2024. It is now read-only.

Commit

Permalink
rename async to immediate and replace synchronized with ReentrantLock
Browse files Browse the repository at this point in the history
  • Loading branch information
MelonHell committed Dec 16, 2023
1 parent 1f24605 commit 1cb15a6
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 33 deletions.
12 changes: 6 additions & 6 deletions src/main/java/net/minestom/server/entity/Player.java
Original file line number Diff line number Diff line change
Expand Up @@ -1398,18 +1398,18 @@ public void sendPackets(@NotNull Collection<SendablePacket> packets) {
}

@ApiStatus.Experimental
public void sendPacketAsync(@NotNull SendablePacket packet) {
this.playerConnection.sendPacketAsync(packet);
public void sendPacketImmediate(@NotNull SendablePacket packet) {
this.playerConnection.sendPacketImmediate(packet);
}

@ApiStatus.Experimental
public void sendPacketsAsync(@NotNull SendablePacket... packets) {
this.playerConnection.sendPacketsAsync(packets);
public void sendPacketsImmediate(@NotNull SendablePacket... packets) {
this.playerConnection.sendPacketsImmediate(packets);
}

@ApiStatus.Experimental
public void sendPacketsAsync(@NotNull Collection<SendablePacket> packets) {
this.playerConnection.sendPacketsAsync(packets);
public void sendPacketsImmediate(@NotNull Collection<SendablePacket> packets) {
this.playerConnection.sendPacketsImmediate(packets);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public void sendPacket(@NotNull SendablePacket packet) {
}

@Override
public void sendPacketAsync(@NotNull SendablePacket packet) {
public void sendPacketImmediate(@NotNull SendablePacket packet) {
sendPacket(packet);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public PlayerConnection() {
* @param packet the packet to send
*/
@ApiStatus.Experimental
public abstract void sendPacketAsync(@NotNull SendablePacket packet);
public abstract void sendPacketImmediate(@NotNull SendablePacket packet);

@ApiStatus.Experimental
public void sendPackets(@NotNull Collection<SendablePacket> packets) {
Expand All @@ -70,13 +70,13 @@ public void sendPackets(@NotNull SendablePacket... packets) {
}

@ApiStatus.Experimental
public void sendPacketsAsync(@NotNull Collection<SendablePacket> packets) {
packets.forEach(this::sendPacketAsync);
public void sendPacketsImmediate(@NotNull Collection<SendablePacket> packets) {
packets.forEach(this::sendPacketImmediate);
}

@ApiStatus.Experimental
public void sendPacketsAsync(@NotNull SendablePacket... packets) {
sendPacketsAsync(List.of(packets));
public void sendPacketsImmediate(@NotNull SendablePacket... packets) {
sendPacketsImmediate(List.of(packets));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.DataFormatException;

/**
Expand Down Expand Up @@ -73,6 +74,7 @@ public class PlayerSocketConnection extends PlayerConnection {
private final List<BinaryBuffer> waitingBuffers = new ArrayList<>();
private final AtomicReference<BinaryBuffer> tickBuffer = new AtomicReference<>(POOL.get());
private BinaryBuffer cacheBuffer;
private ReentrantLock writeLock = new ReentrantLock();

private final ListenerHandle<PlayerPacketOutEvent> outgoing = EventDispatcher.getHandle(PlayerPacketOutEvent.class);

Expand Down Expand Up @@ -161,7 +163,7 @@ public void sendPacket(@NotNull SendablePacket packet) {
}

@Override
public void sendPacketAsync(@NotNull SendablePacket packet) {
public void sendPacketImmediate(@NotNull SendablePacket packet) {
writePacket(packet, this.compressed, true);
}

Expand Down Expand Up @@ -338,7 +340,7 @@ public void setNonce(byte[] nonce) {
this.nonce = nonce;
}

private void writePacket(SendablePacket packet, boolean compressed, boolean async) {
private void writePacket(SendablePacket packet, boolean compressed, boolean immediate) {
if (!channel.isConnected()) return;
final Player player = getPlayer();
// Outgoing event
Expand All @@ -353,22 +355,22 @@ private void writePacket(SendablePacket packet, boolean compressed, boolean asyn
// so a state change inside one of them will never actually be triggered. Currently, cached
// packets are never used for packets that change state, so this is not a problem.
if (packet instanceof ServerPacket serverPacket) {
writeServerPacket(serverPacket, compressed, async);
writeServerPacket(serverPacket, compressed, immediate);
} else if (packet instanceof FramedPacket framedPacket) {
var buffer = framedPacket.body();
writeBuffer(buffer, 0, buffer.limit(), async);
writeBuffer(buffer, 0, buffer.limit(), immediate);
} else if (packet instanceof CachedPacket cachedPacket) {
var buffer = cachedPacket.body(getServerState());
if (buffer != null) writeBuffer(buffer, buffer.position(), buffer.remaining(), async);
else writeServerPacket(cachedPacket.packet(getServerState()), compressed, async);
if (buffer != null) writeBuffer(buffer, buffer.position(), buffer.remaining(), immediate);
else writeServerPacket(cachedPacket.packet(getServerState()), compressed, immediate);
} else if (packet instanceof LazyPacket lazyPacket) {
writeServerPacket(lazyPacket.packet(), compressed, async);
writeServerPacket(lazyPacket.packet(), compressed, immediate);
} else {
throw new RuntimeException("Unknown packet type: " + packet.getClass().getName());
}
}

private void writeServerPacket(ServerPacket serverPacket, boolean compressed, boolean async) {
private void writeServerPacket(ServerPacket serverPacket, boolean compressed, boolean immediate) {
final Player player = getPlayer();
if (player != null) {
if (MinestomAdventure.AUTOMATIC_COMPONENT_TRANSLATION && serverPacket instanceof ComponentHoldingServerPacket) {
Expand All @@ -379,7 +381,7 @@ private void writeServerPacket(ServerPacket serverPacket, boolean compressed, bo
try (var hold = ObjectPool.PACKET_POOL.hold()) {
var state = getServerState();
var buffer = PacketUtils.createFramedPacket(state, hold.get(), serverPacket, compressed);
writeBuffer(buffer, 0, buffer.limit(), async);
writeBuffer(buffer, 0, buffer.limit(), immediate);

// If this packet has a state change, apply it.
var nextState = serverPacket.nextState();
Expand All @@ -389,27 +391,27 @@ private void writeServerPacket(ServerPacket serverPacket, boolean compressed, bo
}
}

private void writeBuffer(@NotNull ByteBuffer buffer, int index, int length, boolean async) {
private void writeBuffer(@NotNull ByteBuffer buffer, int index, int length, boolean immediate) {
// Encrypt data
final EncryptionContext encryptionContext = this.encryptionContext;
if (encryptionContext != null) { // Encryption support
try (var hold = ObjectPool.PACKET_POOL.hold()) {
ByteBuffer output = hold.get();
try {
length = encryptionContext.encrypt().update(buffer.slice(index, length), output);
writeBuffer0(output, 0, length, async);
writeBuffer0(output, 0, length, immediate);
} catch (ShortBufferException e) {
MinecraftServer.getExceptionManager().handleException(e);
}
return;
}
}
writeBuffer0(buffer, index, length, async);
writeBuffer0(buffer, index, length, immediate);
}

private void writeBuffer0(@NotNull ByteBuffer buffer, int index, int length, boolean async) {
if (async) {
writeBufferAsync0(buffer, index, length);
private void writeBuffer0(@NotNull ByteBuffer buffer, int index, int length, boolean immediate) {
if (immediate) {
writeBufferImmediate0(buffer, index, length);
} else {
writeBufferSync0(buffer, index, length);
}
Expand All @@ -434,14 +436,15 @@ private void writeBufferSync0(@NotNull ByteBuffer buffer, int index, int length)
}
}

private void writeBufferAsync0(@NotNull ByteBuffer buffer, int index, int length) {
private void writeBufferImmediate0(@NotNull ByteBuffer buffer, int index, int length) {
final SocketChannel channel = this.channel;
if (!channel.isConnected()) return;
writeLock.lock();
try {
synchronized (channel) {
channel.write(buffer.slice(index, length));
}
channel.write(buffer.slice(index, length));
} catch (IOException ignore) {
} finally {
writeLock.unlock();
}
}

Expand All @@ -453,16 +456,22 @@ public void flushSync() throws IOException {
BinaryBuffer localBuffer = tickBuffer.getPlain();
if (localBuffer == null)
return; // Socket is closed
synchronized (channel) {
writeLock.lock();
try {
localBuffer.writeChannel(channel);
} finally {
writeLock.unlock();
}
} else {
// Write as much as possible from the waiting list
Iterator<BinaryBuffer> iterator = waitingBuffers.iterator();
while (iterator.hasNext()) {
BinaryBuffer waitingBuffer = iterator.next();
synchronized (channel) {
writeLock.lock();
try {
if (!waitingBuffer.writeChannel(channel)) break;
} finally {
writeLock.unlock();
}
iterator.remove();
POOL.add(waitingBuffer);
Expand Down

0 comments on commit 1cb15a6

Please sign in to comment.