Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection interface #15

Merged
merged 22 commits into from
Dec 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 122 additions & 18 deletions src/FieldElementSubsystems.Test/Point/PointTest.cs
Original file line number Diff line number Diff line change
@@ -1,31 +1,135 @@
using Castle.Core.Logging;
using EulynxLive.Messages.Baseline4R1;
using EulynxLive.Point;
using IPointToInterlockingConnection = EulynxLive.Point.Interfaces.IPointToInterlockingConnection;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Moq;
namespace FieldElementSubsystems.Test;

public class PointTest
{
[Fact]
public void PointShouldParseConfiguration()
{
var testSettings = new Dictionary<string, string?> {
{"PointSettings:LocalId", "99W1" },
{"PointSettings:LocalRastaId", "100" },
{"PointSettings:RemoteId", "INTERLOCKING" },
{"PointSettings:RemoteEndpoint", "http://localhost:50051" },
{"PointSettings:AllPointMachinesCrucial", "true" },
{"PointSettings:SimulateRandomTimeouts", "true" },
};

var configuration = new ConfigurationBuilder()
.AddInMemoryCollection(testSettings)
private EulynxLive.Point.Point CreateDefaultPoint(IPointToInterlockingConnection? connection = null) =>
new(_logger, _configuration, connection ?? Mock.Of<IPointToInterlockingConnection>(), () => Task.CompletedTask);

private Mock<IPointToInterlockingConnection> CreateDefaultMockConnection() {
var mockConnection = new Mock<IPointToInterlockingConnection>();
mockConnection
.Setup(m => m.SendPointPosition(
It.IsAny<IPointToInterlockingConnection.PointState>()))
.Returns(Task.FromResult(0));
mockConnection
.Setup(m => m.InitializeConnection(
It.IsAny<IPointToInterlockingConnection.PointState>(), It.IsAny<CancellationToken>()))
.Returns(Task.FromResult(true));
return mockConnection;
}

private static readonly IDictionary<string, string?> _testSettings = new Dictionary<string, string?> {
{"PointSettings:LocalId", "99W1" },
{"PointSettings:LocalRastaId", "100" },
{"PointSettings:RemoteId", "INTERLOCKING" },
{"PointSettings:RemoteEndpoint", "http://localhost:50051" },
{"PointSettings:AllPointMachinesCrucial", "true" },
{"PointSettings:SimulateRandomTimeouts", "false" },
};
private readonly IConfiguration _configuration = new ConfigurationBuilder()
.AddInMemoryCollection(_testSettings)
.Build();
private readonly ILogger<EulynxLive.Point.Point> _logger = Mock.Of<ILogger<EulynxLive.Point.Point>>();

var point = new EulynxLive.Point.Point(Mock.Of<ILogger<EulynxLive.Point.Point>>(), configuration);
[Fact]
public void Test_Parse_Configuration()
{
var point = CreateDefaultPoint();

Assert.True(point.AllPointMachinesCrucial);
}

[Fact]
public void Test_Default_Position()
{
var point = CreateDefaultPoint();
Assert.Equal(IPointToInterlockingConnection.PointPosition.NoEndPosition, point.PointState.PointPosition);
}

[Fact]
public async Task Test_Turn_Left()
{
// Arrange
var point = CreateDefaultPoint();
var mockConnection = CreateDefaultMockConnection();
var cancel = new CancellationTokenSource();

mockConnection
.SetupSequence(m => m.ReceivePointPosition(It.IsAny<CancellationToken>()))
.Returns(Task.FromResult<IPointToInterlockingConnection.PointPosition?>(IPointToInterlockingConnection.PointPosition.Left))
.Returns(() =>
{
cancel.Cancel();
return new TaskCompletionSource<IPointToInterlockingConnection.PointPosition?>().Task;
});

point = CreateDefaultPoint(mockConnection.Object);

// Act
await point.StartAsync(cancel.Token);

// Assert
mockConnection.Verify(v => v.InitializeConnection(It.IsAny<IPointToInterlockingConnection.PointState>(), It.IsAny<CancellationToken>()));
Assert.Equal(IPointToInterlockingConnection.PointPosition.Left, point.PointState.PointPosition);
}

[Fact]
public async Task Test_Turn_Right()
{
// Arrange
var point = CreateDefaultPoint();
var mockConnection = CreateDefaultMockConnection();
var cancel = new CancellationTokenSource();

mockConnection
.SetupSequence(m => m.ReceivePointPosition(It.IsAny<CancellationToken>()))
.Returns(Task.FromResult<IPointToInterlockingConnection.PointPosition?>(IPointToInterlockingConnection.PointPosition.Right))
.Returns(() =>
{
cancel.Cancel();
return new TaskCompletionSource<IPointToInterlockingConnection.PointPosition?>().Task;
});

point = CreateDefaultPoint(mockConnection.Object);

// Act
await point.StartAsync(cancel.Token);

// Assert
mockConnection.Verify(v => v.InitializeConnection(It.IsAny<IPointToInterlockingConnection.PointState>(), It.IsAny<CancellationToken>()));
Assert.Equal(IPointToInterlockingConnection.PointPosition.Right, point.PointState.PointPosition);
}

[Fact]
public async Task Test_Turnover()
{
// Arrange
var point = CreateDefaultPoint();
var mockConnection = CreateDefaultMockConnection();
var cancel = new CancellationTokenSource();

mockConnection
.SetupSequence(m => m.ReceivePointPosition(It.IsAny<CancellationToken>()))
.Returns(Task.FromResult<IPointToInterlockingConnection.PointPosition?>(IPointToInterlockingConnection.PointPosition.Right))
.Returns(Task.FromResult<IPointToInterlockingConnection.PointPosition?>(null))
.Returns(Task.FromResult<IPointToInterlockingConnection.PointPosition?>(IPointToInterlockingConnection.PointPosition.Left))
.Returns(() =>
{
cancel.Cancel();
return new TaskCompletionSource<IPointToInterlockingConnection.PointPosition?>().Task;
});

point = CreateDefaultPoint(mockConnection.Object);

// Act
await point.StartAsync(cancel.Token);

// Assert
mockConnection.Verify(v => v.InitializeConnection(It.IsAny<IPointToInterlockingConnection.PointState>(), It.IsAny<CancellationToken>()));
Assert.Equal(IPointToInterlockingConnection.PointPosition.Left, point.PointState.PointPosition);
}
}
134 changes: 134 additions & 0 deletions src/Point/Connections/Baseline4R1/PointToInterlockingConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
using EulynxLive.Messages.Baseline4R1;
using Grpc.Core;
using IPointToInterlockingConnection = EulynxLive.Point.Interfaces.IPointToInterlockingConnection;
using PointState = EulynxLive.Point.Interfaces.IPointToInterlockingConnection.PointState;
using PointPosition = EulynxLive.Point.Interfaces.IPointToInterlockingConnection.PointPosition;
using EulynxLive.Point.Interfaces;


namespace EulynxLive.Point.EulynxBaseline4R1;

public class PointToInterlockingConnection : IPointToInterlockingConnection
{
private readonly ILogger _logger;
private readonly string _localId;
private readonly string _localRastaId;
private readonly string _remoteId;
private readonly string _remoteEndpoint;
IConnection? _currentConnection;
private CancellationTokenSource _timeout;
private int _timeoutDuration;
private CancellationToken _stoppingToken;

public PointToInterlockingConnection(
ILogger<PointToInterlockingConnection> logger,
IConfiguration configuration,
CancellationToken stoppingToken, int timeoutDuration = 10000)
{
_timeoutDuration = timeoutDuration;
_stoppingToken = stoppingToken;
_timeout = new CancellationTokenSource();
_logger = logger;
_currentConnection = null;

var config = configuration.GetSection("PointSettings").Get<PointConfiguration>() ?? throw new Exception("No configuration provided");
_localId = config.LocalId;
_localRastaId = config.LocalRastaId.ToString();
_remoteId = config.RemoteId;
_remoteEndpoint = config.RemoteEndpoint;
}

public void Connect()
{
ResetTimeout();
_logger.LogTrace("Connecting...");
var metadata = new Metadata { { "rasta-id", _localRastaId } };
_currentConnection = new GrpcConnection(metadata, _remoteEndpoint, _timeout.Token);
}

public async Task<bool> InitializeConnection(PointState state, CancellationToken cancellationToken)
{
_logger.LogTrace("Connected. Waiting for request...");
if (await ReceiveMessage<PointPdiVersionCheckCommand>(cancellationToken) == null)
{
_logger.LogError("Unexpected message.");
return false;
}

var versionCheckResponse = new PointPdiVersionCheckMessage(_localId, _remoteId, PointPdiVersionCheckMessageResultPdiVersionCheck.PDIVersionsFromReceiverAndSenderDoMatch, /* TODO */ 0, 0, new byte[] { });
await SendMessage(versionCheckResponse);

if (await ReceiveMessage<PointInitialisationRequestCommand>(cancellationToken) == null)
{
_logger.LogError("Unexpected message.");
return false;
}

var startInitialization = new PointStartInitialisationMessage(_localId, _remoteId);
await SendMessage(startInitialization);

var pointState = new PointStateBaseline4R1(state);
var initialPosition = new PointPointPositionMessage(_localId, _remoteId, pointState.PointPosition, pointState.DegradedPointPosition);
await SendMessage(initialPosition);

var completeInitialization = new PointInitialisationCompletedMessage(_localId, _remoteId);
await SendMessage(completeInitialization);
return true;
}

public async Task SendPointPosition(PointState state)
{
var pointState = new PointStateBaseline4R1(state);
var response = new PointPointPositionMessage(_localId, _remoteId, pointState.PointPosition, pointState.DegradedPointPosition);
await SendMessage(response);
}

async public Task SendTimeoutMessage()
{
var response = new PointTimeoutMessage(_localId, _remoteId);
await SendMessage(response);
}

public async Task<PointPosition?> ReceivePointPosition(CancellationToken cancellationToken)
{
var message = await ReceiveMessage<PointMovePointCommand>(cancellationToken);

return (message != null)? message.CommandedPointPosition switch
{
PointMovePointCommandCommandedPointPosition.SubsystemElectronicInterlockingRequestsARightHandPointMoving => PointPosition.Right,
PointMovePointCommandCommandedPointPosition.SubsystemElectronicInterlockingRequestsALeftHandPointMoving => PointPosition.Left,
_ => throw new global::System.NotImplementedException(),
} : null;
}

public void Dispose()
{
_currentConnection?.Dispose();
}

private async Task SendMessage(Message message)
{
if (_currentConnection == null) throw new InvalidOperationException("Connection is null. Did you call Connect()?");
await _currentConnection.SendAsync(message.ToByteArray());
}

private async Task<T?> ReceiveMessage<T>(CancellationToken cancellationToken) where T : Message
{
if (_currentConnection == null) throw new InvalidOperationException("Connection is null. Did you call Connect()?");
ResetTimeout();

var message = Message.FromBytes(await _currentConnection.ReceiveAsync(_timeout.Token));
if (message is not T)
{
_logger.LogError("Unexpected message: {}", message);
return null;
}
return message as T;
}

private void ResetTimeout()
{
_timeout = CancellationTokenSource.CreateLinkedTokenSource(_stoppingToken);
_timeout.CancelAfter(_timeoutDuration);
}
}
45 changes: 45 additions & 0 deletions src/Point/Connections/GrpcConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using EulynxLive.Point.Interfaces;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Net.Client;
using Sci;
using static Sci.Rasta;

namespace EulynxLive.Point;

class GrpcConnection : IConnection
{
class GrpcConnectionException : Exception
{
public GrpcConnectionException(string message) : base(message) { }
}

AsyncDuplexStreamingCall<SciPacket, SciPacket>? _connection;
private CancellationToken _stoppingToken;

public GrpcConnection(Metadata? metadata, string remoteEndpoint, CancellationToken stoppingToken)
{
_stoppingToken = stoppingToken;

var channel = GrpcChannel.ForAddress(remoteEndpoint);
var client = new RastaClient(channel);
_connection = client.Stream(metadata, cancellationToken: stoppingToken);
}
public void Dispose()
{
_connection?.Dispose();
}

public async Task<byte[]> ReceiveAsync(CancellationToken cancellationToken)
{
if (_connection == null) throw new InvalidOperationException("Grpc connection not connected.");
if (!await _connection.ResponseStream.MoveNext(cancellationToken)) throw new GrpcConnectionException("Could not receive grpc message.");
return _connection.ResponseStream.Current.Message.ToByteArray();
}

public async Task SendAsync(byte[] bytes)
{
if (_connection == null) throw new InvalidOperationException("Grpc connection not connected.");
await _connection.RequestStream.WriteAsync(new SciPacket() { Message = ByteString.CopyFrom(bytes) });
}
}
36 changes: 36 additions & 0 deletions src/Point/Connections/PointStateBaseline4R1.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@

using EulynxLive.Messages.Baseline4R1;
using IPointToInterlockingConnection = EulynxLive.Point.Interfaces.IPointToInterlockingConnection;
using PointState = EulynxLive.Point.Interfaces.IPointToInterlockingConnection.PointState;
using PointPosition = EulynxLive.Point.Interfaces.IPointToInterlockingConnection.PointPosition;
using DegradedPointPosition = EulynxLive.Point.Interfaces.IPointToInterlockingConnection.DegradedPointPosition;

public record PointStateBaseline4R1
{
private PointState _state;
public PointPointPositionMessageReportedPointPosition PointPosition { get => MapInterfacePointPositionToConcrete(_state.PointPosition); }
public PointPointPositionMessageReportedDegradedPointPosition DegradedPointPosition { get => MapInterfaceDegradedPointPositionToConcrete(_state.DegradedPointPosition); }

public PointStateBaseline4R1(PointState state)
{
_state = state;
}

private PointPointPositionMessageReportedPointPosition MapInterfacePointPositionToConcrete(PointPosition value) => value switch
{
IPointToInterlockingConnection.PointPosition.Left => PointPointPositionMessageReportedPointPosition.PointIsInALeftHandPositionDefinedEndPosition,
IPointToInterlockingConnection.PointPosition.Right => PointPointPositionMessageReportedPointPosition.PointIsInARightHandPositionDefinedEndPosition,
IPointToInterlockingConnection.PointPosition.UnintendedPosition => PointPointPositionMessageReportedPointPosition.PointIsTrailed,
IPointToInterlockingConnection.PointPosition.NoEndPosition => PointPointPositionMessageReportedPointPosition.PointIsInNoEndPosition,
_ => throw new NotImplementedException(),
};

private PointPointPositionMessageReportedDegradedPointPosition MapInterfaceDegradedPointPositionToConcrete(DegradedPointPosition value) => value switch
{
IPointToInterlockingConnection.DegradedPointPosition.DegradedLeft => PointPointPositionMessageReportedDegradedPointPosition.PointIsInADegradedLeftHandPosition,
IPointToInterlockingConnection.DegradedPointPosition.DegradedRight => PointPointPositionMessageReportedDegradedPointPosition.PointIsInADegradedRightHandPosition,
IPointToInterlockingConnection.DegradedPointPosition.NotDegraded => PointPointPositionMessageReportedDegradedPointPosition.PointIsNotInADegradedPosition,
IPointToInterlockingConnection.DegradedPointPosition.NotApplicable => PointPointPositionMessageReportedDegradedPointPosition.DegradedPointPositionIsNotApplicable,
_ => throw new NotImplementedException(),
};
}
8 changes: 8 additions & 0 deletions src/Point/Interfaces/IConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace EulynxLive.Point.Interfaces
{
interface IConnection : IDisposable
{
Task<byte[]> ReceiveAsync(CancellationToken cancellationToken);
Task SendAsync(byte[] bytes);
}
}
Loading
Loading