Skip to content
This repository has been archived by the owner on Sep 28, 2019. It is now read-only.

Commit

Permalink
change Pub/Sub
Browse files Browse the repository at this point in the history
  • Loading branch information
minhdtb83 committed Feb 1, 2016
1 parent 7aea8b2 commit 15cf085
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 19 deletions.
9 changes: 2 additions & 7 deletions IEC60870/IEC60870.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="PubSub, Version=1.4.1.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\PubSub.1.4.1\lib\portable-net45+sl5+netcore45+wpa81+wp8+MonoAndroid1+MonoTouch1\PubSub.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
Expand Down Expand Up @@ -102,12 +98,11 @@
<Compile Include="SAP\ServerSAP.cs" />
<Compile Include="Util\CountDownLatch.cs" />
<Compile Include="Util\PeriodicTaskFactory.cs" />
<Compile Include="Util\PubSubExtensions.cs" />
<Compile Include="Util\PubSubHub.cs" />
<Compile Include="Util\ThreadBase.cs" />
</ItemGroup>
<ItemGroup />
<ItemGroup>
<None Include="packages.config" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
Expand Down
15 changes: 7 additions & 8 deletions IEC60870/SAP/ServerSAP.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.IO;
using System.Net;
using System.Net.Sockets;
using PubSub;

namespace IEC60870.SAP
{
Expand All @@ -19,25 +18,25 @@ public ConnectionHandler(Socket socket, ConnectionSettings settings): base()
_socket = socket;
_settings = settings;

this.Subscribe<ASdu>(asdu =>
this.Subscribe<ASdu>("send", asdu =>
{
try
{
serverConnection.Send(asdu);
}
catch (Exception e)
{
Console.WriteLine(e);
this.Publish("error", e);
}
});
}

public override void Run()
{
serverConnection = new Connection(_socket, _settings);
serverConnection.ConnectionClosed += (a) =>
serverConnection.ConnectionClosed += e =>
{
Console.WriteLine(a);
this.Publish<Exception>("error", e);
};

serverConnection.WaitForStartDT(5000);
Expand Down Expand Up @@ -73,11 +72,11 @@ public override void Run()
}
catch (IOException e)
{
Console.WriteLine(e);
this.Publish<Exception>("error", e);
}
catch (Exception e)
{
Console.WriteLine(e);
this.Publish("error", e);
break;
}
}
Expand Down Expand Up @@ -132,7 +131,7 @@ public void StartListen(int backlog)

public void SendASdu(ASdu asdu)
{
this.Publish(asdu);
this.Publish("send", asdu);
}

public void SetMessageFragmentTimeout(int timeout)
Expand Down
19 changes: 19 additions & 0 deletions IEC60870/Util/PubSubExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;

namespace IEC60870.Util
{
static public class PubSubExtensions
{
static private readonly PubSubHub hub = new PubSubHub();

static public void Publish<T>(this object obj, string topic, T data)
{
hub.Publish(obj, topic, data);
}

static public void Subscribe<T>(this object obj, string topic, Action<T> handler)
{
hub.Subscribe(obj, topic, handler);
}
}
}
66 changes: 66 additions & 0 deletions IEC60870/Util/PubSubHub.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using System;
using System.Collections.Generic;

namespace IEC60870.Util
{
public class PubSubHub
{
internal class Handler
{
public Delegate Action { get; set; }
public WeakReference Sender { get; set; }
public Type Type { get; set; }
}

internal object locker = new object();
internal Dictionary<string, List<Handler>> handlers = new Dictionary<string, List<Handler>>();

public void Publish<T>(object sender, string topic, T data)
{
if (handlers.ContainsKey(topic))
{
var listHandler = handlers[topic];
listHandler.ForEach(handler =>
{
if (handler.Sender.IsAlive)
{
((Action<T>)handler.Action)(data);
}
else
{
lock (locker)
{
handlers[topic].Remove(handler);
}
}
});
}
}

public void Subscribe<T>(object sender, string topic, Action<T> handler)
{
var item = new Handler
{
Action = handler,
Sender = new WeakReference(sender),
Type = typeof(T)
};

if (handlers.ContainsKey(topic))
{
lock (locker)
{
handlers[topic].Add(item);
}
}
else
{
lock (locker)
{
handlers[topic] = new List<Handler>();
handlers[topic].Add(item);
}
}
}
}
}
4 changes: 0 additions & 4 deletions IEC60870/packages.config

This file was deleted.

0 comments on commit 15cf085

Please sign in to comment.