Skip to content

Commit

Permalink
fix: message serialisation to custom type not being casted to T in em…
Browse files Browse the repository at this point in the history
…itEvent method
  • Loading branch information
mohitpubnub committed Nov 21, 2024
1 parent 7ab51cc commit 22b0e63
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 51 deletions.
3 changes: 1 addition & 2 deletions src/Api/PubnubApi/EventEngine/Common/EventEmitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ public void RemoveListener(SubscribeCallback listener, string[] channels, string

public void EmitEvent<T>(object e)
{
Message<T> eventData = e as Message<T>;

var eventData = e as Message<object>;
string currentMessageChannel = eventData.Channel;
string currentMessageChannelGroup = eventData.SubscriptionMatch;

Expand Down
118 changes: 69 additions & 49 deletions src/Api/PubnubApi/EventEngine/Subscribe/Effects/EmitMessagesHandler.cs
Original file line number Diff line number Diff line change
@@ -1,67 +1,87 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using PubnubApi.EventEngine.Common;
using PubnubApi.EventEngine.Core;
using PubnubApi.EventEngine.Subscribe.Invocations;

namespace PubnubApi.EventEngine.Subscribe.Effects
{
public class EmitMessagesHandler : EffectHandler<EmitMessagesInvocation>
{
private readonly Dictionary<string, Type> channelTypeMap;
private readonly Dictionary<string, Type> channelGroupTypeMap;
private readonly IJsonPluggableLibrary jsonPluggableLibrary;
private readonly EventEmitter eventEmitter;
public class EmitMessagesHandler : EffectHandler<EmitMessagesInvocation>
{
private readonly Dictionary<string, Type> channelTypeMap;
private readonly Dictionary<string, Type> channelGroupTypeMap;
private readonly IJsonPluggableLibrary jsonPluggableLibrary;
private readonly EventEmitter eventEmitter;

public EmitMessagesHandler(EventEmitter eventEmitter,
IJsonPluggableLibrary jsonPluggableLibrary,
Dictionary<string, Type> channelTypeMap = null,
Dictionary<string, Type> channelGroupTypeMap = null)
{
this.eventEmitter = eventEmitter;
this.channelTypeMap = channelTypeMap;
this.channelGroupTypeMap = channelGroupTypeMap;
this.jsonPluggableLibrary = jsonPluggableLibrary;
}
public EmitMessagesHandler(EventEmitter eventEmitter,
IJsonPluggableLibrary jsonPluggableLibrary,
Dictionary<string, Type> channelTypeMap = null,
Dictionary<string, Type> channelGroupTypeMap = null)
{
this.eventEmitter = eventEmitter;
this.channelTypeMap = channelTypeMap;
this.channelGroupTypeMap = channelGroupTypeMap;
this.jsonPluggableLibrary = jsonPluggableLibrary;
}

public async override Task Run(EmitMessagesInvocation invocation)
{
var processedMessages = invocation.Messages?.Messages?.Select(m => {
m.Payload = DeserializePayload(m.Channel, m.Payload);
return m;
});
public async override Task Run(EmitMessagesInvocation invocation)
{
var processedMessages = invocation.Messages?.Messages?.Select(m => {
m.Payload = DeserializePayload(m.Channel, m.Payload);
return m;
});

if (processedMessages is null) return;
if (processedMessages is null) return;

foreach (var message in processedMessages) {
eventEmitter.EmitEvent<object>(message);
}
}
foreach (var message in processedMessages)
{
Type type = MessageTypeValue(message.SubscriptionMatch??message.Channel);
var methodInfo = eventEmitter.GetType().GetMethod("EmitEvent");
var genericMethod = methodInfo?.MakeGenericMethod([type]);
genericMethod?.Invoke(eventEmitter, [message]);
}
}

private object DeserializePayload(string key, object rawMessage)
{
try {
Type t;
if ((channelTypeMap is not null && channelTypeMap.TryGetValue(key, out t) ||
channelGroupTypeMap is not null && channelGroupTypeMap.TryGetValue(key, out t)) &&
t != typeof(string))
{
return jsonPluggableLibrary.DeserializeToObject(rawMessage, t);
} else {
return rawMessage.ToString();
}
} catch (Exception) {
return rawMessage;
}
}
private Type MessageTypeValue(string channel)
{
try {
Type t;
if ((channelTypeMap is not null && channelTypeMap.TryGetValue(channel, out t) ||
channelGroupTypeMap is not null && channelGroupTypeMap.TryGetValue(channel, out t)) &&
t != typeof(string))
{
return t;
}
return typeof(string);
} catch (Exception) {
return typeof(object);
}
}
private object DeserializePayload(string key, object rawMessage)
{
try {
Type t;
if ((channelTypeMap is not null && channelTypeMap.TryGetValue(key, out t) ||
channelGroupTypeMap is not null && channelGroupTypeMap.TryGetValue(key, out t)) &&
t != typeof(string))
{
return jsonPluggableLibrary.DeserializeToObject(rawMessage, t);
} else {
return rawMessage.ToString();
}
} catch (Exception) {
return rawMessage;
}
}

public override bool IsBackground(EmitMessagesInvocation invocation) => false;
public override bool IsBackground(EmitMessagesInvocation invocation) => false;

public override Task Cancel()
{
throw new NotImplementedException();
}
}
public override Task Cancel()
{
throw new NotImplementedException();
}
}
}

0 comments on commit 22b0e63

Please sign in to comment.