diff --git a/.gitmodules b/.gitmodules index d5d1a187478..48b1e677402 100644 --- a/.gitmodules +++ b/.gitmodules @@ -112,3 +112,6 @@ [submodule "3rdparty/unordered_dense"] path = 3rdparty/unordered_dense url = https://github.com/martinus/unordered_dense +[submodule "3rdparty/async-mqtt5"] + path = 3rdparty/async-mqtt5 + url = https://github.com/mireo/async-mqtt5 diff --git a/3rdparty/async-mqtt5 b/3rdparty/async-mqtt5 new file mode 160000 index 00000000000..aea5175e8a7 --- /dev/null +++ b/3rdparty/async-mqtt5 @@ -0,0 +1 @@ +Subproject commit aea5175e8a7260d204cc986744de1ddc7a7231be diff --git a/cmake/OssiaDeps.cmake b/cmake/OssiaDeps.cmake index 064f69c3359..18284ef717f 100644 --- a/cmake/OssiaDeps.cmake +++ b/cmake/OssiaDeps.cmake @@ -99,6 +99,13 @@ include(deps/unordered_dense) include(deps/verdigris) include(deps/websocketpp) +if(OSSIA_PROTOCOL_MQTT5) + include(deps/mqtt) + if(NOT TARGET Async::MQTT5) + set(OSSIA_PROTOCOL_MQTT5 FALSE CACHE INTERNAL "" FORCE) + endif() +endif() + if(OSSIA_PROTOCOL_MIDI) include(deps/libremidi) endif() diff --git a/cmake/OssiaOptions.cmake b/cmake/OssiaOptions.cmake index 6b9f4afc1f7..b5bb7a9a020 100644 --- a/cmake/OssiaOptions.cmake +++ b/cmake/OssiaOptions.cmake @@ -56,6 +56,7 @@ option(OSSIA_PROTOCOL_MIDI "Enable MIDI protocol" ON) option(OSSIA_PROTOCOL_OSC "Enable OSC protocol" ON) option(OSSIA_PROTOCOL_MINUIT "Enable Minuit protocol" ON) option(OSSIA_PROTOCOL_OSCQUERY "Enable OSCQuery protocol" ON) +option(OSSIA_PROTOCOL_MQTT5 "Enable MQTT 5 protocol" ON) option(OSSIA_PROTOCOL_HTTP "Enable HTTP protocol" ON) # Requires Qt option(OSSIA_PROTOCOL_WEBSOCKETS "Enable WebSockets protocol" OFF) # Requires Qt option(OSSIA_PROTOCOL_SERIAL "Enable Serial port protocol" OFF) # Requires Qt @@ -89,6 +90,7 @@ set(OSSIA_AVAILABLE_PROTOCOLS WIIMOTE ARTNET LIBMAPPER + MQTT5 ) set(CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH};${PROJECT_SOURCE_DIR}/CMake;${PROJECT_SOURCE_DIR}/cmake/cmake-modules;") diff --git a/cmake/deps/mqtt.cmake b/cmake/deps/mqtt.cmake new file mode 100644 index 00000000000..c4a4cea3b2e --- /dev/null +++ b/cmake/deps/mqtt.cmake @@ -0,0 +1,8 @@ +if(OSSIA_USE_SYSTEM_LIBRARIES) + find_package(async-mqtt5 CONFIG GLOBAL) +endif() + +if(NOT TARGET Async::MQTT5) + add_subdirectory("${OSSIA_3RDPARTY_FOLDER}/async-mqtt5" async-mqtt5) +endif() + diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index d02fe5b7975..a4dc466459a 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -115,6 +115,10 @@ if(OSSIA_PROTOCOL_SERIAL) ossia_add_example(network_arduino "${CMAKE_CURRENT_SOURCE_DIR}/Network/Arduino.cpp") endif() +if(OSSIA_PROTOCOL_MQTT5) + ossia_add_example(mqtt_publish "${CMAKE_CURRENT_SOURCE_DIR}/Network/Mqtt_publication.cpp") +endif() + ossia_add_example(export_namespace_to_json "${CMAKE_CURRENT_SOURCE_DIR}/Preset/Export_namespace_to_json.cpp") ossia_add_example(logger "${CMAKE_CURRENT_SOURCE_DIR}/Common/Logger.cpp") ossia_add_example(fuzzysearch "${CMAKE_CURRENT_SOURCE_DIR}/Common/FuzzySearch.cpp") diff --git a/examples/Network/Mqtt_publication.cpp b/examples/Network/Mqtt_publication.cpp new file mode 100644 index 00000000000..0613fb632d1 --- /dev/null +++ b/examples/Network/Mqtt_publication.cpp @@ -0,0 +1,41 @@ +#include +#include +#include +#include +#include + +#include + +#include +#include + +void do_post(boost::asio::io_context& ctx, ossia::net::parameter_base& param) +{ + boost::asio::post(ctx, [&ctx, ¶m] { + param.push_value("oy! " + std::to_string(rand() % 10)); + + do_post(ctx, param); + }); +}; + +int main(int argc, char** argv) +{ + auto ctx = std::make_shared(); + + ossia::net::mqtt5_configuration conf{.host = "127.0.0.1", .port = 1883}; + auto proto = std::make_unique(ctx, conf); + ossia::net::generic_device device{std::move(proto), "P"}; + + // libossia v3: we really need to prevent live modification of the tree + // and have an api to update it async + ossia::create_parameter(device.get_root_node(), "/foo", "string") + ->set_access(ossia::access_mode::GET) + .add_callback([](const ossia::value& val) { + std::cerr << "Received: " << ossia::value_to_pretty_string(val) << "\n"; + }); + auto& bar = ossia::create_parameter(device.get_root_node(), "/bar", "string") + ->set_access(ossia::access_mode::SET); + + do_post(ctx->context, bar); + ctx->run(); +} diff --git a/examples/Network/joystick/joystick.cpp b/examples/Network/joystick/joystick.cpp index a63ba86d7cc..57391b1067e 100644 --- a/examples/Network/joystick/joystick.cpp +++ b/examples/Network/joystick/joystick.cpp @@ -18,11 +18,11 @@ int main() fmt::print("Using joystick {}\n", ossia::net::joystick_info::get_joystick_name(0)); auto ctx = ossia::net::create_network_context(); - ossia::net::generic_device source_dev{ - std::make_unique( - std::make_unique(ctx, 0, 0), - std::make_unique(5579, 5589)), - "joystick"}; + auto proto = std::make_unique(); + proto->expose_to(std::make_unique(ctx, 0, 0)); + proto->expose_to( + std::make_unique(5579, 5589)); + ossia::net::generic_device source_dev{std::move(proto), "joystick"}; source_dev.set_echo(true); auto on_message = [](const ossia::net::parameter_base& param) { diff --git a/src/ossia/network/base/protocol.cpp b/src/ossia/network/base/protocol.cpp index b95feebaa02..2c6c62bb146 100644 --- a/src/ossia/network/base/protocol.cpp +++ b/src/ossia/network/base/protocol.cpp @@ -89,4 +89,14 @@ std::future protocol_base::pull_async(parameter_base&) { return {}; } + +bool protocol_base::publish(const parameter_base&) +{ + return false; +} + +bool protocol_base::unpublish(const parameter_base&) +{ + return false; +} } diff --git a/src/ossia/network/base/protocol.hpp b/src/ossia/network/base/protocol.hpp index cb97792822e..c2983e9c3b6 100644 --- a/src/ossia/network/base/protocol.hpp +++ b/src/ossia/network/base/protocol.hpp @@ -125,6 +125,22 @@ class OSSIA_EXPORT protocol_base */ virtual bool observe(parameter_base&, bool) = 0; + /** + * @brief Notify the network that a parameter is to be published + * + * In some protocols (MQTT), this may send a message to the broker to indicate + * that a new topic is being made available for publication. + */ + virtual bool publish(const parameter_base&); + + /** + * @brief Notify the network that a parameter is to be removed + * + * In some protocols (MQTT), this may send a message to the broker to indicate + * that a previously published topic is disappearing. + */ + virtual bool unpublish(const parameter_base&); + /** * @brief Begin observation without notifying the other computers. */ diff --git a/src/ossia/protocols/mqtt/mqtt_protocol.cpp b/src/ossia/protocols/mqtt/mqtt_protocol.cpp new file mode 100644 index 00000000000..8a72b8afa3e --- /dev/null +++ b/src/ossia/protocols/mqtt/mqtt_protocol.cpp @@ -0,0 +1,224 @@ +#include "mqtt_protocol.hpp" + +#include + +#include +#include +#include +#include + +#include + +#include + +namespace ossia::net +{ +static constexpr auto use_nothrow_awaitable + = boost::asio::as_tuple(boost::asio::use_awaitable); +struct mqtt5_client +{ + async_mqtt5::mqtt_client client; + explicit mqtt5_client(boost::asio::io_context& ioc) + : client{ioc} + { + } + + boost::asio::awaitable publish(ossia::net::parameter_base& param) + { + auto&& [ec, rc, props] + = co_await client.async_publish( + param.get_node().osc_address(), ossia::convert(param.value()), + async_mqtt5::retain_e::no, async_mqtt5::publish_props{}, + use_nothrow_awaitable); + + co_return ec || !rc; + } + + boost::asio::awaitable subscribe(const std::string& topic) + { + // Configure the request to subscribe to a Topic. + async_mqtt5::subscribe_topic sub_topic = async_mqtt5::subscribe_topic{ + topic, async_mqtt5::subscribe_options{ + async_mqtt5::qos_e::exactly_once, async_mqtt5::no_local_e::no, + async_mqtt5::retain_as_published_e::retain, + async_mqtt5::retain_handling_e::send}}; + + auto&& [ec, sub_codes, sub_props] = co_await client.async_subscribe( + sub_topic, async_mqtt5::subscribe_props{}, use_nothrow_awaitable); + + co_return !ec && !sub_codes[0]; + } + + static void dump_props(const auto& publish_props) + { + publish_props.visit([](const Key& x, const auto& v) -> bool { + if constexpr(requires { v.has_value(); }) + { + if(v.has_value()) + std::cerr << "prop " << (int)Key::value << " : " << *v << "\n"; + else + std::cerr << "prop " << (int)Key::value << " (unset) \n"; + } + else if constexpr(requires { v.size(); }) + { + std::cerr << "arr prop (" << v.size() << "): [ "; + for(size_t i = 0; i < v.size(); i++) + { + if constexpr(requires { + v[i].first; + v[i].second; + }) + std::cerr << "(" << v[i].first << ", " << v[i].second << ")"; + else + std::cerr << v[i]; + if(i + 1 < v.size()) + std::cerr << ", "; + } + std::cerr << "]\n"; + } + return true; + }); + } + + boost::asio::awaitable subscribe_and_receive(ossia::net::parameter_base& param) + { + // subscribe + if(!(co_await subscribe(param.get_node().osc_address()))) + co_return; + + // receive + for(;;) + { + auto&& [ec, topic, payload, publish_props] + = co_await client.async_receive(use_nothrow_awaitable); + + if(ec == async_mqtt5::client::error::session_expired) + { + if(co_await subscribe(topic)) + continue; + else + break; + } + else if(ec) + break; + + param.push_value(ossia::convert(payload, param.get_value_type())); + } + + co_return; + } +}; + +struct mqtt5_protocol::subscribe_state +{ + boost::asio::cancellation_signal cancellation; + enum + { + created, + operating, + cancelled + } state{created}; +}; + +mqtt5_protocol::mqtt5_protocol(network_context_ptr ctx, const mqtt5_configuration& conf) + : m_context{ctx} + , m_conf{conf} + , m_client{std::make_unique(m_context->context)} +{ +} + +mqtt5_protocol::~mqtt5_protocol() { } + +bool mqtt5_protocol::pull(ossia::net::parameter_base&) +{ + return false; +} + +bool mqtt5_protocol::push(const ossia::net::parameter_base& p, const ossia::value& v) +{ + co_spawn( + m_context->context, m_client->publish(const_cast(p)), + boost::asio::detached); + return false; +} + +bool mqtt5_protocol::push_raw(const ossia::net::full_parameter_data&) +{ + return false; +} + +bool mqtt5_protocol::observe(ossia::net::parameter_base&, bool) +{ + return false; +} + +void mqtt5_protocol::on_new_param(const ossia::net::parameter_base& param) +{ + // FIXME we should create a "safe" data object copied to the thread there + boost::asio::post([¶m, this] { + switch(param.get_access()) + { + case ossia::access_mode::GET: + on_subscribe(param); + break; + case ossia::access_mode::SET: + // Nothing to do. publishing is just "push". + break; + case ossia::access_mode::BI: + // does not make sense with MQTT... + // let's try to subscribe anyways as most likely we want to read some sensor + on_subscribe(param); + break; + } + }); +} + +void mqtt5_protocol::on_removed_param(const ossia::net::parameter_base& p) +{ + on_unsubscribe(p); +} + +void mqtt5_protocol::on_subscribe(const ossia::net::parameter_base& p) +{ + m_subscriptions.visit(&p, [this, &p](auto& res) { + subscribe_state& sub = res.second; + + if(std::exchange(sub.state, sub.operating) == sub.operating) + return; + + co_spawn( + m_context->context, + m_client->subscribe_and_receive(const_cast(p)), + boost::asio::bind_cancellation_slot( + sub.cancellation.slot(), boost::asio::detached)); + }); +} + +void mqtt5_protocol::on_unsubscribe(const ossia::net::parameter_base& p) +{ + m_subscriptions.visit(&p, [this, &p](auto& res) { + subscribe_state& sub = res.second; + if(std::exchange(sub.state, sub.cancelled) == sub.operating) + sub.cancellation.emit(boost::asio::cancellation_type::all); + }); +} + +bool mqtt5_protocol::update(ossia::net::node_base& node_base) +{ + return false; +} + +void mqtt5_protocol::set_device(device_base& dev) +{ + dev.on_parameter_created.connect<&mqtt5_protocol::on_new_param>(*this); + dev.on_parameter_removing.connect<&mqtt5_protocol::on_removed_param>(*this); + + // Connect to the broker + m_client->client.brokers(m_conf.host, m_conf.port).async_run(boost::asio::detached); + + // Publish all existing nodes + ossia::net::iterate_all_children( + &dev.get_root_node(), + [this](ossia::net::parameter_base& param) { on_new_param(param); }); +} +} diff --git a/src/ossia/protocols/mqtt/mqtt_protocol.hpp b/src/ossia/protocols/mqtt/mqtt_protocol.hpp new file mode 100644 index 00000000000..846e03d395e --- /dev/null +++ b/src/ossia/protocols/mqtt/mqtt_protocol.hpp @@ -0,0 +1,48 @@ +#pragma once +#include +#include + +#include +#include +namespace ossia::net +{ +struct mqtt5_configuration +{ + std::string host; + int port; +}; + +struct mqtt5_client; + +class OSSIA_EXPORT mqtt5_protocol + : public ossia::net::protocol_base + , public Nano::Observer +{ +public: + explicit mqtt5_protocol( + ossia::net::network_context_ptr, const mqtt5_configuration& conf); + ~mqtt5_protocol(); + + bool pull(parameter_base&) override; + bool push(const parameter_base&, const value& v) override; + bool push_raw(const full_parameter_data&) override; + bool observe(parameter_base&, bool) override; + bool update(node_base& node_base) override; + void set_device(device_base& dev) override; + +private: + void on_new_param(const parameter_base& param); + void on_removed_param(const parameter_base& param); + void on_subscribe(const parameter_base& param); + void on_unsubscribe(const parameter_base& param); + + ossia::net::network_context_ptr m_context; + mqtt5_configuration m_conf{}; + std::unique_ptr m_client; + + struct subscribe_state; + boost::unordered::concurrent_flat_map< + const ossia::net::parameter_base*, subscribe_state> + m_subscriptions; +}; +} diff --git a/src/ossia_features.cmake b/src/ossia_features.cmake index bba833fbb36..3151cf86fad 100644 --- a/src/ossia_features.cmake +++ b/src/ossia_features.cmake @@ -23,6 +23,7 @@ if(IOS OR CMAKE_SYSTEM_NAME MATCHES Emscripten) set(OSSIA_PROTOCOL_WEBSOCKETS FALSE CACHE INTERNAL "") set(OSSIA_PROTOCOL_SERIAL FALSE CACHE INTERNAL "") set(OSSIA_PROTOCOL_ARTNET FALSE CACHE INTERNAL "") + set(OSSIA_PROTOCOL_MQTT5 FALSE CACHE INTERNAL "") endif() if(NOT OSSIA_QML) @@ -164,6 +165,12 @@ if (OSSIA_PROTOCOL_LIBMAPPER) set(OSSIA_PROTOCOLS ${OSSIA_PROTOCOLS} libmapper) endif() +if (OSSIA_PROTOCOL_MQTT5) + target_sources(ossia PRIVATE ${OSSIA_MQTT5_SRCS} ${OSSIA_MQTT5_HEADERS}) + target_link_libraries(ossia PRIVATE Async::MQTT5) + set(OSSIA_PROTOCOLS ${OSSIA_PROTOCOLS} mqtt5) +endif() + # Additional features if(OSSIA_C) diff --git a/src/ossia_sources.cmake b/src/ossia_sources.cmake index fa65a7616b4..d9b7a6cf7cb 100644 --- a/src/ossia_sources.cmake +++ b/src/ossia_sources.cmake @@ -536,6 +536,12 @@ set(OSSIA_ARTNET_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/ossia/protocols/artnet/artnet_protocol.cpp" ) +set(OSSIA_MQTT5_HEADERS + "${CMAKE_CURRENT_SOURCE_DIR}/ossia/protocols/mqtt/mqtt_protocol.hpp") + +set(OSSIA_MQTT5_SRCS + "${CMAKE_CURRENT_SOURCE_DIR}/ossia/protocols/mqtt/mqtt_protocol.cpp") + set(OSSIA_WS_CLIENT_HEADERS "${CMAKE_CURRENT_SOURCE_DIR}/ossia-qt/websocket-generic-client/ws_generic_client_protocol.hpp")