Skip to content

Commit

Permalink
Merge branch 'feat/unix-socket'
Browse files Browse the repository at this point in the history
  • Loading branch information
bmalinowsky committed Aug 18, 2024
2 parents 07e0889 + 575d494 commit 6036c76
Show file tree
Hide file tree
Showing 18 changed files with 1,266 additions and 816 deletions.
106 changes: 85 additions & 21 deletions src/io/calimero/knxnetip/ClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
Expand Down Expand Up @@ -116,14 +117,14 @@ public abstract class ClientConnection extends ConnectionBase

private volatile boolean cleanup;

final boolean tcp;
private final TcpConnection connection;
final boolean stream;
private final StreamConnection connection;

// logger is initialized in connect, when name of connection is available
protected ClientConnection(final int serviceRequest, final int serviceAck, final int maxSendAttempts,
final int responseTimeout, final TcpConnection connection) {
final int responseTimeout, final StreamConnection connection) {
super(serviceRequest, serviceAck, maxSendAttempts, responseTimeout);
tcp = connection != TcpConnection.Udp;
stream = connection != TcpConnection.Udp;
this.connection = connection;
}

Expand All @@ -132,11 +133,11 @@ protected ClientConnection(final int serviceRequest, final int serviceAck, final
this(serviceRequest, serviceAck, maxSendAttempts, responseTimeout, TcpConnection.Udp);
}

protected void connect(final TcpConnection c, final CRI cri) throws KNXException, InterruptedException {
protected void connect(final StreamConnection c, final CRI cri) throws KNXException, InterruptedException {
try {
c.connect();
c.registerConnectRequest(this);
connect(c.localEndpoint(), c.server(), cri, false);
doConnect(c, cri);
}
catch (final IOException e) {
throw new KNXException("connecting " + connection, e);
Expand All @@ -146,6 +147,64 @@ protected void connect(final TcpConnection c, final CRI cri) throws KNXException
}
}

void doConnect(final StreamConnection c, final CRI cri) throws KNXException, InterruptedException {
controlEndpoint = c.server();
if (c instanceof final TcpConnection tcpConn)
connect(tcpConn.localEndpoint(), tcpConn.server(), cri, false);
else if (c instanceof UnixDomainSocketConnection)
connectToUnixSocket(c, cri);
else
throw new IllegalStateException();
}

void connectToUnixSocket(final StreamConnection c, final CRI cri) throws KNXException, InterruptedException {
if (state != CLOSED)
throw new IllegalStateException("open connection");
logger = LogService.getLogger("io.calimero.knxnetip." + name());

final SocketAddress server = c.server();
try {
logger.log(DEBUG, "establish connection to {0} ({1})", server, "uds");
final var hpai = HPAI.Tcp;
final byte[] buf = PacketHelper.toPacket(protocolVersion(), new ConnectRequest(cri, hpai, hpai));
send(buf, null);
}
catch (IOException | SecurityException e) {
logger.log(ERROR, "communication failure on connect", e);
throw new KNXException("connecting to " + server + ": " + e.getMessage());
}

logger.log(DEBUG, "wait for connect response from {0} ...", server);
try {
final boolean changed = waitForStateChange(CLOSED, CONNECT_REQ_TIMEOUT);
if (state == OK) {
Executor.execute(heartbeat, "KNXnet/IP heartbeat monitor");

String optionalConnectionInfo = "";
if (tunnelingAddress != null)
optionalConnectionInfo = ", tunneling address " + tunnelingAddress;
logger.log(INFO, "connection established (data endpoint {0}:{1}, channel {2}{3})",
dataEndpt.getAddress().getHostAddress(), dataEndpt.getPort(), channelId,
optionalConnectionInfo);
return;
}
final KNXException e;
if (!changed)
e = new KNXTimeoutException("timeout connecting to control endpoint " + server);
else if (state == ACK_ERROR)
e = new KNXRemoteException("error response from control endpoint " + server + ": " + status);
else
e = new KNXInvalidResponseException("invalid connect response from " + server);
// quit, cleanup and notify user
connectCleanup(e);
throw e;
}
catch (final InterruptedException e) {
connectCleanup(e);
throw e;
}
}

/**
* Opens a new IP communication channel to a remote server.
* <p>
Expand All @@ -169,6 +228,10 @@ protected void connect(final TcpConnection c, final CRI cri) throws KNXException
protected void connect(final InetSocketAddress localEP, final InetSocketAddress serverCtrlEP,
final CRI cri, final boolean useNAT) throws KNXException, InterruptedException
{
// if we allow localEP to be null, we would create an unbound socket
if (localEP == null)
throw new KNXIllegalArgumentException("no local endpoint specified");

if (state != CLOSED)
throw new IllegalStateException("open connection");
ctrlEndpt = serverCtrlEP;
Expand All @@ -177,22 +240,20 @@ protected void connect(final InetSocketAddress localEP, final InetSocketAddress
if (ctrlEndpt.getAddress().isMulticastAddress())
throw new KNXIllegalArgumentException("server control endpoint cannot be a multicast address ("
+ ctrlEndpt.getAddress().getHostAddress() + ")");
controlEndpoint = ctrlEndpt;
useNat = useNAT;
logger = LogService.getLogger("io.calimero.knxnetip." + name());
// if we allow localEP to be null, we would create an unbound socket
if (localEP == null)
throw new KNXIllegalArgumentException("no local endpoint specified");
final InetSocketAddress local = Net.matchRemoteEndpoint(localEP, serverCtrlEP, useNAT);
final InetSocketAddress local = stream ? localEP : Net.matchRemoteEndpoint(localEP, serverCtrlEP, useNAT);
try {
if (!tcp) {
if (!stream) {
socket = new DatagramSocket(local);
ctrlSocket = socket;
}

final var lsa = localSocketAddress();
logger.log(DEBUG, "establish connection from {0} to {1} ({2})", hostPort(lsa), hostPort(ctrlEndpt), tcp ? "tcp" : "udp");
logger.log(DEBUG, "establish connection from {0} to {1} ({2})", hostPort(lsa), hostPort(ctrlEndpt), stream ? "tcp" : "udp");
// HPAI throws if wildcard local address (0.0.0.0) is supplied
final var hpai = tcp ? HPAI.Tcp : useNat ? HPAI.Nat : new HPAI(HPAI.IPV4_UDP, lsa);
final var hpai = stream ? HPAI.Tcp : useNat ? HPAI.Nat : new HPAI(HPAI.IPV4_UDP, lsa);
final byte[] buf = PacketHelper.toPacket(protocolVersion(), new ConnectRequest(cri, hpai, hpai));
send(buf, ctrlEndpt);
}
Expand All @@ -206,7 +267,7 @@ protected void connect(final InetSocketAddress localEP, final InetSocketAddress
}

logger.log(DEBUG, "wait for connect response from {0} ...", hostPort(ctrlEndpt));
if (!tcp)
if (!stream)
startReceiver();
try {
final boolean changed = waitForStateChange(CLOSED, CONNECT_REQ_TIMEOUT);
Expand Down Expand Up @@ -240,7 +301,7 @@ else if (state == ACK_ERROR)

@Override
protected void send(final byte[] packet, final InetSocketAddress dst) throws IOException {
if (tcp)
if (stream)
connection.send(packet);
else
super.send(packet, dst);
Expand Down Expand Up @@ -305,9 +366,9 @@ else if (svc == KNXnetIPHeader.CONNECT_RES) {
}
// address info is only != null on no error
final HPAI ep = res.getDataEndpoint();
if (res.getStatus() == ErrorCodes.NO_ERROR && tcp == (ep.hostProtocol() == HPAI.IPV4_TCP)) {
if (res.getStatus() == ErrorCodes.NO_ERROR && stream == (ep.hostProtocol() == HPAI.IPV4_TCP)) {
channelId = res.getChannelID();
if (tcp || (useNat && ep.nat()))
if (stream || (useNat && ep.nat()))
dataEndpt = new InetSocketAddress(src, port);
else
dataEndpt = ep.endpoint();
Expand All @@ -333,7 +394,7 @@ else if (svc == KNXnetIPHeader.CONNECTIONSTATE_RES) {
heartbeat.setResponse(new ConnectionstateResponse(data, offset));
}
else if (svc == KNXnetIPHeader.DISCONNECT_REQ) {
if (ctrlEndpt.getAddress().equals(src) && ctrlEndpt.getPort() == port)
if (ctrlEndpt == null || ctrlEndpt.getAddress().equals(src) && ctrlEndpt.getPort() == port)
disconnectRequested(new DisconnectRequest(data, offset));
}
else if (svc == KNXnetIPHeader.DISCONNECT_RES) {
Expand All @@ -345,7 +406,7 @@ else if (svc == KNXnetIPHeader.DISCONNECT_RES) {
}
else if (svc == serviceAck) {
// with tcp, service acks are not required and just ignored
if (tcp)
if (stream)
return true;

final ServiceAck res = new ServiceAck(svc, data, offset);
Expand Down Expand Up @@ -380,8 +441,11 @@ String connectionState() {
};
}

/** Experimental; do not use. */
public SocketAddress remoteAddress() { return controlEndpoint; }

private InetSocketAddress localSocketAddress() {
return (InetSocketAddress) (tcp ? connection.localEndpoint() : socket.getLocalSocketAddress());
return (InetSocketAddress) (stream ? connection.localEndpoint() : socket.getLocalSocketAddress());
}

private void disconnectRequested(final DisconnectRequest req)
Expand Down Expand Up @@ -457,7 +521,7 @@ private final class HeartbeatMonitor implements Runnable
public void run()
{
thread = Thread.currentThread();
final var hpai = tcp ? HPAI.Tcp : useNat ? HPAI.Nat : new HPAI(HPAI.IPV4_UDP, localSocketAddress());
final var hpai = stream ? HPAI.Tcp : useNat ? HPAI.Nat : new HPAI(HPAI.IPV4_UDP, localSocketAddress());
final byte[] buf = PacketHelper.toPacket(protocolVersion(), new ConnectionstateRequest(channelId, hpai));
try {
while (!stop) {
Expand Down
19 changes: 15 additions & 4 deletions src/io/calimero/knxnetip/ConnectionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnixDomainSocketAddress;
import java.util.HexFormat;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
Expand Down Expand Up @@ -99,6 +101,7 @@ public abstract class ConnectionBase implements KNXnetIPConnection

/** Remote control endpoint. */
protected InetSocketAddress ctrlEndpt;
volatile SocketAddress controlEndpoint;
/** Remote data endpoint. */
protected InetSocketAddress dataEndpt;

Expand Down Expand Up @@ -313,6 +316,8 @@ public final int getState()
@Override
public String name()
{
if (ctrlEndpt == null)
return controlEndpoint.toString();
// only the control endpoint is set when our logger is initialized (the data
// endpoint gets assigned later in connect)
// to keep the name short, avoid a prepended host name as done by InetAddress
Expand Down Expand Up @@ -378,12 +383,18 @@ protected void fireFrameReceived(final CEMI frame)
listeners.fire(l -> l.frameReceived(fe));
}

boolean handleServiceType(final KNXnetIPHeader h, final byte[] data, final int offset,
final InetSocketAddress source) throws KNXFormatException, IOException {
boolean handleServiceType(final KNXnetIPHeader h, final byte[] data, final int offset, final SocketAddress source)
throws KNXFormatException, IOException {
final int hdrStart = offset - h.getStructLength();
logger.log(TRACE, "from {0}: {1}: {2}", Net.hostPort(source), h,
final var socketName = source instanceof final InetSocketAddress isa ? Net.hostPort(isa) : source;
logger.log(TRACE, "from {0}: {1}: {2}", socketName, h,
HexFormat.ofDelimiter(" ").formatHex(data, hdrStart, hdrStart + h.getTotalLength()));
return handleServiceType(h, data, offset, source.getAddress(), source.getPort());
if (source instanceof final InetSocketAddress isa)
return handleServiceType(h, data, offset, isa.getAddress(), isa.getPort());
else if (source instanceof UnixDomainSocketAddress)
return handleServiceType(h, data, offset, InetAddress.getByAddress(new byte[4]), 0); // TODO UDS remote
else
throw new IllegalStateException();
}

/**
Expand Down
6 changes: 5 additions & 1 deletion src/io/calimero/knxnetip/Discoverer.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
import io.calimero.KnxRuntimeException;
import io.calimero.internal.Executor;
import io.calimero.internal.UdpSocketLooper;
import io.calimero.knxnetip.TcpConnection.SecureSession;
import io.calimero.knxnetip.StreamConnection.SecureSession;
import io.calimero.knxnetip.servicetype.DescriptionRequest;
import io.calimero.knxnetip.servicetype.DescriptionResponse;
import io.calimero.knxnetip.servicetype.KNXnetIPHeader;
Expand Down Expand Up @@ -225,6 +225,10 @@ public static DiscovererTcp tcp(final TcpConnection c) {
return new DiscovererTcp(c);
}

public static DiscovererTcp uds(final UnixDomainSocketConnection c) {
return new DiscovererTcp(c);
}

/**
* Returns a discoverer which uses the supplied secure session for discovery &amp; description requests.
*
Expand Down
29 changes: 19 additions & 10 deletions src/io/calimero/knxnetip/DiscovererTcp.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@
import io.calimero.KNXFormatException;
import io.calimero.KNXIllegalArgumentException;
import io.calimero.KNXTimeoutException;
import io.calimero.ServiceType;
import io.calimero.knxnetip.Discoverer.Result;
import io.calimero.knxnetip.KNXnetIPTunnel.TunnelingLayer;
import io.calimero.knxnetip.TcpConnection.SecureSession;
import io.calimero.knxnetip.StreamConnection.SecureSession;
import io.calimero.knxnetip.servicetype.DescriptionRequest;
import io.calimero.knxnetip.servicetype.DescriptionResponse;
import io.calimero.knxnetip.servicetype.KNXnetIPHeader;
Expand All @@ -72,13 +73,13 @@
* KNX IP Secure connections.
*/
public class DiscovererTcp {
private final TcpConnection connection;
private final StreamConnection connection;
private final SecureSession session;

private volatile Duration timeout = Duration.ofSeconds(10);


DiscovererTcp(final TcpConnection c) {
DiscovererTcp(final StreamConnection c) {
this.connection = c;
this.session = null;
}
Expand Down Expand Up @@ -139,7 +140,7 @@ private <T> CompletableFuture<Result<T>> send(final byte[] packet) {
private final class Tunnel<T> extends KNXnetIPTunnel {
private final CompletableFuture<Result<T>> cf;

Tunnel(final TunnelingLayer knxLayer, final TcpConnection connection,
Tunnel(final TunnelingLayer knxLayer, final StreamConnection connection,
final IndividualAddress tunnelingAddress, final CompletableFuture<Result<T>> cf) throws KNXException,
InterruptedException {
super(knxLayer, connection, tunnelingAddress);
Expand All @@ -154,9 +155,16 @@ protected boolean handleServiceType(final KNXnetIPHeader h, final byte[] data, f
if (svc == KNXnetIPHeader.SearchResponse || svc == KNXnetIPHeader.DESCRIPTION_RES) {
final var sr = svc == KNXnetIPHeader.SearchResponse ? SearchResponse.from(h, data, offset)
: new DescriptionResponse(data, offset, h.getTotalLength() - h.getStructLength());
final var result = new Result<>(sr,
NetworkInterface.getByInetAddress(connection.localEndpoint().getAddress()),
connection.localEndpoint(), connection.server());

final Result<ServiceType> result;
if (connection instanceof final TcpConnection tcp)
result = new Result<>(sr, NetworkInterface.getByInetAddress(tcp.localEndpoint().getAddress()),
tcp.localEndpoint(), tcp.server());
else if (connection instanceof UnixDomainSocketConnection)
result = new Result<>(sr, Net.defaultNetif(), new InetSocketAddress(0), new InetSocketAddress(0));
else
throw new IllegalStateException();

complete(result);
return true;
}
Expand All @@ -170,11 +178,12 @@ protected boolean handleServiceType(final KNXnetIPHeader h, final byte[] data, f
public String name() {
final String lock = new String(Character.toChars(0x1F512));
final String secure = session != null ? (" " + lock) : "";
return "KNX IP" + secure + " Tunneling " + hostPort(ctrlEndpt);
final String remote = ctrlEndpt != null ? hostPort(ctrlEndpt) : controlEndpoint.toString();
return "KNX IP" + secure + " Tunneling " + remote;
}

@Override
protected void connect(final TcpConnection c, final CRI cri) throws KNXException, InterruptedException {
protected void connect(final StreamConnection c, final CRI cri) throws KNXException, InterruptedException {
if (session == null) {
super.connect(c, cri);
return;
Expand All @@ -183,7 +192,7 @@ protected void connect(final TcpConnection c, final CRI cri) throws KNXException
session.ensureOpen();
session.registerConnectRequest(this);
try {
super.connect(c.localEndpoint(), c.server(), cri, false);
doConnect(c, cri);
}
finally {
session.unregisterConnectRequest(this);
Expand Down
Loading

0 comments on commit 6036c76

Please sign in to comment.