Skip to content
This repository has been archived by the owner on Feb 24, 2024. It is now read-only.

Commit

Permalink
little refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
MelonHell committed Dec 19, 2023
1 parent 47cf4ae commit d037100
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ public void sendPacket(@NotNull SendablePacket packet) {
}

@Override
public void sendPacketAsync(@NotNull SendablePacket packet) { // I do not know what else can be done with the packages here
FakePlayerController controller = getFakePlayer().getController();
final ServerPacket serverPacket = SendablePacket.extractServerPacket(getServerState(), packet);
controller.consumePacket(serverPacket);
public void sendPacketAsync(@NotNull SendablePacket packet) {
sendPacket(packet);
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public class PlayerSocketConnection extends PlayerConnection {
private final Worker worker;
private final MessagePassingQueue<Runnable> workerQueue;
private final SocketChannel channel;
private final Object channelWriteMonitor = new Object();
private SocketAddress remoteAddress;

private volatile boolean compressed = false;
Expand Down Expand Up @@ -158,27 +157,26 @@ public void startCompression() {
@Override
public void sendPacket(@NotNull SendablePacket packet) {
final boolean compressed = this.compressed;
this.workerQueue.relaxedOffer(() -> writePacketSync(packet, compressed));
this.workerQueue.relaxedOffer(() -> writePacket(packet, compressed, false));
}

@Override
public void sendPacketAsync(@NotNull SendablePacket packet) {
writePacket(packet, this.compressed, true);
}

@Override
public void sendPackets(@NotNull Collection<SendablePacket> packets) {
final List<SendablePacket> packetsCopy = List.copyOf(packets);
final boolean compressed = this.compressed;
this.workerQueue.relaxedOffer(() -> {
for (SendablePacket packet : packetsCopy) writePacketSync(packet, compressed);
for (SendablePacket packet : packetsCopy) writePacket(packet, compressed, false);
});
}

@Override
public void sendPacketAsync(@NotNull SendablePacket packet) {
final boolean compressed = this.compressed;
writePacketAsync(packet, compressed);
}

@ApiStatus.Internal
public void write(@NotNull ByteBuffer buffer, int index, int length) {
this.workerQueue.relaxedOffer(() -> writeBufferSync(buffer, index, length));
this.workerQueue.relaxedOffer(() -> writeBuffer(buffer, index, length, false));
}

@ApiStatus.Internal
Expand Down Expand Up @@ -207,9 +205,7 @@ public void setRemoteAddress(@NotNull SocketAddress remoteAddress) {
public void disconnect() {
super.disconnect();
this.workerQueue.relaxedOffer(() -> {
synchronized (channelWriteMonitor) {
this.worker.disconnect(this, channel);
}
this.worker.disconnect(this, channel);
final BinaryBuffer tick = tickBuffer.getAndSet(null);
if (tick != null) POOL.add(tick);
for (BinaryBuffer buffer : waitingBuffers) POOL.add(buffer);
Expand Down Expand Up @@ -342,82 +338,48 @@ public void setNonce(byte[] nonce) {
this.nonce = nonce;
}

/**
* Send Server Packet Event.
*
* @return {@link PlayerPacketOutEvent::isCancelled} state
**/
private boolean sendEvent(SendablePacket packet) {
private void writePacket(SendablePacket packet, boolean compressed, boolean async) {
if (!channel.isConnected()) return;
final Player player = getPlayer();
// Outgoing event
if (player != null && outgoing.hasListener()) {
final ServerPacket serverPacket = SendablePacket.extractServerPacket(getServerState(), packet);
PlayerPacketOutEvent event = new PlayerPacketOutEvent(player, serverPacket);
outgoing.call(event);
return event.isCancelled();
if (event.isCancelled()) return;
}
return false;
}

private void writePacketSync(SendablePacket packet, boolean compressed) {
if (!channel.isConnected()) return;
if (sendEvent(packet)) return;
// Write packet
// WARNING: A cached or framed packet will currently never go through writeServerPacketSync,
// so a state change inside one of them will never actually be triggered. Currently, cached
// packets are never used for packets that change state, so this is not a problem.
if (packet instanceof ServerPacket serverPacket) {
writeServerPacketSync(serverPacket, compressed);
writeServerPacket(serverPacket, compressed, async);
} else if (packet instanceof FramedPacket framedPacket) {
var buffer = framedPacket.body();
writeBufferSync(buffer, 0, buffer.limit());
writeBuffer(buffer, 0, buffer.limit(), async);
} else if (packet instanceof CachedPacket cachedPacket) {
var buffer = cachedPacket.body(getServerState());
if (buffer != null) writeBufferSync(buffer, buffer.position(), buffer.remaining());
else writeServerPacketSync(cachedPacket.packet(getServerState()), compressed);
if (buffer != null) writeBuffer(buffer, buffer.position(), buffer.remaining(), async);
else writeServerPacket(cachedPacket.packet(getServerState()), compressed, async);
} else if (packet instanceof LazyPacket lazyPacket) {
writeServerPacketSync(lazyPacket.packet(), compressed);
writeServerPacket(lazyPacket.packet(), compressed, async);
} else {
throw new RuntimeException("Unknown packet type: " + packet.getClass().getName());
}
}

private void writePacketAsync(SendablePacket packet, boolean compressed) {
if (!channel.isConnected()) return;
if (sendEvent(packet)) return;
// Write packet async
if (packet instanceof ServerPacket serverPacket) {
writeServerPacketAsync(serverPacket, compressed);
} else if (packet instanceof FramedPacket framedPacket) {
var buffer = framedPacket.body();
writeBufferAsync(buffer, 0, buffer.limit());
} else if (packet instanceof CachedPacket cachedPacket) {
var buffer = cachedPacket.body(getServerState());
if (buffer != null) writeBufferAsync(buffer, buffer.position(), buffer.remaining());
else writeServerPacketAsync(cachedPacket.packet(getServerState()), compressed);
} else if (packet instanceof LazyPacket lazyPacket) {
writeServerPacketAsync(lazyPacket.packet(), compressed);
} else {
throw new RuntimeException("Unknown packet type: " + packet.getClass().getName());
}
}

private ServerPacket translateServerPacket(ServerPacket serverPacket) {
private void writeServerPacket(ServerPacket serverPacket, boolean compressed, boolean async) {
final Player player = getPlayer();
if (player != null) {
if (MinestomAdventure.AUTOMATIC_COMPONENT_TRANSLATION && serverPacket instanceof ComponentHoldingServerPacket) {
return ((ComponentHoldingServerPacket) serverPacket).copyWithOperator(component ->
serverPacket = ((ComponentHoldingServerPacket) serverPacket).copyWithOperator(component ->
MinestomAdventure.COMPONENT_TRANSLATOR.apply(component, Objects.requireNonNullElseGet(player.getLocale(), MinestomAdventure::getDefaultLocale)));
}
}
return serverPacket;
}

private void writeServerPacketSync(ServerPacket serverPacket, boolean compressed) {
serverPacket = translateServerPacket(serverPacket);
try (var hold = ObjectPool.PACKET_POOL.hold()) {
var state = getServerState();
var buffer = PacketUtils.createFramedPacket(state, hold.get(), serverPacket, compressed);
writeBufferSync(buffer, 0, buffer.limit());
writeBuffer(buffer, 0, buffer.limit(), async);

// If this packet has a state change, apply it.
var nextState = serverPacket.nextState();
Expand All @@ -427,63 +389,30 @@ private void writeServerPacketSync(ServerPacket serverPacket, boolean compressed
}
}

private void writeServerPacketAsync(ServerPacket serverPacket, boolean compressed) {
serverPacket = translateServerPacket(serverPacket);
try (var hold = ObjectPool.PACKET_POOL.hold()) {
var state = getServerState();
var buffer = PacketUtils.createFramedPacket(state, hold.get(), serverPacket, compressed);
writeBufferAsync(buffer, 0, buffer.limit());
}
}

private void writeBufferSync(@NotNull ByteBuffer buffer, int index, int length) {
private void writeBuffer(@NotNull ByteBuffer buffer, int index, int length, boolean async) {
// Encrypt data
final EncryptionContext encryptionContext = this.encryptionContext;
if (encryptionContext != null) { // Encryption support
try (var hold = ObjectPool.PACKET_POOL.hold()) {
ByteBuffer output = hold.get();
try {
length = encryptionContext.encrypt().update(buffer.slice(index, length), output);
writeBufferSync0(output, 0, length);
writeBuffer0(output, 0, length, async);
} catch (ShortBufferException e) {
MinecraftServer.getExceptionManager().handleException(e);
}
return;
}
}
writeBufferSync0(buffer, index, length);
writeBuffer0(buffer, index, length, async);
}

private void writeBufferAsync(@NotNull ByteBuffer buffer, int index, int length) {
final SocketChannel channel = this.channel;
if (!channel.isConnected()) return;
// Encrypt data
final EncryptionContext encryptionContext = this.encryptionContext;
if (encryptionContext != null) { // Encryption support
try (var hold = ObjectPool.PACKET_POOL.hold()) {
ByteBuffer output = hold.get();
try {
length = encryptionContext.encrypt().update(buffer.slice(index, length), output);
try {
synchronized (channelWriteMonitor) {
channel.write(output.slice(0, length));
}
} catch (IOException ignore) {
}
//Let catch this exceptions in flushSync. I'm not sure that disabling the client here would be correct.
} catch (ShortBufferException e) {
MinecraftServer.getExceptionManager().handleException(e);
}
return;
}
}
try {
synchronized (channelWriteMonitor) {
channel.write(buffer.slice(index, length));
}
} catch (IOException ignore) {
private void writeBuffer0(@NotNull ByteBuffer buffer, int index, int length, boolean async) {
if (async) {
writeBufferAsync0(buffer, index, length);
} else {
writeBufferSync0(buffer, index, length);
}
//Let catch this exceptions in flushSync. I'm not sure that disabling the client here would be correct.
}

private void writeBufferSync0(@NotNull ByteBuffer buffer, int index, int length) {
Expand All @@ -505,6 +434,17 @@ private void writeBufferSync0(@NotNull ByteBuffer buffer, int index, int length)
}
}

private void writeBufferAsync0(@NotNull ByteBuffer buffer, int index, int length) {
final SocketChannel channel = this.channel;
if (!channel.isConnected()) return;
try {
synchronized (channel) {
channel.write(buffer.slice(index, length));
}
} catch (IOException ignore) {
}
}

public void flushSync() throws IOException {
final SocketChannel channel = this.channel;
final List<BinaryBuffer> waitingBuffers = this.waitingBuffers;
Expand All @@ -513,15 +453,15 @@ public void flushSync() throws IOException {
BinaryBuffer localBuffer = tickBuffer.getPlain();
if (localBuffer == null)
return; // Socket is closed
synchronized (channelWriteMonitor) {
synchronized (channel) {
localBuffer.writeChannel(channel);
}
} else {
// Write as much as possible from the waiting list
Iterator<BinaryBuffer> iterator = waitingBuffers.iterator();
while (iterator.hasNext()) {
BinaryBuffer waitingBuffer = iterator.next();
synchronized (channelWriteMonitor) {
synchronized (channel) {
if (!waitingBuffer.writeChannel(channel)) break;
}
iterator.remove();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,6 @@ public void sendPacket(@NotNull SendablePacket packet) {
// nothing
}

@Override
public void sendPacketAsync(@NotNull SendablePacket packet) {
// also nothing
}

@Override
public @NotNull SocketAddress getRemoteAddress() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ public interface TestConnection {

<T extends ServerPacket> @NotNull Collector<T> trackIncoming(@NotNull Class<T> type);

<T extends ServerPacket> @NotNull Collector<T> trackIncomingAsync(@NotNull Class<T> type);

<T extends ServerPacket> @NotNull Collector<T> trackIncomingSync(@NotNull Class<T> type);

default @NotNull Collector<ServerPacket> trackIncoming() {
return trackIncoming(ServerPacket.class);
}
Expand Down
43 changes: 7 additions & 36 deletions testing/src/main/java/net/minestom/testing/TestConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ final class TestConnectionImpl implements TestConnection {
private final ServerProcess process;
private final PlayerConnectionImpl playerConnection = new PlayerConnectionImpl();

private final List<IncomingCollector<ServerPacket>> incomingTrackersSync = new CopyOnWriteArrayList<>();

private final List<IncomingCollector<ServerPacket>> incomingTrackersAsync = new CopyOnWriteArrayList<>();
private final List<IncomingCollector<ServerPacket>> incomingTrackers = new CopyOnWriteArrayList<>();

TestConnectionImpl(Env env) {
this.env = env;
Expand All @@ -51,39 +49,16 @@ final class TestConnectionImpl implements TestConnection {

@Override
public @NotNull <T extends ServerPacket> Collector<T> trackIncoming(@NotNull Class<T> type) {
var tracker = new IncomingCollector<>(type, incomingTrackersSync, incomingTrackersAsync);
this.incomingTrackersSync.add(IncomingCollector.class.cast(tracker));
this.incomingTrackersAsync.add(IncomingCollector.class.cast(tracker));
return tracker;
}

@Override
public @NotNull <T extends ServerPacket> Collector<T> trackIncomingSync(@NotNull Class<T> type) {
var tracker = new IncomingCollector<>(type, incomingTrackersSync);
this.incomingTrackersSync.add(IncomingCollector.class.cast(tracker));
return tracker;
}

@Override
public @NotNull <T extends ServerPacket> Collector<T> trackIncomingAsync(@NotNull Class<T> type) {
var tracker = new IncomingCollector<>(type, incomingTrackersAsync);
this.incomingTrackersAsync.add(IncomingCollector.class.cast(tracker));
var tracker = new IncomingCollector<>(type);
this.incomingTrackers.add(IncomingCollector.class.cast(tracker));
return tracker;
}

final class PlayerConnectionImpl extends PlayerConnection {
@Override
public void sendPacket(@NotNull SendablePacket packet) {
final var serverPacket = this.extractPacket(packet);
for (var tracker : incomingTrackersSync) {
if (tracker.type.isAssignableFrom(serverPacket.getClass())) tracker.packets.add(serverPacket);
}
}

@Override
public void sendPacketAsync(@NotNull SendablePacket packet) {
final var serverPacket = this.extractPacket(packet);
for (var tracker : incomingTrackersAsync) {
for (var tracker : incomingTrackers) {
if (tracker.type.isAssignableFrom(serverPacket.getClass())) tracker.packets.add(serverPacket);
}
}
Expand Down Expand Up @@ -113,21 +88,17 @@ public void disconnect() {
}
}

static final class IncomingCollector<T extends ServerPacket> implements Collector<T> {
final class IncomingCollector<T extends ServerPacket> implements Collector<T> {
private final Class<T> type;
private final List<T> packets = new CopyOnWriteArrayList<>();
private final List<IncomingCollector<ServerPacket>>[] incomingTrackers;

@SafeVarargs
public IncomingCollector(Class<T> type, List<IncomingCollector<ServerPacket>>... incomingTrackers) {
this.incomingTrackers = incomingTrackers;
public IncomingCollector(Class<T> type) {
this.type = type;
}

@Override
public @NotNull List<T> collect() {
for (List<IncomingCollector<ServerPacket>> incomingTracker : incomingTrackers)
incomingTracker.remove(this);
incomingTrackers.remove(this);
return List.copyOf(packets);
}
}
Expand Down

0 comments on commit d037100

Please sign in to comment.