From a43e3eb45447e9e818d3974d9127c7d6f9c4f6dd Mon Sep 17 00:00:00 2001 From: Benedikt Schenkel Date: Tue, 28 Nov 2023 16:14:46 +0100 Subject: [PATCH] resolve comments --- .../PointToInterlockingConnection.cs} | 39 +++++++++------- src/Point/Connections/GrpcConnection.cs | 45 +++++++++++++++++++ .../PointStateBaseline4R1.cs | 2 + src/Point/Interfaces/IConnection.cs | 8 ++++ src/Point/Point.cs | 11 +++-- src/Point/Startup.cs | 12 +++-- 6 files changed, 91 insertions(+), 26 deletions(-) rename src/Point/{PointToInterlockingConnectionB4R1Impl.cs => Connections/Baseline4R1/PointToInterlockingConnection.cs} (82%) create mode 100644 src/Point/Connections/GrpcConnection.cs rename src/Point/{ => Connections}/PointStateBaseline4R1.cs (95%) create mode 100644 src/Point/Interfaces/IConnection.cs diff --git a/src/Point/PointToInterlockingConnectionB4R1Impl.cs b/src/Point/Connections/Baseline4R1/PointToInterlockingConnection.cs similarity index 82% rename from src/Point/PointToInterlockingConnectionB4R1Impl.cs rename to src/Point/Connections/Baseline4R1/PointToInterlockingConnection.cs index d3de42c..d43018e 100644 --- a/src/Point/PointToInterlockingConnectionB4R1Impl.cs +++ b/src/Point/Connections/Baseline4R1/PointToInterlockingConnection.cs @@ -1,12 +1,8 @@ using EulynxLive.Messages.Baseline4R1; -using EulynxLive.Point; -using Google.Protobuf; using Grpc.Core; -using Grpc.Net.Client; -using Sci; -using EulynxLive.Messages.IPointToInterlockingConnection; using static EulynxLive.Messages.IPointToInterlockingConnection.IPointToInterlockingConnection; -using static Sci.Rasta; +using EulynxLive.Point.Interfaces; +using EulynxLive.Messages.IPointToInterlockingConnection; namespace EulynxLive.Point.EulynxBaseline4R1; @@ -17,13 +13,19 @@ public class PointToInterlockingConnection : IPointToInterlockingConnection private readonly string _localRastaId; private readonly string _remoteId; private readonly string _remoteEndpoint; - AsyncDuplexStreamingCall? _currentConnection; + IConnection? _currentConnection; private CancellationTokenSource _timeout; + private int _timeoutDuration; + private CancellationToken _stoppingToken; public PointToInterlockingConnection( ILogger logger, - IConfiguration configuration) + IConfiguration configuration, + CancellationToken stoppingToken, int timeoutDuration = 10000) { + _timeoutDuration = timeoutDuration; + _stoppingToken = stoppingToken; + _timeout = new CancellationTokenSource(); _logger = logger; _currentConnection = null; @@ -32,18 +34,14 @@ public PointToInterlockingConnection( _localRastaId = config.LocalRastaId.ToString(); _remoteId = config.RemoteId; _remoteEndpoint = config.RemoteEndpoint; - _timeout = new CancellationTokenSource(); } public void Connect() { - var channel = GrpcChannel.ForAddress(_remoteEndpoint); - var client = new RastaClient(channel); + ResetTimeout(); _logger.LogTrace("Connecting..."); - _timeout = new CancellationTokenSource(); - _timeout.CancelAfter(10000); var metadata = new Metadata { { "rasta-id", _localRastaId } }; - _currentConnection = client.Stream(metadata, cancellationToken: _timeout.Token); + _currentConnection = new GrpcConnection(metadata, _remoteEndpoint, _timeout.Token); } public async Task InitializeConnection(PointState state) @@ -97,6 +95,7 @@ async public Task SendTimeoutMessage() { PointMovePointCommandCommandedPointPosition.SubsystemElectronicInterlockingRequestsARightHandPointMoving => PointPosition.Right, PointMovePointCommandCommandedPointPosition.SubsystemElectronicInterlockingRequestsALeftHandPointMoving => PointPosition.Left, + _ => throw new global::System.NotImplementedException(), } : null; } @@ -108,15 +107,15 @@ public void Dispose() private async Task SendMessage(Message message) { if (_currentConnection == null) throw new InvalidOperationException("Connection is null. Did you call Connect()?"); - await _currentConnection.RequestStream.WriteAsync(new SciPacket() { Message = ByteString.CopyFrom(message.ToByteArray()) }); + await _currentConnection.SendAsync(message.ToByteArray()); } private async Task ReceiveMessage() where T : Message { if (_currentConnection == null) throw new InvalidOperationException("Connection is null. Did you call Connect()?"); - if (!await _currentConnection.ResponseStream.MoveNext(_timeout.Token)) return null; + ResetTimeout(); - var message = Message.FromBytes(_currentConnection.ResponseStream.Current.Message.ToByteArray()); + var message = Message.FromBytes(await _currentConnection.ReceiveAsync(_timeout.Token)); if (message is not T) { _logger.LogError("Unexpected message: {}", message); @@ -124,4 +123,10 @@ private async Task SendMessage(Message message) } return message as T; } + + private void ResetTimeout() + { + _timeout = CancellationTokenSource.CreateLinkedTokenSource(_stoppingToken); + _timeout.CancelAfter(_timeoutDuration); + } } diff --git a/src/Point/Connections/GrpcConnection.cs b/src/Point/Connections/GrpcConnection.cs new file mode 100644 index 0000000..e7f71a0 --- /dev/null +++ b/src/Point/Connections/GrpcConnection.cs @@ -0,0 +1,45 @@ +using EulynxLive.Point.Interfaces; +using Google.Protobuf; +using Grpc.Core; +using Grpc.Net.Client; +using Sci; +using static Sci.Rasta; + +namespace EulynxLive.Point; + +class GrpcConnection : IConnection +{ + class GrpcConnectionException : Exception + { + public GrpcConnectionException(string message) : base(message) { } + } + + AsyncDuplexStreamingCall? _connection; + private CancellationToken _stoppingToken; + + public GrpcConnection(Metadata? metadata, string remoteEndpoint, CancellationToken stoppingToken) + { + _stoppingToken = stoppingToken; + + var channel = GrpcChannel.ForAddress(remoteEndpoint); + var client = new RastaClient(channel); + _connection = client.Stream(metadata, cancellationToken: stoppingToken); + } + public void Dispose() + { + _connection?.Dispose(); + } + + public async Task ReceiveAsync(CancellationToken cancellationToken) + { + if (_connection == null) throw new InvalidOperationException("Grpc connection not connected."); + if (!await _connection.ResponseStream.MoveNext(cancellationToken)) throw new GrpcConnectionException("Could not receive grpc message."); + return _connection.ResponseStream.Current.Message.ToByteArray(); + } + + public async Task SendAsync(byte[] bytes) + { + if (_connection == null) throw new InvalidOperationException("Grpc connection not connected."); + await _connection.RequestStream.WriteAsync(new SciPacket() { Message = ByteString.CopyFrom(bytes) }); + } +} diff --git a/src/Point/PointStateBaseline4R1.cs b/src/Point/Connections/PointStateBaseline4R1.cs similarity index 95% rename from src/Point/PointStateBaseline4R1.cs rename to src/Point/Connections/PointStateBaseline4R1.cs index 69e16a3..3599b9a 100644 --- a/src/Point/PointStateBaseline4R1.cs +++ b/src/Point/Connections/PointStateBaseline4R1.cs @@ -20,6 +20,7 @@ public PointStateBaseline4R1(PointState state) IPointToInterlockingConnection.PointPosition.Right => PointPointPositionMessageReportedPointPosition.PointIsInARightHandPositionDefinedEndPosition, IPointToInterlockingConnection.PointPosition.UnintendetPosition => PointPointPositionMessageReportedPointPosition.PointIsTrailed, IPointToInterlockingConnection.PointPosition.NoEndposition => PointPointPositionMessageReportedPointPosition.PointIsInNoEndPosition, + _ => throw new NotImplementedException(), }; private PointPointPositionMessageReportedDegradedPointPosition MapInterfaceDegradedPointPositionToConcrete(DegradedPointPosition value) => value switch @@ -28,5 +29,6 @@ public PointStateBaseline4R1(PointState state) IPointToInterlockingConnection.DegradedPointPosition.DegradedRight => PointPointPositionMessageReportedDegradedPointPosition.PointIsInADegradedRightHandPosition, IPointToInterlockingConnection.DegradedPointPosition.NotDegraded => PointPointPositionMessageReportedDegradedPointPosition.PointIsNotInADegradedPosition, IPointToInterlockingConnection.DegradedPointPosition.NotApplicable => PointPointPositionMessageReportedDegradedPointPosition.DegradedPointPositionIsNotApplicable, + _ => throw new NotImplementedException(), }; } diff --git a/src/Point/Interfaces/IConnection.cs b/src/Point/Interfaces/IConnection.cs new file mode 100644 index 0000000..9b68e95 --- /dev/null +++ b/src/Point/Interfaces/IConnection.cs @@ -0,0 +1,8 @@ +namespace EulynxLive.Point.Interfaces +{ + interface IConnection : IDisposable + { + Task ReceiveAsync(CancellationToken cancellationToken); + Task SendAsync(byte[] bytes); + } +} diff --git a/src/Point/Point.cs b/src/Point/Point.cs index 2b23394..3ed48b4 100644 --- a/src/Point/Point.cs +++ b/src/Point/Point.cs @@ -21,7 +21,6 @@ public class Point : BackgroundService private readonly Random _random; private readonly bool _simulateRandomTimeouts; private bool _initialized; - AsyncDuplexStreamingCall? _currentConnection; private readonly PointState _pointState; public PointState PointState { get { return _pointState; } } @@ -77,7 +76,7 @@ public async Task SimulateUnintendedPosition() _pointState.PointPosition = PointPosition.UnintendetPosition; - if (_currentConnection != null) + if (_connection != null) { await _connection.SendPointPosition(PointState); } @@ -126,7 +125,7 @@ public async Task SetDegraded(PointDegradedMessage message) { _pointState.PointPosition = PointPosition.NoEndposition; - if (_currentConnection != null) + if (_connection != null) { var degradedPointPosition = AllPointMachinesCrucial ? DegradedPointPosition.NotApplicable : GetDegradedPointPosition(_pointState.PointPosition); @@ -152,7 +151,7 @@ public async Task SetDegraded(PointDegradedMessage message) public async Task PutInEndPosition() { - if (_currentConnection != null) + if (_connection != null) { if (_pointState.PointPosition != PointPosition.Right && _pointState.PointPosition != PointPosition.Left) @@ -171,7 +170,7 @@ public async Task PutInEndPosition() if (finalPosition != null) { UpdatePointState((PointPosition)finalPosition, reportedDegradedPointPosition); - _connection.SendPointPosition(PointState); + await _connection.SendPointPosition(PointState); } } } @@ -205,7 +204,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) if ((commandedPointPosition == PointPosition.Left && _pointState.PointPosition == PointPosition.Left) || (commandedPointPosition == PointPosition.Right && _pointState.PointPosition == PointPosition.Right)) { - _connection.SendPointPosition(PointState); + await _connection.SendPointPosition(PointState); continue; } diff --git a/src/Point/Startup.cs b/src/Point/Startup.cs index ff9c513..4e67cf7 100644 --- a/src/Point/Startup.cs +++ b/src/Point/Startup.cs @@ -21,7 +21,10 @@ public void ConfigureServices(IServiceCollection services) services.AddGrpc(); services.AddGrpcReflection(); - services.AddSingleton(); + services.AddSingleton(x => + { + return new PointToInterlockingConnection(x.GetRequiredService>(), x.GetRequiredService(), CancellationToken.None); + }); // In production, the React files will be served from this directory services.AddSpaStaticFiles(configuration => @@ -29,9 +32,12 @@ public void ConfigureServices(IServiceCollection services) configuration.RootPath = "rasta-point-web/build"; }); - try { + try + { services.AddSingleton(); - } catch (Exception e) { + } + catch (Exception e) + { Console.WriteLine($"Usage: --PointSettings:LocalId=<> --PointSettings:LocalRastaId=<> --PointSettings:RemoteId=<> --PointSettings:RemoteEndpoint=<>. {e.Message}"); Environment.Exit(1); }