diff --git a/blocks/soapy/src/CMakeLists.txt b/blocks/soapy/src/CMakeLists.txt index 0ec648e21..93f4a7a68 100644 --- a/blocks/soapy/src/CMakeLists.txt +++ b/blocks/soapy/src/CMakeLists.txt @@ -1,2 +1,8 @@ add_executable(soapy_example soapy_example.cpp) -target_link_libraries(soapy_example PRIVATE gr-basic gr-fileio gr-testing gr-soapy ut) +target_link_libraries( + soapy_example + PRIVATE gr-basic + gr-fileio + gr-testing + gr-soapy + ut) diff --git a/blocks/soapy/test/CMakeLists.txt b/blocks/soapy/test/CMakeLists.txt index f541a625c..70d970b51 100644 --- a/blocks/soapy/test/CMakeLists.txt +++ b/blocks/soapy/test/CMakeLists.txt @@ -1,4 +1,12 @@ add_executable(qa_Soapy qa_Soapy.cpp) target_include_directories(qa_Soapy PRIVATE ${CMAKE_BINARY_DIR}/include ${CMAKE_CURRENT_BINARY_DIR}) -target_link_libraries(qa_Soapy PRIVATE gnuradio-options gnuradio-core fmt fftw gr-soapy gr-testing ut) +target_link_libraries( + qa_Soapy + PRIVATE gnuradio-options + gnuradio-core + fmt + fftw + gr-soapy + gr-testing + ut) add_test(NAME qa_Soapy COMMAND ${CMAKE_CROSSCOMPILING_EMULATOR} ${CMAKE_CURRENT_BINARY_DIR}/qa_Soapy) diff --git a/core/include/gnuradio-4.0/Block.hpp b/core/include/gnuradio-4.0/Block.hpp index 9bb03d7cf..8b6527ac9 100644 --- a/core/include/gnuradio-4.0/Block.hpp +++ b/core/include/gnuradio-4.0/Block.hpp @@ -327,7 +327,7 @@ enum class Category { * } * * void start() override { - * propertyCallbacks.emplace(kMyCustomProperty, &MyBlock::propertyCallbackMyCustom); + * propertyCallbacks.emplace(kMyCustomProperty, std::mem_fn(&MyBlock::propertyCallbackMyCustom)); * } * }; * @endcode @@ -428,18 +428,18 @@ class Block : public lifecycle::StateMachine { MsgPortInBuiltin msgIn; MsgPortOutBuiltin msgOut; - using PropertyCallback = std::optional (Derived::*)(std::string_view, Message); + using PropertyCallback = std::function(Derived&, std::string_view, Message)>; std::map propertyCallbacks{ - {block::property::kHeartbeat, &Block::propertyCallbackHeartbeat}, // - {block::property::kEcho, &Block::propertyCallbackEcho}, // - {block::property::kLifeCycleState, &Block::propertyCallbackLifecycleState}, // - {block::property::kSetting, &Block::propertyCallbackSettings}, // - {block::property::kStagedSetting, &Block::propertyCallbackStagedSettings}, // - {block::property::kStoreDefaults, &Block::propertyCallbackStoreDefaults}, // - {block::property::kResetDefaults, &Block::propertyCallbackResetDefaults}, // - {block::property::kActiveContext, &Block::propertyCallbackActiveContext}, // - {block::property::kSettingsCtx, &Block::propertyCallbackSettingsCtx}, // - {block::property::kSettingsContexts, &Block::propertyCallbackSettingsContexts}, // + {block::property::kHeartbeat, std::mem_fn(&Block::propertyCallbackHeartbeat)}, // + {block::property::kEcho, std::mem_fn(&Block::propertyCallbackEcho)}, // + {block::property::kLifeCycleState, std::mem_fn(&Block::propertyCallbackLifecycleState)}, // + {block::property::kSetting, std::mem_fn(&Block::propertyCallbackSettings)}, // + {block::property::kStagedSetting, std::mem_fn(&Block::propertyCallbackStagedSettings)}, // + {block::property::kStoreDefaults, std::mem_fn(&Block::propertyCallbackStoreDefaults)}, // + {block::property::kResetDefaults, std::mem_fn(&Block::propertyCallbackResetDefaults)}, // + {block::property::kActiveContext, std::mem_fn(&Block::propertyCallbackActiveContext)}, // + {block::property::kSettingsCtx, std::mem_fn(&Block::propertyCallbackSettingsCtx)}, // + {block::property::kSettingsContexts, std::mem_fn(&Block::propertyCallbackSettingsContexts)}, // }; std::map> propertySubscriptions; @@ -1358,17 +1358,20 @@ class Block : public lifecycle::StateMachine { } std::size_t getMergedBlockLimit() { - if constexpr (requires(const Derived& d) { - { available_samples(d) } -> std::same_as; - }) { + if constexpr (Derived::blockCategory != block::Category::NormalBlock) { + return 0; + } else if constexpr (requires(const Derived& d) { + { available_samples(d) } -> std::same_as; + }) { return available_samples(self()); } else if constexpr (traits::block::stream_input_port_types::size == 0 && traits::block::stream_output_port_types::size == 0) { // allow blocks that have neither input nor output ports (by merging source to sink block) -> use internal buffer size constexpr gr::Size_t chunkSize = Derived::merged_work_chunk_size(); static_assert(chunkSize != std::dynamic_extent && chunkSize > 0, "At least one internal port must define a maximum number of samples or the non-member/hidden " "friend function `available_samples(const BlockType&)` must be defined."); return chunkSize; + } else { + return std::numeric_limits::max(); } - return std::numeric_limits::max(); } template @@ -1670,7 +1673,11 @@ class Block : public lifecycle::StateMachine { } } } else { // block does not define any valid processing function - static_assert(meta::always_false>, "neither processBulk(...) nor processOne(...) implemented"); + if constexpr (Derived::blockCategory != block::Category::NormalBlock) { + return {requestedWork, 0UZ, OK}; + } else { + static_assert(meta::always_false>, "neither processBulk(...) nor processOne(...) implemented"); + } } // sanitise input/output samples based on explicit user-defined processBulk(...) return status @@ -1852,7 +1859,7 @@ class Block : public lifecycle::StateMachine { std::optional retMessage; try { - retMessage = (self().*callback)(message.endpoint, message); // N.B. life-time: message is copied + retMessage = callback(self(), message.endpoint, message); // N.B. life-time: message is copied } catch (const gr::exception& e) { retMessage = Message{message}; retMessage->data = std::unexpected(Error(e)); diff --git a/core/include/gnuradio-4.0/BlockModel.hpp b/core/include/gnuradio-4.0/BlockModel.hpp index 2a0449604..16a9920d3 100644 --- a/core/include/gnuradio-4.0/BlockModel.hpp +++ b/core/include/gnuradio-4.0/BlockModel.hpp @@ -55,9 +55,26 @@ struct Edge { Edge& operator=(const Edge&) = delete; - Edge(Edge&&) noexcept = default; - - Edge& operator=(Edge&&) noexcept = default; + Edge(Edge&& other) noexcept : _sourceBlock(std::exchange(other._sourceBlock, nullptr)), _destinationBlock(std::exchange(other._destinationBlock, nullptr)), _sourcePortDefinition(std::move(other._sourcePortDefinition)), _destinationPortDefinition(std::move(other._destinationPortDefinition)), _state(other._state), _actualBufferSize(other._actualBufferSize), _edgeType(other._edgeType), _sourcePort(std::exchange(other._sourcePort, nullptr)), _destinationPort(std::exchange(other._destinationPort, nullptr)), _minBufferSize(other._minBufferSize), _weight(other._weight), _name(std::move(other._name)) {} + + Edge& operator=(Edge&& other) noexcept { + auto tmp = std::move(other); + std::swap(tmp._sourceBlock, _sourceBlock); + std::swap(tmp._destinationBlock, _destinationBlock); + std::swap(tmp._sourcePortDefinition, _sourcePortDefinition); + std::swap(tmp._destinationPortDefinition, _destinationPortDefinition); + std::swap(tmp._state, _state); + std::swap(tmp._actualBufferSize, _actualBufferSize); + std::swap(tmp._edgeType, _edgeType); + std::swap(tmp._sourcePort, _sourcePort); + std::swap(tmp._destinationPort, _destinationPort); + + std::swap(tmp._minBufferSize, _minBufferSize); + std::swap(tmp._weight, _weight); + std::swap(tmp._name, _name); + + return *this; + } Edge(BlockModel* sourceBlock, PortDefinition sourcePortDefinition, BlockModel* destinationBlock, PortDefinition destinationPortDefinition, std::size_t minBufferSize, std::int32_t weight, std::string name) : _sourceBlock(sourceBlock), _destinationBlock(destinationBlock), _sourcePortDefinition(sourcePortDefinition), _destinationPortDefinition(destinationPortDefinition), _minBufferSize(minBufferSize), _weight(weight), _name(std::move(name)) {} @@ -160,10 +177,10 @@ class BlockModel { MsgPortInBuiltin* msgIn; MsgPortOutBuiltin* msgOut; - static std::string_view portName(const DynamicPortOrCollection& portOrCollection) { + static std::string portName(const DynamicPortOrCollection& portOrCollection) { return std::visit(meta::overloaded{ // [](const gr::DynamicPort& port) { return port.name; }, // - [](const NamedPortCollection& namedCollection) { return namedCollection.name; }}, + [](const NamedPortCollection& namedCollection) { return std::string(namedCollection.name); }}, portOrCollection); } @@ -375,7 +392,7 @@ constexpr bool contains_type = (std::is_same_v || ...); template requires std::is_constructible_v class BlockWrapper : public BlockModel { -private: +protected: static_assert(std::is_same_v>); T _block; std::string _type_name = gr::meta::type_name(); @@ -401,36 +418,37 @@ class BlockWrapper : public BlockModel { msgOut = std::addressof(_block.msgOut); } - template - constexpr static auto& processPort(auto& where, TPort& port) noexcept { - where.push_back(gr::DynamicPort(port, DynamicPort::non_owned_reference_tag{})); - return where.back(); - } - - void dynamicPortLoader() { + void dynamicPortsLoader() { if (_dynamicPortsLoaded) { return; } - auto registerPort = [this](DynamicPorts& where, auto, CurrentPortType*) noexcept { - if constexpr (CurrentPortType::kIsDynamicCollection) { - auto& collection = CurrentPortType::getPortObject(blockRef()); - NamedPortCollection result; - result.name = CurrentPortType::Name; - for (auto& port : collection) { - processPort(result.ports, port); - } - where.push_back(std::move(result)); - } else { - auto& port = CurrentPortType::getPortObject(blockRef()); - port.name = CurrentPortType::Name; - processPort(where, port); - } + auto processPort = [](auto& where, TPort& port) -> auto& { + where.push_back(gr::DynamicPort(port, DynamicPort::non_owned_reference_tag{})); + return where.back(); }; - using Node = std::remove_cvref_t; - traits::block::all_input_ports::for_each(registerPort, _dynamicInputPorts); - traits::block::all_output_ports::for_each(registerPort, _dynamicOutputPorts); + using TBlock = std::remove_cvref_t; + if constexpr (TBlock::blockCategory == block::Category::NormalBlock) { + auto registerPort = [this, processPort](DynamicPorts& where, auto, CurrentPortType*) noexcept { + if constexpr (CurrentPortType::kIsDynamicCollection) { + auto& collection = CurrentPortType::getPortObject(blockRef()); + NamedPortCollection result; + result.name = CurrentPortType::Name; + for (auto& port : collection) { + processPort(result.ports, port); + } + where.push_back(std::move(result)); + } else { + auto& port = CurrentPortType::getPortObject(blockRef()); + port.name = CurrentPortType::Name; + processPort(where, port); + } + }; + + traits::block::all_input_ports::for_each(registerPort, _dynamicInputPorts); + traits::block::all_output_ports::for_each(registerPort, _dynamicOutputPorts); + } _dynamicPortsLoaded = true; } @@ -439,7 +457,7 @@ class BlockWrapper : public BlockModel { BlockWrapper() : BlockWrapper(gr::property_map()) {} explicit BlockWrapper(gr::property_map initParameter) : _block(std::move(initParameter)) { initMessagePorts(); - _dynamicPortsLoader = std::bind_front(&BlockWrapper::dynamicPortLoader, this); + _dynamicPortsLoader = [this] { this->dynamicPortsLoader(); }; } BlockWrapper(const BlockWrapper& other) = delete; diff --git a/core/include/gnuradio-4.0/Buffer.hpp b/core/include/gnuradio-4.0/Buffer.hpp index 0707c278a..2ac82175a 100644 --- a/core/include/gnuradio-4.0/Buffer.hpp +++ b/core/include/gnuradio-4.0/Buffer.hpp @@ -1,6 +1,7 @@ #ifndef GNURADIO_BUFFER2_H #define GNURADIO_BUFFER2_H +#include #include #include #include diff --git a/core/include/gnuradio-4.0/Graph.hpp b/core/include/gnuradio-4.0/Graph.hpp index 86c00b5c4..b81a1f851 100644 --- a/core/include/gnuradio-4.0/Graph.hpp +++ b/core/include/gnuradio-4.0/Graph.hpp @@ -54,8 +54,145 @@ inline static const char* kGraphInspect = "GraphInspect"; inline static const char* kGraphInspected = "GraphInspected"; inline static const char* kRegistryBlockTypes = "RegistryBlockTypes"; + +inline static const char* kSubgraphExportPort = "SubgraphExportPort"; +inline static const char* kSubgraphExportedPort = "SubgraphExportedPort"; } // namespace graph::property +template +class GraphWrapper : public BlockWrapper { +private: + std::unordered_multimap _exportedInputPortsForBlock; + std::unordered_multimap _exportedOutputPortsForBlock; + +public: + GraphWrapper() { + // We need to make sure nobody touches our dynamic ports + // as this class will handle them + this->_dynamicPortsLoader = [] {}; + + this->_block.propertyCallbacks[graph::property::kSubgraphExportPort] = [this](auto& self, std::string_view property, Message message) -> std::optional { + const auto& data = message.data.value(); + const std::string& uniqueBlockName = std::get(data.at("uniqueBlockName"s)); + auto portDirection = std::get(data.at("portDirection"s)) == "input" ? PortDirection::INPUT : PortDirection::OUTPUT; + const std::string& portName = std::get(data.at("portName"s)); + const bool exportFlag = std::get(data.at("exportFlag"s)); + + exportPort(exportFlag, uniqueBlockName, portDirection, portName); + + message.endpoint = graph::property::kSubgraphExportedPort; + return message; + }; + } + + void exportPort(bool exportFlag, const std::string& uniqueBlockName, PortDirection portDirection, const std::string& portName) { + auto [infoIt, infoFound] = findExportedPortInfo(uniqueBlockName, portDirection, portName); + if (infoFound == exportFlag) { + throw Error(fmt::format("Port {} in block {} export status already as desired {}", portName, uniqueBlockName, exportFlag)); + } + + auto& port = findPortInBlock(uniqueBlockName, portDirection, portName); + auto& bookkeepingCollection = portDirection == PortDirection::INPUT ? _exportedInputPortsForBlock : _exportedOutputPortsForBlock; + auto& portCollection = portDirection == PortDirection::INPUT ? this->_dynamicInputPorts : this->_dynamicOutputPorts; + if (exportFlag) { + bookkeepingCollection.emplace(uniqueBlockName, portName); + portCollection.push_back(port.weakRef()); + } else { + bookkeepingCollection.erase(infoIt); + // TODO: Add support for exporting port collections + auto portIt = std::ranges::find_if(portCollection, [needleName = port.name](const auto& portOrCollection) { + return std::visit(meta::overloaded{ + // + [&](DynamicPort& in) { return in.name == needleName; }, // + [](auto&) { return false; } // + }, + portOrCollection); + }); + if (portIt != portCollection.end()) { + portCollection.erase(portIt); + } else { + throw Error("Port was not exported, while it is registered as such"); + } + } + + updateMetaInformation(); + } + + auto& blockRef() { return BlockWrapper::blockRef(); } + auto& blockRef() const { return BlockWrapper::blockRef(); } + + const std::unordered_multimap& exportedInputPortsForBlock() const { return _exportedInputPortsForBlock; } + const std::unordered_multimap& exportedOutputPortsForBlock() const { return _exportedOutputPortsForBlock; } + + BlockModel& findBlockWithUniqueName(std::string uniqueBlockName) { + for (const auto& block : this->blocks()) { + if (std::string(block->uniqueName()) == uniqueBlockName) { + return *block; + } + } + throw Error(fmt::format("Block {} not found in {}", uniqueBlockName, this->uniqueName())); + } + + BlockModel& findFirstBlockWithName(std::string blockName) { + for (const auto& block : this->blocks()) { + if (std::string(block->name()) == blockName) { + return *block; + } + } + throw Error(fmt::format("Block {} not found in {}", blockName, this->uniqueName())); + } + +private: + DynamicPort& findPortInBlock(const std::string& uniqueBlockName, PortDirection portDirection, const std::string& portName) { + auto& block = findBlockWithUniqueName(uniqueBlockName); + + if (portDirection == PortDirection::INPUT) { + return block.dynamicInputPort(portName); + } else { + return block.dynamicOutputPort(portName); + } + } + + auto findExportedPortInfo(const std::string& uniqueBlockName, PortDirection portDirection, const std::string& portName) const { + auto& bookkeepingCollection = portDirection == PortDirection::INPUT ? _exportedInputPortsForBlock : _exportedOutputPortsForBlock; + const auto& [from, to] = bookkeepingCollection.equal_range(std::string(uniqueBlockName)); + for (auto it = from; it != to; it++) { + if (it->second == portName) { + return std::make_pair(it, true); + } + } + return std::make_pair(bookkeepingCollection.end(), false); + } + + void updateMetaInformation() { + auto& info = BlockWrapper::metaInformation(); + + auto fillMetaInformation = [](property_map& dest, auto& bookkeepingCollection) { + std::string previousUniqueName; + std::vector collectedPorts; + for (const auto& [blockUniqueName, portName] : bookkeepingCollection) { + if (previousUniqueName != blockUniqueName && !collectedPorts.empty()) { + dest[previousUniqueName] = std::move(collectedPorts); + collectedPorts.clear(); + } + collectedPorts.push_back(portName); + previousUniqueName = blockUniqueName; + } + if (!collectedPorts.empty()) { + dest[previousUniqueName] = std::move(collectedPorts); + collectedPorts.clear(); + } + }; + + property_map exportedInputPorts, exportedOutputPorts; + fillMetaInformation(exportedInputPorts, _exportedInputPortsForBlock); + fillMetaInformation(exportedOutputPorts, _exportedOutputPortsForBlock); + + info["exportedInputPorts"] = std::move(exportedInputPorts); + info["exportedOutputPorts"] = std::move(exportedOutputPorts); + } +}; + class Graph : public gr::Block { private: std::shared_ptr _progress = std::make_shared(); @@ -184,14 +321,14 @@ class Graph : public gr::Block { Graph(property_map settings = {}) : gr::Block(std::move(settings)) { _blocks.reserve(100); // TODO: remove - propertyCallbacks[graph::property::kEmplaceBlock] = &Graph::propertyCallbackEmplaceBlock; - propertyCallbacks[graph::property::kRemoveBlock] = &Graph::propertyCallbackRemoveBlock; - propertyCallbacks[graph::property::kInspectBlock] = &Graph::propertyCallbackInspectBlock; - propertyCallbacks[graph::property::kReplaceBlock] = &Graph::propertyCallbackReplaceBlock; - propertyCallbacks[graph::property::kEmplaceEdge] = &Graph::propertyCallbackEmplaceEdge; - propertyCallbacks[graph::property::kRemoveEdge] = &Graph::propertyCallbackRemoveEdge; - propertyCallbacks[graph::property::kGraphInspect] = &Graph::propertyCallbackGraphInspect; - propertyCallbacks[graph::property::kRegistryBlockTypes] = &Graph::propertyCallbackRegistryBlockTypes; + propertyCallbacks[graph::property::kEmplaceBlock] = std::mem_fn(&Graph::propertyCallbackEmplaceBlock); + propertyCallbacks[graph::property::kRemoveBlock] = std::mem_fn(&Graph::propertyCallbackRemoveBlock); + propertyCallbacks[graph::property::kInspectBlock] = std::mem_fn(&Graph::propertyCallbackInspectBlock); + propertyCallbacks[graph::property::kReplaceBlock] = std::mem_fn(&Graph::propertyCallbackReplaceBlock); + propertyCallbacks[graph::property::kEmplaceEdge] = std::mem_fn(&Graph::propertyCallbackEmplaceEdge); + propertyCallbacks[graph::property::kRemoveEdge] = std::mem_fn(&Graph::propertyCallbackRemoveEdge); + propertyCallbacks[graph::property::kGraphInspect] = std::mem_fn(&Graph::propertyCallbackGraphInspect); + propertyCallbacks[graph::property::kRegistryBlockTypes] = std::mem_fn(&Graph::propertyCallbackRegistryBlockTypes); } Graph(Graph&) = delete; // there can be only one owner of Graph Graph& operator=(Graph&) = delete; // there can be only one owner of Graph @@ -318,13 +455,13 @@ class Graph : public gr::Block { property_map inputPorts; for (auto& portOrCollection : block->dynamicInputPorts()) { - inputPorts[std::string(BlockModel::portName(portOrCollection))] = serializePortOrCollection(portOrCollection); + inputPorts[BlockModel::portName(portOrCollection)] = serializePortOrCollection(portOrCollection); } result["inputPorts"] = std::move(inputPorts); property_map outputPorts; for (auto& portOrCollection : block->dynamicOutputPorts()) { - outputPorts[std::string(BlockModel::portName(portOrCollection))] = serializePortOrCollection(portOrCollection); + outputPorts[BlockModel::portName(portOrCollection)] = serializePortOrCollection(portOrCollection); } result["outputPorts"] = std::move(outputPorts); @@ -396,6 +533,9 @@ class Graph : public gr::Block { throw gr::exception(fmt::format("Block {} was not found in {}", uniqueName, this->unique_name)); } + std::erase_if(_edges, [&it](const Edge& edge) { // + return std::addressof(edge.sourceBlock()) == it->get() || std::addressof(edge.destinationBlock()) == it->get(); + }); _blocks.erase(it); message.endpoint = graph::property::kBlockRemoved; diff --git a/core/include/gnuradio-4.0/Graph_yaml_importer.hpp b/core/include/gnuradio-4.0/Graph_yaml_importer.hpp index 6ac020418..c2b0a48d2 100644 --- a/core/include/gnuradio-4.0/Graph_yaml_importer.hpp +++ b/core/include/gnuradio-4.0/Graph_yaml_importer.hpp @@ -10,61 +10,84 @@ namespace gr { -inline gr::Graph loadGrc(PluginLoader& loader, std::string_view yamlSrc) { - Graph testGraph; +namespace detail { - std::map createdBlocks; +inline void loadGraphFromMap(PluginLoader& loader, gr::Graph& resultGraph, gr::property_map yaml) { - const auto yaml = pmtv::yaml::deserialize(yamlSrc); - if (!yaml) { - throw gr::exception(fmt::format("Could not parse yaml: {}:{}\n{}", yaml.error().message, yaml.error().line, yamlSrc)); - } + std::map createdBlocks; - auto blks = std::get>(yaml.value().at("blocks")); + auto blks = std::get>(yaml.at("blocks")); for (const auto& blk : blks) { auto grcBlock = std::get(blk); - const auto name = std::get(grcBlock["name"]); - const auto id = std::get(grcBlock["id"]); + const auto blockName = std::get(grcBlock["name"]); + const auto blockType = std::get(grcBlock["id"]); - std::string type = "double"; + std::string blockParametrization = "double"; /// TODO: when using saveGrc template_args is not saved, this has to be implemented if (auto it = grcBlock.find("template_args"); it != grcBlock.end()) { - type = std::get(it->second); + blockParametrization = std::get(it->second); } - auto currentBlock = loader.instantiate(id, type); - if (!currentBlock) { - throw fmt::format("Unable to create block of type '{}'", id); - } + if (blockType == "SUBGRAPH") { + auto& subGraph = resultGraph.addBlock(std::make_unique>()); + createdBlocks[blockName] = &subGraph; + subGraph.setName(blockName); + + auto* subGraphDirect = static_cast*>(&subGraph); + subGraphDirect->setName(blockName); + + const auto& graphData = std::get(grcBlock["graph"]); + loadGraphFromMap(loader, subGraphDirect->blockRef(), graphData); - currentBlock->setName(name); + const auto& exportedPorts = std::get>(graphData.at("exported_ports")); + for (const auto& exportedPort_ : exportedPorts) { + auto exportedPort = std::get>(exportedPort_); + if (exportedPort.size() != 3) { + throw fmt::format("Unable to parse exported port ({} instead of 4 elements)", exportedPort.size()); + } + + auto& block = subGraphDirect->findFirstBlockWithName(std::get(exportedPort[0])); - const auto parametersPmt = grcBlock["parameters"]; - if (const auto parameters = std::get_if(¶metersPmt)) { - currentBlock->settings().loadParametersFromPropertyMap(*parameters); + subGraphDirect->exportPort(true, + /* block's unique name */ std::string(block.uniqueName()), + /* port direction */ std::get(exportedPort[1]) == "INPUT" ? PortDirection::INPUT : PortDirection::OUTPUT, + /* port name */ std::get(exportedPort[2])); + } } else { - currentBlock->settings().loadParametersFromPropertyMap({}); - } + auto currentBlock = loader.instantiate(blockType, blockParametrization); + if (!currentBlock) { + throw fmt::format("Unable to create block of type '{}'", blockType); + } - if (auto it = grcBlock.find("ctx_parameters"); it != grcBlock.end()) { - auto parametersCtx = std::get>(it->second); - for (const auto& ctxPmt : parametersCtx) { - auto ctxPar = std::get(ctxPmt); - const auto ctxName = std::get(ctxPar["context"]); - const auto ctxTime = std::get(ctxPar["time"]); // in ns - const auto ctxParameters = std::get(ctxPar["parameters"]); + currentBlock->setName(blockName); - currentBlock->settings().loadParametersFromPropertyMap(ctxParameters, SettingsCtx{ctxTime, ctxName}); + const auto parametersPmt = grcBlock["parameters"]; + if (const auto parameters = std::get_if(¶metersPmt)) { + currentBlock->settings().loadParametersFromPropertyMap(*parameters); + } else { + currentBlock->settings().loadParametersFromPropertyMap({}); } + + if (auto it = grcBlock.find("ctx_parameters"); it != grcBlock.end()) { + auto parametersCtx = std::get>(it->second); + for (const auto& ctxPmt : parametersCtx) { + auto ctxPar = std::get(ctxPmt); + const auto ctxName = std::get(ctxPar["context"]); + const auto ctxTime = std::get(ctxPar["time"]); // in ns + const auto ctxParameters = std::get(ctxPar["parameters"]); + + currentBlock->settings().loadParametersFromPropertyMap(ctxParameters, SettingsCtx{ctxTime, ctxName}); + } + } + if (const auto failed = currentBlock->settings().activateContext(); failed == std::nullopt) { + throw gr::exception("Settings for context could not be activated"); + } + createdBlocks[blockName] = &resultGraph.addBlock(std::move(currentBlock)); } - if (const auto failed = currentBlock->settings().activateContext(); failed == std::nullopt) { - throw gr::exception("Settings for context could not be activated"); - } - createdBlocks[name] = &testGraph.addBlock(std::move(currentBlock)); } // for blocks - auto connections = std::get>(yaml.value().at("connections")); + auto connections = std::get>(yaml.at("connections")); for (const auto& conn : connections) { auto connection = std::get>(conn); if (connection.size() != 4) { @@ -73,14 +96,14 @@ inline gr::Graph loadGrc(PluginLoader& loader, std::string_view yamlSrc) { auto parseBlockPort = [&](const auto& blockField, const auto& portField) { const auto blockName = std::get(blockField); - auto node = createdBlocks.find(blockName); - if (node == createdBlocks.end()) { - throw fmt::format("Unknown node '{}'", blockName); + auto block = createdBlocks.find(blockName); + if (block == createdBlocks.end()) { + throw fmt::format("Unknown block '{}'", blockName); } struct result { - decltype(node) block_it; - PortDefinition port_definition; + decltype(block) block_it; + PortDefinition port_definition; }; if (const auto portFields = std::get_if>(&portField)) { @@ -89,127 +112,164 @@ inline gr::Graph loadGrc(PluginLoader& loader, std::string_view yamlSrc) { } const auto index = std::get(portFields->at(0)); const auto subIndex = std::get(portFields->at(1)); - return result{node, {static_cast(index), static_cast(subIndex)}}; + return result{block, {static_cast(index), static_cast(subIndex)}}; } else { const auto index = std::get(portField); - return result{node, {static_cast(index)}}; + return result{block, {static_cast(index)}}; } }; auto src = parseBlockPort(connection[0], connection[1]); auto dst = parseBlockPort(connection[2], connection[3]); - testGraph.connect(*src.block_it->second, src.port_definition, *dst.block_it->second, dst.port_definition); + resultGraph.connect(*src.block_it->second, src.port_definition, *dst.block_it->second, dst.port_definition); } // for connections - - return testGraph; } -inline std::string saveGrc(const gr::Graph& testGraph) { +inline gr::property_map saveGraphToMap(const gr::Graph& rootGraph) { + pmtv::map_t result; + + { + std::vector serializedBlocks; + rootGraph.forEachBlock([&](const auto& block) { + pmtv::map_t map; + map["name"] = std::string(block.name()); + + const auto& fullTypeName = block.typeName(); + if (fullTypeName == "gr::Graph") { + map.emplace("id", "SUBGRAPH"); + auto* subGraphDirect = dynamic_cast*>(std::addressof(block)); + if (subGraphDirect == nullptr) { + throw gr::Error(fmt::format("Can not serialize gr::Graph-based subgraph {} which is not added to the parent graph {} via GraphWrapper", block.uniqueName(), rootGraph.unique_name)); + } + property_map graphYaml = detail::saveGraphToMap(subGraphDirect->blockRef()); - pmtv::map_t yaml; + std::vector exportedPortsData; + for (const auto& [blockName, portName] : subGraphDirect->exportedInputPortsForBlock()) { + exportedPortsData.push_back(std::vector{blockName, "INPUT"s, portName}); + } + for (const auto& [blockName, portName] : subGraphDirect->exportedOutputPortsForBlock()) { + exportedPortsData.push_back(std::vector{blockName, "OUTPUT"s, portName}); + } - std::vector blocks; - testGraph.forEachBlock([&](const auto& node) { - pmtv::map_t map; - map["name"] = std::string(node.name()); + graphYaml["exported_ports"] = std::move(exportedPortsData); + map.emplace("graph", std::move(graphYaml)); - const auto& fullTypeName = node.typeName(); - std::string typeName(fullTypeName.cbegin(), std::find(fullTypeName.cbegin(), fullTypeName.cend(), '<')); - map.emplace("id", std::move(typeName)); + } else { + std::string typeName(fullTypeName.cbegin(), std::find(fullTypeName.cbegin(), fullTypeName.cend(), '<')); + map.emplace("id", std::move(typeName)); + + // Helper function to write parameters + auto writeParameters = [&](const property_map& settingsMap, const property_map& metaInformation = {}) { + pmtv::map_t parameters; + auto writeMap = [&](const auto& localMap) { + for (const auto& [settingsKey, settingsValue] : localMap) { + std::visit([&](const T& value) { parameters[settingsKey] = value; }, settingsValue); + } + }; + writeMap(settingsMap); + if (!metaInformation.empty()) { + writeMap(metaInformation); + } + return parameters; + }; + + const auto& stored = block.settings().getStoredAll(); + if (stored.contains("")) { + const auto& ctxParameters = stored.at(""); + const auto& settingsMap = ctxParameters.back().second; // write only the last parameters + if (!block.metaInformation().empty() || !settingsMap.empty()) { + map["parameters"] = writeParameters(settingsMap, block.metaInformation()); + } + } - // Helper function to write parameters - auto writeParameters = [&](const property_map& settingsMap, const property_map& metaInformation = {}) { - pmtv::map_t parameters; - auto writeMap = [&](const auto& localMap) { - for (const auto& [settingsKey, settingsValue] : localMap) { - std::visit([&](const T& value) { parameters[settingsKey] = value; }, settingsValue); + std::vector ctxParamsSeq; + for (const auto& [ctx, ctxParameters] : stored) { + if (ctx == "") { + continue; + } + + for (const auto& [ctxTime, settingsMap] : ctxParameters) { + pmtv::map_t ctxParam; + + // Convert ctxTime.context to a string, regardless of its actual type + std::string contextStr = std::visit( + [](const auto& arg) -> std::string { + using T = std::decay_t; + if constexpr (std::is_same_v) { + return arg; + } else if constexpr (std::is_arithmetic_v) { + return std::to_string(arg); + } + return ""; + }, + ctxTime.context); + + ctxParam["context"] = contextStr; + ctxParam["time"] = ctxTime.time; + ctxParam["parameters"] = writeParameters(settingsMap); + ctxParamsSeq.emplace_back(std::move(ctxParam)); + } } - }; - writeMap(settingsMap); - if (!metaInformation.empty()) { - writeMap(metaInformation); + map["ctx_parameters"] = ctxParamsSeq; } - return parameters; - }; - const auto& stored = node.settings().getStoredAll(); - if (stored.contains("")) { - const auto& ctxParameters = stored.at(""); - const auto& settingsMap = ctxParameters.back().second; // write only the last parameters - if (!node.metaInformation().empty() || !settingsMap.empty()) { - map["parameters"] = writeParameters(settingsMap, node.metaInformation()); - } - } + serializedBlocks.emplace_back(std::move(map)); + }); + result["blocks"] = std::move(serializedBlocks); + } - std::vector ctxParamsSeq; - for (const auto& [ctx, ctxParameters] : stored) { - if (ctx == "") { - continue; - } + { + std::vector serializedConnections; + rootGraph.forEachEdge([&](const auto& edge) { + std::vector seq; + + auto writePortDefinition = [&](const auto& definition) { // + std::visit(meta::overloaded( // + [&](const PortDefinition::IndexBased& _definition) { + if (_definition.subIndex != meta::invalid_index) { + std::vector seqPort; + + seqPort.push_back(std::int64_t(_definition.topLevel)); + seqPort.push_back(std::int64_t(_definition.subIndex)); + seq.push_back(seqPort); + } else { + seq.push_back(std::int64_t(_definition.topLevel)); + } + }, // + [&](const PortDefinition::StringBased& _definition) { seq.push_back(_definition.name); }), + definition.definition); + }; - for (const auto& [ctxTime, settingsMap] : ctxParameters) { - pmtv::map_t ctxParam; - - // Convert ctxTime.context to a string, regardless of its actual type - std::string contextStr = std::visit( - [](const auto& arg) -> std::string { - using T = std::decay_t; - if constexpr (std::is_same_v) { - return arg; - } else if constexpr (std::is_arithmetic_v) { - return std::to_string(arg); - } - return ""; - }, - ctxTime.context); - - ctxParam["context"] = contextStr; - ctxParam["time"] = ctxTime.time; - ctxParam["parameters"] = writeParameters(settingsMap); - ctxParamsSeq.emplace_back(std::move(ctxParam)); - } - } - map["ctx_parameters"] = ctxParamsSeq; - - blocks.emplace_back(std::move(map)); - }); - yaml["blocks"] = blocks; - - std::vector connections; - testGraph.forEachEdge([&](const auto& edge) { - std::vector seq; - - auto writePortDefinition = [&](const auto& definition) { // - std::visit(meta::overloaded( // - [&](const PortDefinition::IndexBased& _definition) { - if (_definition.subIndex != meta::invalid_index) { - std::vector seqPort; - - seqPort.push_back(std::int64_t(_definition.topLevel)); - seqPort.push_back(std::int64_t(_definition.subIndex)); - seq.push_back(seqPort); - } else { - seq.push_back(std::int64_t(_definition.topLevel)); - } - }, // - [&](const PortDefinition::StringBased& _definition) { seq.push_back(_definition.name); }), - definition.definition); - }; + seq.push_back(edge.sourceBlock().name().data()); + writePortDefinition(edge.sourcePortDefinition()); + + seq.push_back(edge.destinationBlock().name().data()); + writePortDefinition(edge.destinationPortDefinition()); - seq.push_back(edge.sourceBlock().name().data()); - writePortDefinition(edge.sourcePortDefinition()); + serializedConnections.emplace_back(seq); + }); + result["connections"] = std::move(serializedConnections); + } + + return result; +} - seq.push_back(edge.destinationBlock().name().data()); - writePortDefinition(edge.destinationPortDefinition()); +} // namespace detail - connections.emplace_back(seq); - }); - yaml["connections"] = connections; +inline gr::Graph loadGrc(PluginLoader& loader, std::string_view yamlSrc) { + Graph resultGraph; + const auto yaml = pmtv::yaml::deserialize(yamlSrc); + if (!yaml) { + throw gr::exception(fmt::format("Could not parse yaml: {}:{}\n{}", yaml.error().message, yaml.error().line, yamlSrc)); + } - return pmtv::yaml::serialize(yaml); + detail::loadGraphFromMap(loader, resultGraph, *yaml); + return resultGraph; } +inline std::string saveGrc(const gr::Graph& rootGraph) { return pmtv::yaml::serialize(detail::saveGraphToMap(rootGraph)); } + } // namespace gr #endif // include guard diff --git a/core/include/gnuradio-4.0/Message.hpp b/core/include/gnuradio-4.0/Message.hpp index 504561c3c..af5e2f659 100644 --- a/core/include/gnuradio-4.0/Message.hpp +++ b/core/include/gnuradio-4.0/Message.hpp @@ -134,6 +134,7 @@ void sendMessage(auto& port, std::string_view serviceName, std::string_view endp } WriterSpanLike auto msgSpan = port.streamWriter().template reserve(1UZ); msgSpan[0] = std::move(message); + msgSpan.publish(1UZ); } } // namespace detail diff --git a/core/include/gnuradio-4.0/Port.hpp b/core/include/gnuradio-4.0/Port.hpp index 68101a8d8..4c55d95db 100644 --- a/core/include/gnuradio-4.0/Port.hpp +++ b/core/include/gnuradio-4.0/Port.hpp @@ -937,15 +937,19 @@ static_assert(std::is_default_constructible_v>); */ class DynamicPort { public: - std::string_view name; - std::int16_t& priority; // → dependents of a higher-prio port should be scheduled first (Q: make this by order of ports?) - std::size_t& min_samples; - std::size_t& max_samples; + std::string name; + std::int16_t priority; // → dependents of a higher-prio port should be scheduled first (Q: make this by order of ports?) + std::size_t min_samples; + std::size_t max_samples; private: struct model { // intentionally class-private definition to limit interface exposure and enhance composition virtual ~model() = default; + [[nodiscard]] virtual DynamicPort weakRef() const noexcept = 0; + + [[nodiscard]] virtual std::intptr_t internalId() const noexcept = 0; + [[nodiscard]] virtual std::any defaultValue() const noexcept = 0; [[nodiscard]] virtual bool setDefaultValue(const std::any& val) noexcept = 0; @@ -1021,6 +1025,10 @@ class DynamicPort { ~PortWrapper() override = default; + [[nodiscard]] virtual DynamicPort weakRef() const noexcept override; + + [[nodiscard]] std::intptr_t internalId() const noexcept override { return reinterpret_cast(std::addressof(_value)); } + [[nodiscard]] std::any defaultValue() const noexcept override { return _value.defaultValue(); } [[nodiscard]] bool setDefaultValue(const std::any& val) noexcept override { return _value.setDefaultValue(val); } @@ -1071,8 +1079,19 @@ class DynamicPort { DynamicPort(const DynamicPort& arg) = delete; DynamicPort& operator=(const DynamicPort& arg) = delete; - DynamicPort(DynamicPort&& arg) = default; - DynamicPort& operator=(DynamicPort&& arg) = delete; + DynamicPort(DynamicPort&& other) noexcept : name(other.name), priority(other.priority), min_samples(other.min_samples), max_samples(other.max_samples), _accessor(std::move(other._accessor)) {} + auto& operator=(DynamicPort&& other) noexcept { + auto tmp = std::move(other); + std::swap(_accessor, tmp._accessor); + std::swap(name, tmp.name); + std::swap(priority, tmp.priority); + std::swap(min_samples, tmp.min_samples); + std::swap(max_samples, tmp.max_samples); + return *this; + } + + bool operator==(const DynamicPort& other) const noexcept { return _accessor->internalId() == other._accessor->internalId(); } + bool operator!=(const DynamicPort& other) const noexcept { return _accessor->internalId() != other._accessor->internalId(); } // TODO: The lifetime of ports is a problem here, if we keep a reference to the port in DynamicPort, the port object/ can not be reallocated template @@ -1081,6 +1100,8 @@ class DynamicPort { template explicit constexpr DynamicPort(T&& arg, owned_value_tag) noexcept : name(arg.name), priority(arg.priority), min_samples(arg.min_samples), max_samples(arg.max_samples), _accessor{std::make_unique>(std::forward(arg))} {} + [[nodiscard]] DynamicPort weakRef() const noexcept { return _accessor->weakRef(); } + [[nodiscard]] std::any defaultValue() const noexcept { return _accessor->defaultValue(); } [[nodiscard]] bool setDefaultValue(const std::any& val) noexcept { return _accessor->setDefaultValue(val); } @@ -1113,6 +1134,11 @@ class DynamicPort { [[nodiscard]] ConnectionResult connect(DynamicPort& dst_port) { return _accessor->connect(dst_port); } }; +template +[[nodiscard]] DynamicPort DynamicPort::PortWrapper::weakRef() const noexcept { + return DynamicPort(_value, DynamicPort::non_owned_reference_tag{}); +} + static_assert(PortLike); namespace detail { diff --git a/core/test/message_utils.hpp b/core/test/message_utils.hpp new file mode 100644 index 000000000..530756031 --- /dev/null +++ b/core/test/message_utils.hpp @@ -0,0 +1,109 @@ +#ifndef CORE_TEST_MESSAGE_UTILS_HPP +#define CORE_TEST_MESSAGE_UTILS_HPP + +#include +#include +#include + +#include + +#include +#include +#include + +namespace gr::testing { + +using namespace boost::ut; +using namespace gr; + +using namespace std::chrono_literals; +using enum gr::message::Command; + +template +bool awaitCondition(std::chrono::milliseconds timeout, Condition condition) { + auto start = std::chrono::steady_clock::now(); + while (std::chrono::steady_clock::now() - start < timeout) { + if (condition()) { + return true; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + return false; +} + +inline auto returnReplyMsg(gr::MsgPortIn& port, std::optional expectedEndpoint = {}, std::source_location caller = std::source_location::current()) { + auto available = port.streamReader().available(); + expect(gt(available, 0UZ)) << "didn't receive a reply message, caller: " << caller.file_name() << ":" << caller.line() << "\n"; + ReaderSpanLike auto messages = port.streamReader().get(available); + Message result; + + if (expectedEndpoint) { + auto it = std::ranges::find_if(messages, [endpoint = *expectedEndpoint](const auto& message) { return message.endpoint == endpoint; }); + if (it == messages.end()) { + expect(gt(available, 0UZ)) << "didn't receive the expected reply message, caller: " << caller.file_name() << ":" << caller.line() << "\n"; + } else { + result = *it; + } + } else { + result = messages[0]; + } + + expect(messages.consume(messages.size())); + fmt::print("Test got a reply: {}\n", result); + if (expectedEndpoint) { + expect(eq(*expectedEndpoint, result.endpoint)); + } + return result; +}; + +template +auto awaitReplyMsg(auto& graph, std::chrono::milliseconds timeout, gr::MsgPortIn& port, std::optional expectedEndpoint = {}, std::source_location caller = std::source_location::current()) { + awaitCondition(timeout, [&port, &graph] { + if constexpr (processScheduledMessages) { + graph.processScheduledMessages(); + } + return port.streamReader().available() > 0; + }); + + return returnReplyMsg(port, expectedEndpoint, caller); +}; + +inline auto waitForAReply(gr::MsgPortIn& fromGraph, std::chrono::milliseconds maxWait = 1s, std::source_location currentSource = std::source_location::current()) { + auto startedAt = std::chrono::system_clock::now(); + while (fromGraph.streamReader().available() == 0) { + std::this_thread::sleep_for(100ms); + if (std::chrono::system_clock::now() - startedAt > maxWait) { + break; + } + } + expect(fromGraph.streamReader().available() > 0) << "Caller at" << currentSource.file_name() << ":" << currentSource.line(); + return fromGraph.streamReader().available() > 0; +}; + +inline auto sendEmplaceTestBlockMsg(gr::MsgPortOut& toGraph, gr::MsgPortIn& fromGraph, std::string type, std::string params, property_map properties) { + sendMessage(toGraph, "" /* serviceName */, graph::property::kEmplaceBlock /* endpoint */, // + {{"type", std::move(type)}, {"parameters", std::move(params)}, {"properties", std::move(properties)}} /* data */); + expect(waitForAReply(fromGraph)) << "didn't receive a reply message"; + + const Message reply = returnReplyMsg(fromGraph); + expect(reply.data.has_value()) << "emplace block failed and returned an error"; + return reply.data.has_value() ? std::get(reply.data.value().at("uniqueName"s)) : std::string{}; +}; + +inline auto sendEmplaceTestEdgeMsg(gr::MsgPortOut& toGraph, gr::MsgPortIn& fromGraph, std::string sourceBlock, std::string sourcePort, std::string destinationBlock, std::string destinationPort) { + gr::property_map data = {{"sourceBlock", sourceBlock}, {"sourcePort", sourcePort}, // + {"destinationBlock", destinationBlock}, {"destinationPort", destinationPort}, // + {"minBufferSize", gr::Size_t()}, {"weight", 0}, {"edgeName", "unnamed edge"}}; + sendMessage(toGraph, "" /* serviceName */, graph::property::kEmplaceEdge /* endpoint */, data /* data */); + if (!waitForAReply(fromGraph)) { + fmt::println("didn't receive a reply message for {}", data); + return false; + } + + const Message reply = returnReplyMsg(fromGraph); + return reply.data.has_value(); +}; + +} // namespace gr::testing + +#endif // include guard diff --git a/core/test/qa_GraphMessages.cpp b/core/test/qa_GraphMessages.cpp index d47f58df6..3c1173fc5 100644 --- a/core/test/qa_GraphMessages.cpp +++ b/core/test/qa_GraphMessages.cpp @@ -1,7 +1,7 @@ #include -#include "gnuradio-4.0/Block.hpp" -#include "gnuradio-4.0/Message.hpp" +#include +#include #include #include #include @@ -10,7 +10,7 @@ #include #include -#include +#include "message_utils.hpp" using namespace std::chrono_literals; using namespace std::string_literals; @@ -55,47 +55,6 @@ const boost::ut::suite<"Graph Formatter Tests"> graphFormatterTests = [] { }; }; -template -bool awaitCondition(std::chrono::milliseconds timeout, Condition condition) { - auto start = std::chrono::steady_clock::now(); - while (std::chrono::steady_clock::now() - start < timeout) { - if (condition()) { - return true; - } - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - return false; -} - -auto returnReplyMsg(gr::MsgPortIn& port, std::optional expectedEndpoint = {}, std::source_location caller = std::source_location::current()) { - auto available = port.streamReader().available(); - expect(gt(available, 0UZ)) << "didn't receive a reply message, caller: " << caller.file_name() << ":" << caller.line(); - ReaderSpanLike auto messages = port.streamReader().get(available); - Message result; - - if (expectedEndpoint) { - auto it = std::ranges::find_if(messages, [endpoint = *expectedEndpoint](const auto& message) { return message.endpoint == endpoint; }); - expect(gt(available, 0UZ)) << "didn't receive the expected reply message, caller: " << caller.file_name() << ":" << caller.line(); - } else { - result = messages[0]; - } - - expect(messages.consume(messages.size())); - fmt::print("Test got a reply: {}\n", result); - return result; -}; - -auto awaitReplyMsg(auto& graph, std::chrono::milliseconds timeout, gr::MsgPortIn& port, std::optional expectedEndpoint = {}, std::source_location caller = std::source_location::current()) { - Message msg; - - awaitCondition(timeout, [&port, &graph] { - graph.processScheduledMessages(); - return port.streamReader().available() > 0; - }); - - return returnReplyMsg(port, expectedEndpoint, caller); -}; - const boost::ut::suite NonRunningGraphTests = [] { using namespace std::string_literals; using namespace boost::ut; @@ -348,42 +307,6 @@ const boost::ut::suite RunningGraphTests = [] { expect(eq(ConnectionResult::SUCCESS, toGraph.connect(scheduler.msgIn))); expect(eq(ConnectionResult::SUCCESS, scheduler.msgOut.connect(fromGraph))); - auto waitForAReply = [&](std::chrono::milliseconds maxWait = 1s, std::source_location currentSource = std::source_location::current()) { - auto startedAt = std::chrono::system_clock::now(); - while (fromGraph.streamReader().available() == 0) { - std::this_thread::sleep_for(100ms); - if (std::chrono::system_clock::now() - startedAt > maxWait) { - break; - } - } - expect(fromGraph.streamReader().available() > 0) << "Caller at" << currentSource.file_name() << ":" << currentSource.line(); - return fromGraph.streamReader().available() > 0; - }; - - auto emplaceTestBlock = [&](std::string type, std::string params, property_map properties) { - sendMessage(toGraph, "" /* serviceName */, graph::property::kEmplaceBlock /* endpoint */, // - {{"type", std::move(type)}, {"parameters", std::move(params)}, {"properties", std::move(properties)}} /* data */); - expect(waitForAReply()) << "didn't receive a reply message"; - - const Message reply = returnReplyMsg(fromGraph); - expect(reply.data.has_value()) << "emplace block failed and returned an error"; - return reply.data.has_value() ? std::get(reply.data.value().at("uniqueName"s)) : std::string{}; - }; - - auto emplaceTestEdge = [&](std::string sourceBlock, std::string sourcePort, std::string destinationBlock, std::string destinationPort) { - property_map data = {{"sourceBlock", sourceBlock}, {"sourcePort", sourcePort}, // - {"destinationBlock", destinationBlock}, {"destinationPort", destinationPort}, // - {"minBufferSize", gr::Size_t()}, {"weight", 0}, {"edgeName", "unnamed edge"}}; - sendMessage(toGraph, "" /* serviceName */, graph::property::kEmplaceEdge /* endpoint */, data /* data */); - if (!waitForAReply()) { - fmt::println("didn't receive a reply message for {}", data); - return false; - } - - const Message reply = returnReplyMsg(fromGraph); - return reply.data.has_value(); - }; - std::expected schedulerRet; auto runScheduler = [&scheduler, &schedulerRet] { schedulerRet = scheduler.runAndWait(); }; @@ -397,8 +320,8 @@ const boost::ut::suite RunningGraphTests = [] { fmt::println("executed basic graph"); // Adding a few blocks - auto multiply1 = emplaceTestBlock("gr::testing::Copy"s, "float"s, property_map{}); - auto multiply2 = emplaceTestBlock("gr::testing::Copy"s, "float"s, property_map{}); + auto multiply1 = sendEmplaceTestBlockMsg(toGraph, fromGraph, "gr::testing::Copy"s, "float"s, property_map{}); + auto multiply2 = sendEmplaceTestBlockMsg(toGraph, fromGraph, "gr::testing::Copy"s, "float"s, property_map{}); scheduler.processScheduledMessages(); for (const auto& block : scheduler.graph().blocks()) { @@ -406,15 +329,15 @@ const boost::ut::suite RunningGraphTests = [] { } expect(eq(scheduler.graph().blocks().size(), 4UZ)) << "should contain sink->multiply1->multiply2->sink"; - expect(emplaceTestEdge(source.unique_name, "out", multiply1, "in")) << "emplace edge source -> multiply1 failed and returned an error"; - expect(emplaceTestEdge(multiply1, "out", multiply2, "in")) << "emplace edge multiply1 -> multiply2 failed and returned an error"; - expect(emplaceTestEdge(multiply2, "out", sink.unique_name, "in")) << "emplace edge multiply2 -> sink failed and returned an error"; + expect(sendEmplaceTestEdgeMsg(toGraph, fromGraph, source.unique_name, "out", multiply1, "in")) << "emplace edge source -> multiply1 failed and returned an error"; + expect(sendEmplaceTestEdgeMsg(toGraph, fromGraph, multiply1, "out", multiply2, "in")) << "emplace edge multiply1 -> multiply2 failed and returned an error"; + expect(sendEmplaceTestEdgeMsg(toGraph, fromGraph, multiply2, "out", sink.unique_name, "in")) << "emplace edge multiply2 -> sink failed and returned an error"; scheduler.processScheduledMessages(); // Get the whole graph { sendMessage(toGraph, "" /* serviceName */, graph::property::kGraphInspect /* endpoint */, property_map{} /* data */); - if (!waitForAReply()) { + if (!waitForAReply(fromGraph)) { fmt::println("didn't receive a reply message for kGraphInspect"); expect(false); } diff --git a/core/test/qa_HierBlock.cpp b/core/test/qa_HierBlock.cpp index 7312261a3..e175289fc 100644 --- a/core/test/qa_HierBlock.cpp +++ b/core/test/qa_HierBlock.cpp @@ -1,202 +1,156 @@ -#include +#include #include #include #include +#include +#include +#include -template() * std::declval())> -struct scale : public gr::Block> { - gr::PortIn original; - gr::PortOut scaled; - GR_MAKE_REFLECTABLE(scale, original, scaled); +#include "message_utils.hpp" - template V> - [[nodiscard]] constexpr auto processOne(V a) const noexcept { - return a * 2; - } -}; +namespace gr::testing { -template() + std::declval())> -struct adder : public gr::Block> { - gr::PortIn addend0; - gr::PortIn addend1; - gr::PortOut sum; - GR_MAKE_REFLECTABLE(adder, addend0, addend1, sum); +using namespace std::chrono_literals; +using namespace std::string_literals; - template V> - [[nodiscard]] constexpr auto processOne(V a, V b) const noexcept { - return a + b; - } -}; -// TODO: These lines is commented out -// HieBlock has to be reimplemented to support recent changes +using namespace boost::ut; +using namespace gr; +using namespace gr::message; -/* template -class HierBlock : public gr::lifecycle::StateMachine>, public gr::BlockModel { -private: - static std::atomic_size_t _unique_id_counter; - const std::size_t _unique_id = _unique_id_counter++; - const std::string _unique_name = fmt::format("multi_adder#{}", _unique_id); - -protected: - using setting_map = std::map>; - std::string _name = "multi_adder"; - std::string _type_name = "multi_adder"; - gr::property_map _meta_information; /// used to store non-graph-processing information like UI block position etc. - bool _input_tags_present = false; - bool _output_tags_changed = false; - std::vector _tags_at_input; - std::vector _tags_at_output; - gr::CtxSettings> _settings = gr::CtxSettings>(*this); - - using in_port_t = gr::PortIn; - - gr::scheduler::Simple<> _scheduler; - - gr::Graph make_graph() { - gr::Graph graph; - auto& adder_block = graph.emplaceBlock>({{"name", "adder"}}); - auto& left_scale_block = graph.emplaceBlock>(); - auto& right_scale_block = graph.emplaceBlock>(); - - assert(gr::ConnectionResult::SUCCESS == graph.connect<"scaled">(left_scale_block).to<"addend0">(adder_block)); - assert(gr::ConnectionResult::SUCCESS == graph.connect<"scaled">(right_scale_block).to<"addend1">(adder_block)); - - _dynamicInputPorts.emplace_back(gr::DynamicPort(gr::inputPort<0, gr::PortType::STREAM>(&left_scale_block), gr::DynamicPort::non_owned_reference_tag{})); - _dynamicInputPorts.emplace_back(gr::DynamicPort(gr::inputPort<0, gr::PortType::STREAM>(&right_scale_block), gr::DynamicPort::non_owned_reference_tag{})); - _dynamicOutputPorts.emplace_back(gr::DynamicPort(gr::outputPort<0, gr::PortType::STREAM>(&adder_block), gr::DynamicPort::non_owned_reference_tag{})); - - _dynamicPortsLoaded = true; - return graph; - } - +struct DemoSubGraph : public gr::Graph { public: - HierBlock() : _scheduler(make_graph()) {}; - - ~HierBlock() override = default; - - void init(std::shared_ptr progress, std::shared_ptr ioThreadPool) override {} - - [[nodiscard]] std::string_view name() const override { return _unique_name; } + gr::testing::Copy* pass1 = nullptr; + gr::testing::Copy* pass2 = nullptr; - std::string_view typeName() const override { return _type_name; } + DemoSubGraph(const gr::property_map& init) : gr::Graph(init) { + pass1 = std::addressof(emplaceBlock>()); + pass2 = std::addressof(emplaceBlock>()); - constexpr bool isBlocking() const noexcept override { return false; } - - [[nodiscard]] std::expected changeState(gr::lifecycle::State newState) noexcept override { return this->changeStateTo(newState); } + std::ignore = gr::Graph::connect(*pass1, PortDefinition("out"), *pass2, PortDefinition("in")); + } +}; - constexpr gr::lifecycle::State state() const noexcept override { return gr::lifecycle::StateMachine>::state(); } +const boost::ut::suite ExportPortsTests_ = [] { + using namespace std::string_literals; + using namespace boost::ut; + using namespace gr; + using enum gr::message::Command; - gr::work::Result work(std::size_t requested_work) override { - if (state() == gr::lifecycle::State::STOPPED) { - return {requested_work, 0UL, gr::work::Status::DONE}; - } - _scheduler.runAndWait(); - bool ok = this->changeStateTo(gr::lifecycle::State::STOPPED).has_value(); - return {requested_work, requested_work, ok ? gr::work::Status::DONE : gr::work::Status::ERROR}; - } + gr::scheduler::Simple scheduler{gr::Graph()}; + auto& graph = scheduler.graph(); + fmt::print(">>> Starting test with root graph {} <<<\n", graph.unique_name); - gr::work::Status draw(const property_map& config = {}) override { return gr::work::Status::OK; } + // Basic source and sink + auto& source = graph.emplaceBlock>(); + auto& sink = graph.emplaceBlock>(); - void processScheduledMessages() override { - // TODO - } + // Subgraph with a single block inside + using SubGraphType = GraphWrapper>; + auto& subGraph = graph.addBlock(std::make_unique()); + auto* subGraphDirect = dynamic_cast(&subGraph); - void* raw() override { return this; } + // Connecting the message ports + gr::MsgPortOut toGraph; + gr::MsgPortIn fromGraph; + expect(eq(ConnectionResult::SUCCESS, toGraph.connect(scheduler.msgIn))); + expect(eq(ConnectionResult::SUCCESS, scheduler.msgOut.connect(fromGraph))); - void setName(std::string name) noexcept override {} + std::expected schedulerRet; + auto runScheduler = [&scheduler, &schedulerRet] { schedulerRet = scheduler.runAndWait(); }; - [[nodiscard]] gr::property_map& metaInformation() noexcept override { return _meta_information; } + std::thread schedulerThread1(runScheduler); - [[nodiscard]] const gr::property_map& metaInformation() const override { return _meta_information; } + expect(awaitCondition(1s, [&scheduler] { return scheduler.state() == lifecycle::State::RUNNING; })) << "scheduler thread up and running w/ timeout"; + expect(scheduler.state() == lifecycle::State::RUNNING) << "scheduler thread up and running"; - [[nodiscard]] gr::SettingsBase& settings() override { return _settings; } + for (const auto& block : graph.blocks()) { + fmt::println("block in list: {} - state() : {}", block->name(), magic_enum::enum_name(block->state())); + } + expect(eq(graph.blocks().size(), 3UZ)) << "should contain source->(copy->copy)->sink"; + + // Export ports from the sub-graph + + sendMessage(toGraph, subGraph.uniqueName(), graph::property::kSubgraphExportPort, + property_map{ + {"uniqueBlockName"s, subGraphDirect->blockRef().pass2->unique_name}, // + {"portDirection"s, "output"}, // + {"portName"s, "out"}, // + {"exportFlag"s, true} // + }); + sendMessage(toGraph, subGraph.uniqueName(), graph::property::kSubgraphExportPort, + property_map{ + {"uniqueBlockName"s, subGraphDirect->blockRef().pass1->unique_name}, // + {"portDirection"s, "input"}, // + {"portName"s, "in"}, // + {"exportFlag"s, true} // + }); + if (!waitForAReply(fromGraph)) { + fmt::println("didn't receive a reply message for kSubgraphExportPort"); + expect(false); + } - [[nodiscard]] const gr::SettingsBase& settings() const override { return _settings; } + // Make connections - [[nodiscard]] std::string_view uniqueName() const override { return _unique_name; } -}; + expect(sendEmplaceTestEdgeMsg(toGraph, fromGraph, source.unique_name, "out", std::string(subGraph.uniqueName()), "in")) << "emplace edge source -> group failed and returned an error"; + expect(sendEmplaceTestEdgeMsg(toGraph, fromGraph, std::string(subGraph.uniqueName()), "out", sink.unique_name, "in")) << "emplace edge multiply2 -> sink failed and returned an error"; -static_assert(gr::BlockLike>); + // Get the whole graph + { + sendMessage(toGraph, graph.unique_name /* serviceName */, graph::property::kGraphInspect /* endpoint */, property_map{} /* data */); + if (!waitForAReply(fromGraph)) { + fmt::println("didn't receive a reply message for kGraphInspect"); + expect(false); + } -template -std::atomic_size_t HierBlock::_unique_id_counter = 0; -*/ -template -struct fixed_source : public gr::Block> { - gr::PortOut> out; - std::size_t remaining_events_count; + const Message reply = returnReplyMsg(fromGraph); + expect(reply.data.has_value()); - GR_MAKE_REFLECTABLE(fixed_source, out, remaining_events_count); + const auto& data = reply.data.value(); + const auto& children = std::get(data.at("children"s)); + expect(eq(children.size(), 3UZ)); - T value = 1; + const auto& edges = std::get(data.at("edges"s)); + expect(eq(edges.size(), 2UZ)); - gr::work::Result work(std::size_t requested_work) { - if (this->state() == gr::lifecycle::State::STOPPED) { - return {requested_work, 0UL, gr::work::Status::DONE}; - } + std::size_t subGraphInConnections = 0UZ; + std::size_t subGraphOutConnections = 0UZ; - if (remaining_events_count != 0) { - auto& writer = out.streamWriter(); - auto data = writer.reserve(1UZ); - data[0] = value; - data.publish(1UZ); + // Check that the subgraph is connected properly - remaining_events_count--; - if (remaining_events_count == 0) { - fmt::print("Last value sent was {}\n", value); + for (const auto& [index, edge_] : edges) { + const auto& edge = std::get(edge_); + if (std::get(edge.at("destinationBlock")) == subGraph.uniqueName()) { + subGraphInConnections++; } - - value += 1; - return {requested_work, 1UL, gr::work::Status::OK}; - } else { - // TODO: Investigate what schedulers do when there is an event written, but we return DONE - if (auto ret = this->changeStateTo(gr::lifecycle::State::STOPPED); !ret) { - this->emitErrorMessage("requested STOPPED", ret.error()); + if (std::get(edge.at("sourceBlock")) == subGraph.uniqueName()) { + subGraphOutConnections++; } - this->publishTag({{gr::tag::END_OF_STREAM, true}}, 0); - return {requested_work, 1UL, gr::work::Status::DONE}; } + expect(eq(subGraphInConnections, 1UZ)); + expect(eq(subGraphOutConnections, 1UZ)); + + // Check subgraph topology + const auto& subGraphData = std::get(children.at(std::string(subGraph.uniqueName()))); + const auto& subGraphChildren = std::get(subGraphData.at("children"s)); + const auto& subGraphEdges = std::get(subGraphData.at("edges"s)); + expect(eq(subGraphChildren.size(), 2UZ)); + expect(eq(subGraphEdges.size(), 1UZ)); } -}; - -template -struct cout_sink : public gr::Block> { - gr::PortIn> in; - std::size_t remaining = 0; - GR_MAKE_REFLECTABLE(cout_sink, in, remaining); - - void processOne(T value) { - remaining--; - if (remaining == 0) { - std::cerr << "last value was: " << value << "\n"; - } + // Stopping scheduler + scheduler.requestStop(); + schedulerThread1.join(); + if (!schedulerRet.has_value()) { + expect(false) << fmt::format("scheduler.runAndWait() failed:\n{}\n", schedulerRet.error()); } -}; - -gr::Graph make_graph(std::size_t /*events_count*/) { - gr::Graph graph; - // auto& source_leftBlock = graph.emplaceBlock>({{"remaining_events_count", events_count}}); - // auto& source_rightBlock = graph.emplaceBlock>({{"remaining_events_count", events_count}}); - // auto& sink = graph.emplaceBlock>({{"remaining", events_count}}); - - // auto& hier = graph.addBlock(std::make_unique>()); - - // graph.connect(source_leftBlock, 0, hier, 0); - // graph.connect(source_rightBlock, 0, hier, 1); - // graph.connect(hier, 0, sink, 0); - - return graph; -} + // return to initial state + expect(scheduler.changeStateTo(lifecycle::State::INITIALISED).has_value()) << "could switch to INITIALISED?"; + expect(awaitCondition(1s, [&scheduler] { return scheduler.state() == lifecycle::State::INITIALISED; })) << "scheduler INITIALISED w/ timeout"; + expect(scheduler.state() == lifecycle::State::INITIALISED) << fmt::format("scheduler INITIALISED - actual: {}\n", magic_enum::enum_name(scheduler.state())); +}; -int main() { - // TODO: These lines is commented because of failing tests - // TODO: HierBlock as it is implemented now does not support tag handling and can not be used with new DONE mechanism via EOS tag - // TODO: Review HierBlock implementation - // auto thread_pool = std::make_shared("custom pool", gr::thread_pool::CPU_BOUND, 2, 2); // use custom pool to limit number of threads for emscripten - // gr::scheduler::Simple scheduler(make_graph(10), thread_pool); - // scheduler.runAndWait(); -} +} // namespace gr::testing +int main() { /* tests are statically executed */ } diff --git a/core/test/qa_grc.cpp b/core/test/qa_grc.cpp index 23f719275..a9b984b81 100644 --- a/core/test/qa_grc.cpp +++ b/core/test/qa_grc.cpp @@ -188,16 +188,20 @@ const boost::ut::suite GrcTests = [] { parameters: total_count: !!uint32 100 unknown_property: 42 + connections: - [main_source, 0, multiplier, 0] - [multiplier, 0, counter, 0] - [counter, 0, sink, 0] )"; - const auto context = getContext(); - const auto graphSrc = ymlDecodeEncode(pluginsTestGrc); - auto graph = gr::loadGrc(context->loader, graphSrc); - auto graphSavedSrc = gr::saveGrc(graph); + const auto context = getContext(); + const auto graphSrc = ymlDecodeEncode(pluginsTestGrc); + auto graph = gr::loadGrc(context->loader, graphSrc); + + expect(eq(graph.blocks().size(), 4UZ)); + + auto graphSavedSrc = gr::saveGrc(graph); expect(checkAndPrintMissingLines(graphSrc, graphSavedSrc)); @@ -208,6 +212,62 @@ const boost::ut::suite GrcTests = [] { expect(false); } }; + + "Basic graph and subgraph loading and storing using plugins"_test = [] { + try { + using namespace gr; + + constexpr std::string_view pluginsTestGrc = R"( +blocks: + - name: main_source + id: good::fixed_source + parameters: + event_count: !!uint32 100 + unknown_property: 42 + - name: multiplier + id: good::multiply + - name: counter + id: builtin_counter + - name: sink + id: good::cout_sink + parameters: + total_count: !!uint32 100 + unknown_property: 42 + - name: chained_multiplier + id: SUBGRAPH + graph: + blocks: + - name: multiplier1 + id: good::multiply + - name: multiplier2 + id: good::multiply + connections: + - [multiplier1, 0, multiplier2, 0] + exported_ports: + - [multiplier1, INPUT, in] + - [multiplier2, OUTPUT, out] + +connections: + - [main_source, 0, multiplier, 0] + - [multiplier, 0, chained_multiplier, 0] + - [chained_multiplier, 0, counter, 0] + - [counter, 0, sink, 0] +)"; + + const auto context = getContext(); + const auto graphSrc = ymlDecodeEncode(pluginsTestGrc); + auto graph = gr::loadGrc(context->loader, graphSrc); + + expect(eq(graph.blocks().size(), 5UZ)); + + auto graphSavedSrc = gr::saveGrc(graph); + + expect(checkAndPrintMissingLines(graphSrc, graphSavedSrc)); + } catch (const std::string& e) { + fmt::println(std::cerr, "Unexpected exception: {}", e); + expect(false); + } + }; #endif "Save and load"_test = [] {