Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Scretch9 committed Nov 28, 2023
1 parent 23e65f5 commit a43e3eb
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
using EulynxLive.Messages.Baseline4R1;
using EulynxLive.Point;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Net.Client;
using Sci;
using EulynxLive.Messages.IPointToInterlockingConnection;
using static EulynxLive.Messages.IPointToInterlockingConnection.IPointToInterlockingConnection;
using static Sci.Rasta;
using EulynxLive.Point.Interfaces;
using EulynxLive.Messages.IPointToInterlockingConnection;

namespace EulynxLive.Point.EulynxBaseline4R1;

Expand All @@ -17,13 +13,19 @@ public class PointToInterlockingConnection : IPointToInterlockingConnection
private readonly string _localRastaId;
private readonly string _remoteId;
private readonly string _remoteEndpoint;
AsyncDuplexStreamingCall<SciPacket, SciPacket>? _currentConnection;
IConnection? _currentConnection;
private CancellationTokenSource _timeout;
private int _timeoutDuration;
private CancellationToken _stoppingToken;

public PointToInterlockingConnection(
ILogger<PointToInterlockingConnection> logger,
IConfiguration configuration)
IConfiguration configuration,
CancellationToken stoppingToken, int timeoutDuration = 10000)
{
_timeoutDuration = timeoutDuration;
_stoppingToken = stoppingToken;
_timeout = new CancellationTokenSource();
_logger = logger;
_currentConnection = null;

Expand All @@ -32,18 +34,14 @@ public PointToInterlockingConnection(
_localRastaId = config.LocalRastaId.ToString();
_remoteId = config.RemoteId;
_remoteEndpoint = config.RemoteEndpoint;
_timeout = new CancellationTokenSource();
}

public void Connect()
{
var channel = GrpcChannel.ForAddress(_remoteEndpoint);
var client = new RastaClient(channel);
ResetTimeout();
_logger.LogTrace("Connecting...");
_timeout = new CancellationTokenSource();
_timeout.CancelAfter(10000);
var metadata = new Metadata { { "rasta-id", _localRastaId } };
_currentConnection = client.Stream(metadata, cancellationToken: _timeout.Token);
_currentConnection = new GrpcConnection(metadata, _remoteEndpoint, _timeout.Token);
}

public async Task<bool> InitializeConnection(PointState state)
Expand Down Expand Up @@ -97,6 +95,7 @@ async public Task SendTimeoutMessage()
{
PointMovePointCommandCommandedPointPosition.SubsystemElectronicInterlockingRequestsARightHandPointMoving => PointPosition.Right,
PointMovePointCommandCommandedPointPosition.SubsystemElectronicInterlockingRequestsALeftHandPointMoving => PointPosition.Left,
_ => throw new global::System.NotImplementedException(),
} : null;
}

Expand All @@ -108,20 +107,26 @@ public void Dispose()
private async Task SendMessage(Message message)
{
if (_currentConnection == null) throw new InvalidOperationException("Connection is null. Did you call Connect()?");
await _currentConnection.RequestStream.WriteAsync(new SciPacket() { Message = ByteString.CopyFrom(message.ToByteArray()) });
await _currentConnection.SendAsync(message.ToByteArray());
}

private async Task<T?> ReceiveMessage<T>() where T : Message
{
if (_currentConnection == null) throw new InvalidOperationException("Connection is null. Did you call Connect()?");
if (!await _currentConnection.ResponseStream.MoveNext(_timeout.Token)) return null;
ResetTimeout();

var message = Message.FromBytes(_currentConnection.ResponseStream.Current.Message.ToByteArray());
var message = Message.FromBytes(await _currentConnection.ReceiveAsync(_timeout.Token));
if (message is not T)
{
_logger.LogError("Unexpected message: {}", message);
return null;
}
return message as T;
}

private void ResetTimeout()
{
_timeout = CancellationTokenSource.CreateLinkedTokenSource(_stoppingToken);
_timeout.CancelAfter(_timeoutDuration);
}
}
45 changes: 45 additions & 0 deletions src/Point/Connections/GrpcConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using EulynxLive.Point.Interfaces;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Net.Client;
using Sci;
using static Sci.Rasta;

namespace EulynxLive.Point;

class GrpcConnection : IConnection
{
class GrpcConnectionException : Exception
{
public GrpcConnectionException(string message) : base(message) { }
}

AsyncDuplexStreamingCall<SciPacket, SciPacket>? _connection;
private CancellationToken _stoppingToken;

public GrpcConnection(Metadata? metadata, string remoteEndpoint, CancellationToken stoppingToken)
{
_stoppingToken = stoppingToken;

var channel = GrpcChannel.ForAddress(remoteEndpoint);
var client = new RastaClient(channel);
_connection = client.Stream(metadata, cancellationToken: stoppingToken);
}
public void Dispose()
{
_connection?.Dispose();
}

public async Task<byte[]> ReceiveAsync(CancellationToken cancellationToken)
{
if (_connection == null) throw new InvalidOperationException("Grpc connection not connected.");
if (!await _connection.ResponseStream.MoveNext(cancellationToken)) throw new GrpcConnectionException("Could not receive grpc message.");
return _connection.ResponseStream.Current.Message.ToByteArray();
}

public async Task SendAsync(byte[] bytes)
{
if (_connection == null) throw new InvalidOperationException("Grpc connection not connected.");
await _connection.RequestStream.WriteAsync(new SciPacket() { Message = ByteString.CopyFrom(bytes) });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public PointStateBaseline4R1(PointState state)
IPointToInterlockingConnection.PointPosition.Right => PointPointPositionMessageReportedPointPosition.PointIsInARightHandPositionDefinedEndPosition,
IPointToInterlockingConnection.PointPosition.UnintendetPosition => PointPointPositionMessageReportedPointPosition.PointIsTrailed,
IPointToInterlockingConnection.PointPosition.NoEndposition => PointPointPositionMessageReportedPointPosition.PointIsInNoEndPosition,
_ => throw new NotImplementedException(),
};

private PointPointPositionMessageReportedDegradedPointPosition MapInterfaceDegradedPointPositionToConcrete(DegradedPointPosition value) => value switch
Expand All @@ -28,5 +29,6 @@ public PointStateBaseline4R1(PointState state)
IPointToInterlockingConnection.DegradedPointPosition.DegradedRight => PointPointPositionMessageReportedDegradedPointPosition.PointIsInADegradedRightHandPosition,
IPointToInterlockingConnection.DegradedPointPosition.NotDegraded => PointPointPositionMessageReportedDegradedPointPosition.PointIsNotInADegradedPosition,
IPointToInterlockingConnection.DegradedPointPosition.NotApplicable => PointPointPositionMessageReportedDegradedPointPosition.DegradedPointPositionIsNotApplicable,
_ => throw new NotImplementedException(),
};
}
8 changes: 8 additions & 0 deletions src/Point/Interfaces/IConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace EulynxLive.Point.Interfaces
{
interface IConnection : IDisposable
{
Task<byte[]> ReceiveAsync(CancellationToken cancellationToken);
Task SendAsync(byte[] bytes);
}
}
11 changes: 5 additions & 6 deletions src/Point/Point.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ public class Point : BackgroundService
private readonly Random _random;
private readonly bool _simulateRandomTimeouts;
private bool _initialized;
AsyncDuplexStreamingCall<SciPacket, SciPacket>? _currentConnection;
private readonly PointState _pointState;
public PointState PointState { get { return _pointState; } }

Expand Down Expand Up @@ -77,7 +76,7 @@ public async Task SimulateUnintendedPosition()
_pointState.PointPosition = PointPosition.UnintendetPosition;


if (_currentConnection != null)
if (_connection != null)
{
await _connection.SendPointPosition(PointState);
}
Expand Down Expand Up @@ -126,7 +125,7 @@ public async Task SetDegraded(PointDegradedMessage message)
{
_pointState.PointPosition = PointPosition.NoEndposition;

if (_currentConnection != null)
if (_connection != null)
{
var degradedPointPosition = AllPointMachinesCrucial ?
DegradedPointPosition.NotApplicable : GetDegradedPointPosition(_pointState.PointPosition);
Expand All @@ -152,7 +151,7 @@ public async Task SetDegraded(PointDegradedMessage message)
public async Task PutInEndPosition()
{

if (_currentConnection != null)
if (_connection != null)
{
if (_pointState.PointPosition != PointPosition.Right &&
_pointState.PointPosition != PointPosition.Left)
Expand All @@ -171,7 +170,7 @@ public async Task PutInEndPosition()
if (finalPosition != null)
{
UpdatePointState((PointPosition)finalPosition, reportedDegradedPointPosition);
_connection.SendPointPosition(PointState);
await _connection.SendPointPosition(PointState);
}
}
}
Expand Down Expand Up @@ -205,7 +204,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
if ((commandedPointPosition == PointPosition.Left && _pointState.PointPosition == PointPosition.Left)
|| (commandedPointPosition == PointPosition.Right && _pointState.PointPosition == PointPosition.Right))
{
_connection.SendPointPosition(PointState);
await _connection.SendPointPosition(PointState);
continue;
}

Expand Down
12 changes: 9 additions & 3 deletions src/Point/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,23 @@ public void ConfigureServices(IServiceCollection services)
services.AddGrpc();
services.AddGrpcReflection();

services.AddSingleton<IPointToInterlockingConnection, PointToInterlockingConnection>();
services.AddSingleton<IPointToInterlockingConnection>(x =>
{
return new PointToInterlockingConnection(x.GetRequiredService<ILogger<PointToInterlockingConnection>>(), x.GetRequiredService<IConfiguration>(), CancellationToken.None);
});

// In production, the React files will be served from this directory
services.AddSpaStaticFiles(configuration =>
{
configuration.RootPath = "rasta-point-web/build";
});

try {
try
{
services.AddSingleton<Point>();
} catch (Exception e) {
}
catch (Exception e)
{
Console.WriteLine($"Usage: --PointSettings:LocalId=<> --PointSettings:LocalRastaId=<> --PointSettings:RemoteId=<> --PointSettings:RemoteEndpoint=<>. {e.Message}");
Environment.Exit(1);
}
Expand Down

0 comments on commit a43e3eb

Please sign in to comment.