Skip to content
This repository has been archived by the owner on Feb 24, 2024. It is now read-only.

Commit

Permalink
rename async to immediate and replace synchronized with ReentrantLock
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/main/java/net/minestom/server/network/player/PlayerSocketConnection.java
  • Loading branch information
MelonHell committed Jan 22, 2024
1 parent 1c791f3 commit f117182
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 33 deletions.
12 changes: 6 additions & 6 deletions src/main/java/net/minestom/server/entity/Player.java
Original file line number Diff line number Diff line change
Expand Up @@ -1492,18 +1492,18 @@ public void sendPackets(@NotNull Collection<SendablePacket> packets) {
}

@ApiStatus.Experimental
public void sendPacketAsync(@NotNull SendablePacket packet) {
this.playerConnection.sendPacketAsync(packet);
public void sendPacketImmediate(@NotNull SendablePacket packet) {
this.playerConnection.sendPacketImmediate(packet);
}

@ApiStatus.Experimental
public void sendPacketsAsync(@NotNull SendablePacket... packets) {
this.playerConnection.sendPacketsAsync(packets);
public void sendPacketsImmediate(@NotNull SendablePacket... packets) {
this.playerConnection.sendPacketsImmediate(packets);
}

@ApiStatus.Experimental
public void sendPacketsAsync(@NotNull Collection<SendablePacket> packets) {
this.playerConnection.sendPacketsAsync(packets);
public void sendPacketsImmediate(@NotNull Collection<SendablePacket> packets) {
this.playerConnection.sendPacketsImmediate(packets);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public void sendPacket(@NotNull SendablePacket packet) {
}

@Override
public void sendPacketAsync(@NotNull SendablePacket packet) {
public void sendPacketImmediate(@NotNull SendablePacket packet) {
sendPacket(packet);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public PlayerConnection() {
* @param packet the packet to send
*/
@ApiStatus.Experimental
public abstract void sendPacketAsync(@NotNull SendablePacket packet);
public abstract void sendPacketImmediate(@NotNull SendablePacket packet);

@ApiStatus.Experimental
public void sendPackets(@NotNull Collection<SendablePacket> packets) {
Expand All @@ -68,13 +68,13 @@ public void sendPackets(@NotNull SendablePacket... packets) {
}

@ApiStatus.Experimental
public void sendPacketsAsync(@NotNull Collection<SendablePacket> packets) {
packets.forEach(this::sendPacketAsync);
public void sendPacketsImmediate(@NotNull Collection<SendablePacket> packets) {
packets.forEach(this::sendPacketImmediate);
}

@ApiStatus.Experimental
public void sendPacketsAsync(@NotNull SendablePacket... packets) {
sendPacketsAsync(List.of(packets));
public void sendPacketsImmediate(@NotNull SendablePacket... packets) {
sendPacketsImmediate(List.of(packets));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.DataFormatException;

/**
Expand Down Expand Up @@ -73,6 +74,7 @@ public class PlayerSocketConnection extends PlayerConnection {
private final List<BinaryBuffer> waitingBuffers = new ArrayList<>();
private final AtomicReference<BinaryBuffer> tickBuffer = new AtomicReference<>(POOL.get());
private BinaryBuffer cacheBuffer;
private ReentrantLock writeLock = new ReentrantLock();

private final ListenerHandle<PlayerPacketOutEvent> outgoing = EventDispatcher.getHandle(PlayerPacketOutEvent.class);

Expand Down Expand Up @@ -161,7 +163,7 @@ public void sendPacket(@NotNull SendablePacket packet) {
}

@Override
public void sendPacketAsync(@NotNull SendablePacket packet) {
public void sendPacketImmediate(@NotNull SendablePacket packet) {
writePacket(packet, this.compressed, true);
}

Expand Down Expand Up @@ -338,7 +340,7 @@ public void setNonce(byte[] nonce) {
this.nonce = nonce;
}

private void writePacket(SendablePacket packet, boolean compressed, boolean async) {
private void writePacket(SendablePacket packet, boolean compressed, boolean immediate) {
if (!channel.isConnected()) return;
final Player player = getPlayer();
// Outgoing event
Expand All @@ -350,22 +352,22 @@ private void writePacket(SendablePacket packet, boolean compressed, boolean asyn
}
// Write packet
if (packet instanceof ServerPacket serverPacket) {
writeServerPacket(serverPacket, compressed, async);
writeServerPacket(serverPacket, compressed, immediate);
} else if (packet instanceof FramedPacket framedPacket) {
var buffer = framedPacket.body();
writeBuffer(buffer, 0, buffer.limit(), async);
writeBuffer(buffer, 0, buffer.limit(), immediate);
} else if (packet instanceof CachedPacket cachedPacket) {
var buffer = cachedPacket.body(getConnectionState());
if (buffer != null) writeBuffer(buffer, buffer.position(), buffer.remaining(), async);
else writeServerPacket(cachedPacket.packet(getConnectionState()), compressed, async);
if (buffer != null) writeBuffer(buffer, buffer.position(), buffer.remaining(), immediate);
else writeServerPacket(cachedPacket.packet(getConnectionState()), compressed, immediate);
} else if (packet instanceof LazyPacket lazyPacket) {
writeServerPacket(lazyPacket.packet(), compressed, async);
writeServerPacket(lazyPacket.packet(), compressed, immediate);
} else {
throw new RuntimeException("Unknown packet type: " + packet.getClass().getName());
}
}

private void writeServerPacket(ServerPacket serverPacket, boolean compressed, boolean async) {
private void writeServerPacket(ServerPacket serverPacket, boolean compressed, boolean immediate) {
final Player player = getPlayer();
if (player != null) {
if (MinestomAdventure.AUTOMATIC_COMPONENT_TRANSLATION && serverPacket instanceof ComponentHoldingServerPacket) {
Expand All @@ -375,31 +377,31 @@ private void writeServerPacket(ServerPacket serverPacket, boolean compressed, bo
}
try (var hold = ObjectPool.PACKET_POOL.hold()) {
var buffer = PacketUtils.createFramedPacket(getConnectionState(), hold.get(), serverPacket, compressed);
writeBuffer(buffer, 0, buffer.limit(), async);
writeBuffer(buffer, 0, buffer.limit(), immediate);
}
}

private void writeBuffer(@NotNull ByteBuffer buffer, int index, int length, boolean async) {
private void writeBuffer(@NotNull ByteBuffer buffer, int index, int length, boolean immediate) {
// Encrypt data
final EncryptionContext encryptionContext = this.encryptionContext;
if (encryptionContext != null) { // Encryption support
try (var hold = ObjectPool.PACKET_POOL.hold()) {
ByteBuffer output = hold.get();
try {
length = encryptionContext.encrypt().update(buffer.slice(index, length), output);
writeBuffer0(output, 0, length, async);
writeBuffer0(output, 0, length, immediate);
} catch (ShortBufferException e) {
MinecraftServer.getExceptionManager().handleException(e);
}
return;
}
}
writeBuffer0(buffer, index, length, async);
writeBuffer0(buffer, index, length, immediate);
}

private void writeBuffer0(@NotNull ByteBuffer buffer, int index, int length, boolean async) {
if (async) {
writeBufferAsync0(buffer, index, length);
private void writeBuffer0(@NotNull ByteBuffer buffer, int index, int length, boolean immediate) {
if (immediate) {
writeBufferImmediate0(buffer, index, length);
} else {
writeBufferSync0(buffer, index, length);
}
Expand All @@ -424,14 +426,15 @@ private void writeBufferSync0(@NotNull ByteBuffer buffer, int index, int length)
}
}

private void writeBufferAsync0(@NotNull ByteBuffer buffer, int index, int length) {
private void writeBufferImmediate0(@NotNull ByteBuffer buffer, int index, int length) {
final SocketChannel channel = this.channel;
if (!channel.isConnected()) return;
writeLock.lock();
try {
synchronized (channel) {
channel.write(buffer.slice(index, length));
}
channel.write(buffer.slice(index, length));
} catch (IOException ignore) {
} finally {
writeLock.unlock();
}
}

Expand All @@ -443,16 +446,22 @@ public void flushSync() throws IOException {
BinaryBuffer localBuffer = tickBuffer.getPlain();
if (localBuffer == null)
return; // Socket is closed
synchronized (channel) {
writeLock.lock();
try {
localBuffer.writeChannel(channel);
} finally {
writeLock.unlock();
}
} else {
// Write as much as possible from the waiting list
Iterator<BinaryBuffer> iterator = waitingBuffers.iterator();
while (iterator.hasNext()) {
BinaryBuffer waitingBuffer = iterator.next();
synchronized (channel) {
writeLock.lock();
try {
if (!waitingBuffer.writeChannel(channel)) break;
} finally {
writeLock.unlock();
}
iterator.remove();
POOL.add(waitingBuffer);
Expand Down

0 comments on commit f117182

Please sign in to comment.