Skip to content

Commit

Permalink
Merge pull request #15 from eulynx-live/ConnectionInterface
Browse files Browse the repository at this point in the history
Connection interface
  • Loading branch information
rs22 authored Dec 3, 2023
2 parents 59d9f5e + e5848ad commit 125361e
Show file tree
Hide file tree
Showing 10 changed files with 517 additions and 213 deletions.
140 changes: 122 additions & 18 deletions src/FieldElementSubsystems.Test/Point/PointTest.cs
Original file line number Diff line number Diff line change
@@ -1,31 +1,135 @@
using Castle.Core.Logging;
using EulynxLive.Messages.Baseline4R1;
using EulynxLive.Point;
using IPointToInterlockingConnection = EulynxLive.Point.Interfaces.IPointToInterlockingConnection;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Moq;
namespace FieldElementSubsystems.Test;

public class PointTest
{
[Fact]
public void PointShouldParseConfiguration()
{
var testSettings = new Dictionary<string, string?> {
{"PointSettings:LocalId", "99W1" },
{"PointSettings:LocalRastaId", "100" },
{"PointSettings:RemoteId", "INTERLOCKING" },
{"PointSettings:RemoteEndpoint", "http://localhost:50051" },
{"PointSettings:AllPointMachinesCrucial", "true" },
{"PointSettings:SimulateRandomTimeouts", "true" },
};

var configuration = new ConfigurationBuilder()
.AddInMemoryCollection(testSettings)
private EulynxLive.Point.Point CreateDefaultPoint(IPointToInterlockingConnection? connection = null) =>
new(_logger, _configuration, connection ?? Mock.Of<IPointToInterlockingConnection>(), () => Task.CompletedTask);

private Mock<IPointToInterlockingConnection> CreateDefaultMockConnection() {
var mockConnection = new Mock<IPointToInterlockingConnection>();
mockConnection
.Setup(m => m.SendPointPosition(
It.IsAny<IPointToInterlockingConnection.PointState>()))
.Returns(Task.FromResult(0));
mockConnection
.Setup(m => m.InitializeConnection(
It.IsAny<IPointToInterlockingConnection.PointState>(), It.IsAny<CancellationToken>()))
.Returns(Task.FromResult(true));
return mockConnection;
}

private static readonly IDictionary<string, string?> _testSettings = new Dictionary<string, string?> {
{"PointSettings:LocalId", "99W1" },
{"PointSettings:LocalRastaId", "100" },
{"PointSettings:RemoteId", "INTERLOCKING" },
{"PointSettings:RemoteEndpoint", "http://localhost:50051" },
{"PointSettings:AllPointMachinesCrucial", "true" },
{"PointSettings:SimulateRandomTimeouts", "false" },
};
private readonly IConfiguration _configuration = new ConfigurationBuilder()
.AddInMemoryCollection(_testSettings)
.Build();
private readonly ILogger<EulynxLive.Point.Point> _logger = Mock.Of<ILogger<EulynxLive.Point.Point>>();

var point = new EulynxLive.Point.Point(Mock.Of<ILogger<EulynxLive.Point.Point>>(), configuration);
[Fact]
public void Test_Parse_Configuration()
{
var point = CreateDefaultPoint();

Assert.True(point.AllPointMachinesCrucial);
}

[Fact]
public void Test_Default_Position()
{
var point = CreateDefaultPoint();
Assert.Equal(IPointToInterlockingConnection.PointPosition.NoEndPosition, point.PointState.PointPosition);
}

[Fact]
public async Task Test_Turn_Left()
{
// Arrange
var point = CreateDefaultPoint();
var mockConnection = CreateDefaultMockConnection();
var cancel = new CancellationTokenSource();

mockConnection
.SetupSequence(m => m.ReceivePointPosition(It.IsAny<CancellationToken>()))
.Returns(Task.FromResult<IPointToInterlockingConnection.PointPosition?>(IPointToInterlockingConnection.PointPosition.Left))
.Returns(() =>
{
cancel.Cancel();
return new TaskCompletionSource<IPointToInterlockingConnection.PointPosition?>().Task;
});

point = CreateDefaultPoint(mockConnection.Object);

// Act
await point.StartAsync(cancel.Token);

// Assert
mockConnection.Verify(v => v.InitializeConnection(It.IsAny<IPointToInterlockingConnection.PointState>(), It.IsAny<CancellationToken>()));
Assert.Equal(IPointToInterlockingConnection.PointPosition.Left, point.PointState.PointPosition);
}

[Fact]
public async Task Test_Turn_Right()
{
// Arrange
var point = CreateDefaultPoint();
var mockConnection = CreateDefaultMockConnection();
var cancel = new CancellationTokenSource();

mockConnection
.SetupSequence(m => m.ReceivePointPosition(It.IsAny<CancellationToken>()))
.Returns(Task.FromResult<IPointToInterlockingConnection.PointPosition?>(IPointToInterlockingConnection.PointPosition.Right))
.Returns(() =>
{
cancel.Cancel();
return new TaskCompletionSource<IPointToInterlockingConnection.PointPosition?>().Task;
});

point = CreateDefaultPoint(mockConnection.Object);

// Act
await point.StartAsync(cancel.Token);

// Assert
mockConnection.Verify(v => v.InitializeConnection(It.IsAny<IPointToInterlockingConnection.PointState>(), It.IsAny<CancellationToken>()));
Assert.Equal(IPointToInterlockingConnection.PointPosition.Right, point.PointState.PointPosition);
}

[Fact]
public async Task Test_Turnover()
{
// Arrange
var point = CreateDefaultPoint();
var mockConnection = CreateDefaultMockConnection();
var cancel = new CancellationTokenSource();

mockConnection
.SetupSequence(m => m.ReceivePointPosition(It.IsAny<CancellationToken>()))
.Returns(Task.FromResult<IPointToInterlockingConnection.PointPosition?>(IPointToInterlockingConnection.PointPosition.Right))
.Returns(Task.FromResult<IPointToInterlockingConnection.PointPosition?>(null))
.Returns(Task.FromResult<IPointToInterlockingConnection.PointPosition?>(IPointToInterlockingConnection.PointPosition.Left))
.Returns(() =>
{
cancel.Cancel();
return new TaskCompletionSource<IPointToInterlockingConnection.PointPosition?>().Task;
});

point = CreateDefaultPoint(mockConnection.Object);

// Act
await point.StartAsync(cancel.Token);

// Assert
mockConnection.Verify(v => v.InitializeConnection(It.IsAny<IPointToInterlockingConnection.PointState>(), It.IsAny<CancellationToken>()));
Assert.Equal(IPointToInterlockingConnection.PointPosition.Left, point.PointState.PointPosition);
}
}
134 changes: 134 additions & 0 deletions src/Point/Connections/Baseline4R1/PointToInterlockingConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
using EulynxLive.Messages.Baseline4R1;
using Grpc.Core;
using IPointToInterlockingConnection = EulynxLive.Point.Interfaces.IPointToInterlockingConnection;
using PointState = EulynxLive.Point.Interfaces.IPointToInterlockingConnection.PointState;
using PointPosition = EulynxLive.Point.Interfaces.IPointToInterlockingConnection.PointPosition;
using EulynxLive.Point.Interfaces;


namespace EulynxLive.Point.EulynxBaseline4R1;

public class PointToInterlockingConnection : IPointToInterlockingConnection
{
private readonly ILogger _logger;
private readonly string _localId;
private readonly string _localRastaId;
private readonly string _remoteId;
private readonly string _remoteEndpoint;
IConnection? _currentConnection;
private CancellationTokenSource _timeout;
private int _timeoutDuration;
private CancellationToken _stoppingToken;

public PointToInterlockingConnection(
ILogger<PointToInterlockingConnection> logger,
IConfiguration configuration,
CancellationToken stoppingToken, int timeoutDuration = 10000)
{
_timeoutDuration = timeoutDuration;
_stoppingToken = stoppingToken;
_timeout = new CancellationTokenSource();
_logger = logger;
_currentConnection = null;

var config = configuration.GetSection("PointSettings").Get<PointConfiguration>() ?? throw new Exception("No configuration provided");
_localId = config.LocalId;
_localRastaId = config.LocalRastaId.ToString();
_remoteId = config.RemoteId;
_remoteEndpoint = config.RemoteEndpoint;
}

public void Connect()
{
ResetTimeout();
_logger.LogTrace("Connecting...");
var metadata = new Metadata { { "rasta-id", _localRastaId } };
_currentConnection = new GrpcConnection(metadata, _remoteEndpoint, _timeout.Token);
}

public async Task<bool> InitializeConnection(PointState state, CancellationToken cancellationToken)
{
_logger.LogTrace("Connected. Waiting for request...");
if (await ReceiveMessage<PointPdiVersionCheckCommand>(cancellationToken) == null)
{
_logger.LogError("Unexpected message.");
return false;
}

var versionCheckResponse = new PointPdiVersionCheckMessage(_localId, _remoteId, PointPdiVersionCheckMessageResultPdiVersionCheck.PDIVersionsFromReceiverAndSenderDoMatch, /* TODO */ 0, 0, new byte[] { });
await SendMessage(versionCheckResponse);

if (await ReceiveMessage<PointInitialisationRequestCommand>(cancellationToken) == null)
{
_logger.LogError("Unexpected message.");
return false;
}

var startInitialization = new PointStartInitialisationMessage(_localId, _remoteId);
await SendMessage(startInitialization);

var pointState = new PointStateBaseline4R1(state);
var initialPosition = new PointPointPositionMessage(_localId, _remoteId, pointState.PointPosition, pointState.DegradedPointPosition);
await SendMessage(initialPosition);

var completeInitialization = new PointInitialisationCompletedMessage(_localId, _remoteId);
await SendMessage(completeInitialization);
return true;
}

public async Task SendPointPosition(PointState state)
{
var pointState = new PointStateBaseline4R1(state);
var response = new PointPointPositionMessage(_localId, _remoteId, pointState.PointPosition, pointState.DegradedPointPosition);
await SendMessage(response);
}

async public Task SendTimeoutMessage()
{
var response = new PointTimeoutMessage(_localId, _remoteId);
await SendMessage(response);
}

public async Task<PointPosition?> ReceivePointPosition(CancellationToken cancellationToken)
{
var message = await ReceiveMessage<PointMovePointCommand>(cancellationToken);

return (message != null)? message.CommandedPointPosition switch
{
PointMovePointCommandCommandedPointPosition.SubsystemElectronicInterlockingRequestsARightHandPointMoving => PointPosition.Right,
PointMovePointCommandCommandedPointPosition.SubsystemElectronicInterlockingRequestsALeftHandPointMoving => PointPosition.Left,
_ => throw new global::System.NotImplementedException(),
} : null;
}

public void Dispose()
{
_currentConnection?.Dispose();
}

private async Task SendMessage(Message message)
{
if (_currentConnection == null) throw new InvalidOperationException("Connection is null. Did you call Connect()?");
await _currentConnection.SendAsync(message.ToByteArray());
}

private async Task<T?> ReceiveMessage<T>(CancellationToken cancellationToken) where T : Message
{
if (_currentConnection == null) throw new InvalidOperationException("Connection is null. Did you call Connect()?");
ResetTimeout();

var message = Message.FromBytes(await _currentConnection.ReceiveAsync(_timeout.Token));
if (message is not T)
{
_logger.LogError("Unexpected message: {}", message);
return null;
}
return message as T;
}

private void ResetTimeout()
{
_timeout = CancellationTokenSource.CreateLinkedTokenSource(_stoppingToken);
_timeout.CancelAfter(_timeoutDuration);
}
}
45 changes: 45 additions & 0 deletions src/Point/Connections/GrpcConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using EulynxLive.Point.Interfaces;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Net.Client;
using Sci;
using static Sci.Rasta;

namespace EulynxLive.Point;

class GrpcConnection : IConnection
{
class GrpcConnectionException : Exception
{
public GrpcConnectionException(string message) : base(message) { }
}

AsyncDuplexStreamingCall<SciPacket, SciPacket>? _connection;
private CancellationToken _stoppingToken;

public GrpcConnection(Metadata? metadata, string remoteEndpoint, CancellationToken stoppingToken)
{
_stoppingToken = stoppingToken;

var channel = GrpcChannel.ForAddress(remoteEndpoint);
var client = new RastaClient(channel);
_connection = client.Stream(metadata, cancellationToken: stoppingToken);
}
public void Dispose()
{
_connection?.Dispose();
}

public async Task<byte[]> ReceiveAsync(CancellationToken cancellationToken)
{
if (_connection == null) throw new InvalidOperationException("Grpc connection not connected.");
if (!await _connection.ResponseStream.MoveNext(cancellationToken)) throw new GrpcConnectionException("Could not receive grpc message.");
return _connection.ResponseStream.Current.Message.ToByteArray();
}

public async Task SendAsync(byte[] bytes)
{
if (_connection == null) throw new InvalidOperationException("Grpc connection not connected.");
await _connection.RequestStream.WriteAsync(new SciPacket() { Message = ByteString.CopyFrom(bytes) });
}
}
36 changes: 36 additions & 0 deletions src/Point/Connections/PointStateBaseline4R1.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@

using EulynxLive.Messages.Baseline4R1;
using IPointToInterlockingConnection = EulynxLive.Point.Interfaces.IPointToInterlockingConnection;
using PointState = EulynxLive.Point.Interfaces.IPointToInterlockingConnection.PointState;
using PointPosition = EulynxLive.Point.Interfaces.IPointToInterlockingConnection.PointPosition;
using DegradedPointPosition = EulynxLive.Point.Interfaces.IPointToInterlockingConnection.DegradedPointPosition;

public record PointStateBaseline4R1
{
private PointState _state;
public PointPointPositionMessageReportedPointPosition PointPosition { get => MapInterfacePointPositionToConcrete(_state.PointPosition); }
public PointPointPositionMessageReportedDegradedPointPosition DegradedPointPosition { get => MapInterfaceDegradedPointPositionToConcrete(_state.DegradedPointPosition); }

public PointStateBaseline4R1(PointState state)
{
_state = state;
}

private PointPointPositionMessageReportedPointPosition MapInterfacePointPositionToConcrete(PointPosition value) => value switch
{
IPointToInterlockingConnection.PointPosition.Left => PointPointPositionMessageReportedPointPosition.PointIsInALeftHandPositionDefinedEndPosition,
IPointToInterlockingConnection.PointPosition.Right => PointPointPositionMessageReportedPointPosition.PointIsInARightHandPositionDefinedEndPosition,
IPointToInterlockingConnection.PointPosition.UnintendedPosition => PointPointPositionMessageReportedPointPosition.PointIsTrailed,
IPointToInterlockingConnection.PointPosition.NoEndPosition => PointPointPositionMessageReportedPointPosition.PointIsInNoEndPosition,
_ => throw new NotImplementedException(),
};

private PointPointPositionMessageReportedDegradedPointPosition MapInterfaceDegradedPointPositionToConcrete(DegradedPointPosition value) => value switch
{
IPointToInterlockingConnection.DegradedPointPosition.DegradedLeft => PointPointPositionMessageReportedDegradedPointPosition.PointIsInADegradedLeftHandPosition,
IPointToInterlockingConnection.DegradedPointPosition.DegradedRight => PointPointPositionMessageReportedDegradedPointPosition.PointIsInADegradedRightHandPosition,
IPointToInterlockingConnection.DegradedPointPosition.NotDegraded => PointPointPositionMessageReportedDegradedPointPosition.PointIsNotInADegradedPosition,
IPointToInterlockingConnection.DegradedPointPosition.NotApplicable => PointPointPositionMessageReportedDegradedPointPosition.DegradedPointPositionIsNotApplicable,
_ => throw new NotImplementedException(),
};
}
8 changes: 8 additions & 0 deletions src/Point/Interfaces/IConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace EulynxLive.Point.Interfaces
{
interface IConnection : IDisposable
{
Task<byte[]> ReceiveAsync(CancellationToken cancellationToken);
Task SendAsync(byte[] bytes);
}
}
Loading

0 comments on commit 125361e

Please sign in to comment.