Skip to content

Commit

Permalink
base: Move some direct crosstalk to callback system
Browse files Browse the repository at this point in the history
  • Loading branch information
tt2468 committed Apr 23, 2024
1 parent 9123879 commit 74719ce
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 76 deletions.
68 changes: 28 additions & 40 deletions src/eventhandler/EventHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,49 +73,37 @@ EventHandler::~EventHandler()
blog_debug("[EventHandler::~EventHandler] Finished.");
}

void EventHandler::SetBroadcastCallback(EventHandler::BroadcastCallback cb)
// Function to increment or decrement refcounts for high volume event subscriptions
void EventHandler::ProcessSubscriptionChange(bool type, uint64_t eventSubscriptions)
{
_broadcastCallback = cb;
}

void EventHandler::SetObsReadyCallback(EventHandler::ObsReadyCallback cb)
{
_obsReadyCallback = cb;
}

// Function to increment refcounts for high volume event subscriptions
void EventHandler::ProcessSubscription(uint64_t eventSubscriptions)
{
if ((eventSubscriptions & EventSubscription::InputVolumeMeters) != 0) {
if (_inputVolumeMetersRef.fetch_add(1) == 0) {
if (_inputVolumeMetersHandler)
blog(LOG_WARNING, "[EventHandler::ProcessSubscription] Input volume meter handler already exists!");
else
_inputVolumeMetersHandler = std::make_unique<Utils::Obs::VolumeMeter::Handler>(
std::bind(&EventHandler::HandleInputVolumeMeters, this, std::placeholders::_1));
if (type) {
if ((eventSubscriptions & EventSubscription::InputVolumeMeters) != 0) {
if (_inputVolumeMetersRef.fetch_add(1) == 0) {
if (_inputVolumeMetersHandler)
blog(LOG_WARNING, "[EventHandler::ProcessSubscription] Input volume meter handler already exists!");
else
_inputVolumeMetersHandler = std::make_unique<Utils::Obs::VolumeMeter::Handler>(
std::bind(&EventHandler::HandleInputVolumeMeters, this, std::placeholders::_1));
}
}
if ((eventSubscriptions & EventSubscription::InputActiveStateChanged) != 0)
_inputActiveStateChangedRef++;
if ((eventSubscriptions & EventSubscription::InputShowStateChanged) != 0)
_inputShowStateChangedRef++;
if ((eventSubscriptions & EventSubscription::SceneItemTransformChanged) != 0)
_sceneItemTransformChangedRef++;
} else {
if ((eventSubscriptions & EventSubscription::InputVolumeMeters) != 0) {
if (_inputVolumeMetersRef.fetch_sub(1) == 1)
_inputVolumeMetersHandler.reset();
}
if ((eventSubscriptions & EventSubscription::InputActiveStateChanged) != 0)
_inputActiveStateChangedRef--;
if ((eventSubscriptions & EventSubscription::InputShowStateChanged) != 0)
_inputShowStateChangedRef--;
if ((eventSubscriptions & EventSubscription::SceneItemTransformChanged) != 0)
_sceneItemTransformChangedRef--;
}
if ((eventSubscriptions & EventSubscription::InputActiveStateChanged) != 0)
_inputActiveStateChangedRef++;
if ((eventSubscriptions & EventSubscription::InputShowStateChanged) != 0)
_inputShowStateChangedRef++;
if ((eventSubscriptions & EventSubscription::SceneItemTransformChanged) != 0)
_sceneItemTransformChangedRef++;
}

// Function to decrement refcounts for high volume event subscriptions
void EventHandler::ProcessUnsubscription(uint64_t eventSubscriptions)
{
if ((eventSubscriptions & EventSubscription::InputVolumeMeters) != 0) {
if (_inputVolumeMetersRef.fetch_sub(1) == 1)
_inputVolumeMetersHandler.reset();
}
if ((eventSubscriptions & EventSubscription::InputActiveStateChanged) != 0)
_inputActiveStateChangedRef--;
if ((eventSubscriptions & EventSubscription::InputShowStateChanged) != 0)
_inputShowStateChangedRef--;
if ((eventSubscriptions & EventSubscription::SceneItemTransformChanged) != 0)
_sceneItemTransformChangedRef--;
}

// Function required in order to use default arguments
Expand Down
14 changes: 10 additions & 4 deletions src/eventhandler/EventHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,18 @@ class EventHandler {

typedef std::function<void(uint64_t, std::string, json, uint8_t)>
BroadcastCallback; // uint64_t requiredIntent, std::string eventType, json eventData, uint8_t rpcVersion
void SetBroadcastCallback(BroadcastCallback cb);
inline void SetBroadcastCallback(BroadcastCallback cb)
{
_broadcastCallback = cb;
}

typedef std::function<void(bool)> ObsReadyCallback; // bool ready
void SetObsReadyCallback(ObsReadyCallback cb);
inline void SetObsReadyCallback(ObsReadyCallback cb)
{
_obsReadyCallback = cb;
}

void ProcessSubscription(uint64_t eventSubscriptions);
void ProcessUnsubscription(uint64_t eventSubscriptions);
void ProcessSubscriptionChange(bool type, uint64_t eventSubscriptions);

private:
BroadcastCallback _broadcastCallback;
Expand Down
10 changes: 10 additions & 0 deletions src/obs-websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ bool obs_module_load(void)
// Initialize the WebSocket server
_webSocketServer = std::make_shared<WebSocketServer>();

// Attach event handlers between WebSocket server and event handler
_eventHandler->SetBroadcastCallback(std::bind(&WebSocketServer::BroadcastEvent, _webSocketServer.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
_eventHandler->SetObsReadyCallback(std::bind(&WebSocketServer::SetObsReady, _webSocketServer.get(), std::placeholders::_1));
_webSocketServer->SetClientSubscriptionCallback(std::bind(&EventHandler::ProcessSubscriptionChange, _eventHandler.get(), std::placeholders::_1, std::placeholders::_2));

// Initialize the settings dialog
obs_frontend_push_ui_translation(obs_module_get_string);
QMainWindow *mainWindow = static_cast<QMainWindow *>(obs_frontend_get_main_window());
Expand Down Expand Up @@ -123,6 +128,11 @@ void obs_module_unload(void)
_webSocketServer->Stop();
}

// Disconnect event handler from WebSocket server
_eventHandler->SetObsReadyCallback(nullptr);
_eventHandler->SetBroadcastCallback(nullptr);
_webSocketServer->SetClientSubscriptionCallback(nullptr);

// Release the WebSocket server
_webSocketServer = nullptr;

Expand Down
24 changes: 4 additions & 20 deletions src/websocketserver/WebSocketServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ with this program. If not, see <https://www.gnu.org/licenses/>
#include <obs-frontend-api.h>

#include "WebSocketServer.h"
#include "../eventhandler/EventHandler.h"
#include "../obs-websocket.h"
#include "../Config.h"
#include "../utils/Crypto.h"
Expand All @@ -47,23 +46,10 @@ WebSocketServer::WebSocketServer() : QObject(nullptr)
_server.set_close_handler(websocketpp::lib::bind(&WebSocketServer::onClose, this, websocketpp::lib::placeholders::_1));
_server.set_message_handler(websocketpp::lib::bind(&WebSocketServer::onMessage, this, websocketpp::lib::placeholders::_1,
websocketpp::lib::placeholders::_2));

auto eventHandler = GetEventHandler();
if (eventHandler) {
eventHandler->SetBroadcastCallback(std::bind(&WebSocketServer::BroadcastEvent, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
eventHandler->SetObsReadyCallback(std::bind(&WebSocketServer::onObsReady, this, std::placeholders::_1));
}
}

WebSocketServer::~WebSocketServer()
{
auto eventHandler = GetEventHandler();
if (eventHandler) {
eventHandler->SetObsReadyCallback(nullptr);
eventHandler->SetBroadcastCallback(nullptr);
}

if (_server.is_listening())
Stop();
}
Expand Down Expand Up @@ -215,7 +201,7 @@ std::vector<WebSocketServer::WebSocketSessionState> WebSocketServer::GetWebSocke
return webSocketSessions;
}

void WebSocketServer::onObsReady(bool ready)
void WebSocketServer::SetObsReady(bool ready)
{
_obsReady = ready;
}
Expand Down Expand Up @@ -327,11 +313,9 @@ void WebSocketServer::onClose(websocketpp::connection_hdl hdl)
_sessions.erase(hdl);
lock.unlock();

// If client was identified, decrement appropriate refs in eventhandler.
if (isIdentified) {
auto eventHandler = GetEventHandler();
eventHandler->ProcessUnsubscription(eventSubscriptions);
}
// If client was identified, announce unsubscription
if (isIdentified && _clientSubscriptionCallback)
_clientSubscriptionCallback(false, eventSubscriptions);

// Build SessionState object for signal
WebSocketSessionState state;
Expand Down
15 changes: 12 additions & 3 deletions src/websocketserver/WebSocketServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,20 @@ class WebSocketServer : QObject {
void InvalidateSession(websocketpp::connection_hdl hdl);
void BroadcastEvent(uint64_t requiredIntent, const std::string &eventType, const json &eventData = nullptr,
uint8_t rpcVersion = 0);
void SetObsReady(bool ready);

bool IsListening() { return _server.is_listening(); }
// Callback for when a client subscribes or unsubscribes. `true` for sub, `false` for unsub
typedef std::function<void(bool, uint64_t)> ClientSubscriptionCallback; // bool type, uint64_t eventSubscriptions
inline void SetClientSubscriptionCallback(ClientSubscriptionCallback cb)
{
_clientSubscriptionCallback = cb;
}

inline bool IsListening() { return _server.is_listening(); }

std::vector<WebSocketSessionState> GetWebSocketSessions();

QThreadPool *GetThreadPool() { return &_threadPool; }
inline QThreadPool *GetThreadPool() { return &_threadPool; }

signals:
void ClientConnected(WebSocketSessionState state);
Expand All @@ -77,7 +85,6 @@ class WebSocketServer : QObject {

void ServerRunner();

void onObsReady(bool loaded);
bool onValidate(websocketpp::connection_hdl hdl);
void onOpen(websocketpp::connection_hdl hdl);
void onClose(websocketpp::connection_hdl hdl);
Expand All @@ -98,4 +105,6 @@ class WebSocketServer : QObject {
std::map<websocketpp::connection_hdl, SessionPtr, std::owner_less<websocketpp::connection_hdl>> _sessions;

std::atomic<bool> _obsReady = false;

ClientSubscriptionCallback _clientSubscriptionCallback;
};
18 changes: 9 additions & 9 deletions src/websocketserver/WebSocketServer_Protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ with this program. If not, see <https://www.gnu.org/licenses/>
#include "WebSocketServer.h"
#include "../requesthandler/RequestHandler.h"
#include "../requesthandler/RequestBatchHandler.h"
#include "../eventhandler/EventHandler.h"
#include "../obs-websocket.h"
#include "../Config.h"
#include "../utils/Crypto.h"
Expand Down Expand Up @@ -149,9 +148,9 @@ void WebSocketServer::ProcessMessage(SessionPtr session, WebSocketServer::Proces
if (ret.closeCode != WebSocketCloseCode::DontClose)
return;

// Increment refs for event subscriptions
auto eventHandler = GetEventHandler();
eventHandler->ProcessSubscription(session->EventSubscriptions());
// Announce subscribe
if (_clientSubscriptionCallback)
_clientSubscriptionCallback(true, session->EventSubscriptions());

// Mark session as identified
session->SetIsIdentified(true);
Expand All @@ -172,16 +171,17 @@ void WebSocketServer::ProcessMessage(SessionPtr session, WebSocketServer::Proces
case WebSocketOpCode::Reidentify: { // Reidentify
std::unique_lock<std::mutex> sessionLock(session->OperationMutex);

// Decrement refs for current subscriptions
auto eventHandler = GetEventHandler();
eventHandler->ProcessUnsubscription(session->EventSubscriptions());
// Announce unsubscribe
if (_clientSubscriptionCallback)
_clientSubscriptionCallback(false, session->EventSubscriptions());

SetSessionParameters(session, ret, payloadData);
if (ret.closeCode != WebSocketCloseCode::DontClose)
return;

// Increment refs for new subscriptions
eventHandler->ProcessSubscription(session->EventSubscriptions());
// Announce subscribe
if (_clientSubscriptionCallback)
_clientSubscriptionCallback(true, session->EventSubscriptions());

ret.result["op"] = WebSocketOpCode::Identified;
ret.result["d"]["negotiatedRpcVersion"] = session->RpcVersion();
Expand Down

0 comments on commit 74719ce

Please sign in to comment.