Skip to content

Commit

Permalink
Override quit to close selector and channel
Browse files Browse the repository at this point in the history
  • Loading branch information
bmalinowsky committed Aug 3, 2024
1 parent 91212df commit 0fb8915
Showing 1 changed file with 29 additions and 4 deletions.
33 changes: 29 additions & 4 deletions src/io/calimero/knxnetip/Discoverer.java
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ private CompletableFuture<Result<SearchResponse>> receiveAsync(final DatagramCha

private final class ReceiverLoop extends UdpSocketLooper implements Runnable
{
private final DatagramChannel dc;
private final boolean multicast;
private final InetSocketAddress server;
private final NetworkInterface nif;
Expand All @@ -764,6 +765,7 @@ private final class ReceiverLoop extends UdpSocketLooper implements Runnable
throws IOException {
super(null, false, receiveBufferSize, 0, (int) timeout.toMillis());

this.dc = dc;
final var mcastIf = dc.getOption(StandardSocketOptions.IP_MULTICAST_IF);
nif = mcastIf == null ? Net.defaultNetif() : mcastIf;
this.localEndpoint = localEndpoint;
Expand All @@ -784,6 +786,8 @@ private final class ReceiverLoop extends UdpSocketLooper implements Runnable
ReceiverLoop(final DatagramChannel dc, final int receiveBufferSize, final Duration timeout,
final InetSocketAddress queriedServer) throws IOException {
super(null, true, receiveBufferSize, 0, (int) timeout.toMillis());

this.dc = dc;
nif = null;
localEndpoint = null;
multicast = false;
Expand Down Expand Up @@ -877,21 +881,42 @@ else if (!multicast && svc == KNXnetIPHeader.SearchResponse) {
protected void receive(final byte[] buf) throws IOException {
var remaining = timeout;
final var end = Instant.now().plus(remaining);
while (remaining.toMillis() > 0) {
while (selector.isOpen() && remaining.toMillis() > 0) {
if (selector.select(remaining.toMillis()) > 0) {
for (final var i = selector.selectedKeys().iterator(); i.hasNext();) {
final var key = i.next();
final Set<SelectionKey> selectedKeys;
// synchronize and copy to avoid CME if quit() closes selector
synchronized (this) {
selectedKeys = Set.copyOf(selector.selectedKeys());
selector.selectedKeys().clear();
}
for (final var key : selectedKeys) {
final var channel = key.channel();
final ByteBuffer buffer = ByteBuffer.wrap(buf);
final var source = ((DatagramChannel) channel).receive(buffer);
buffer.flip();
onReceive((InetSocketAddress) source, buf, buffer.position(), buffer.remaining());
i.remove();
}
return;
}
remaining = Duration.between(Instant.now(), end);
}
}

@Override
public void quit() {
super.quit();
try {
synchronized (this) {
selector.close();
}
}
catch (final IOException ignore) {}
finally {
try {
dc.close();
}
catch (final IOException ignore) {}
}
}
}
}

0 comments on commit 0fb8915

Please sign in to comment.