From c030f65f7fe9af892ce3490d69afdc8c9727e2b5 Mon Sep 17 00:00:00 2001 From: Christian Berger Date: Tue, 8 May 2018 11:01:51 +0200 Subject: [PATCH] * Upgrade to libcluon v0.0.89 Signed-off-by: Christian Berger --- CMakeLists.txt | 2 +- README.md | 2 +- ...v0.0.73.hpp => cluon-complete-v0.0.89.hpp} | 1107 ++++++++++++++--- 3 files changed, 962 insertions(+), 149 deletions(-) rename src/{cluon-complete-v0.0.73.hpp => cluon-complete-v0.0.89.hpp} (93%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 7163b75..02c3ced 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,7 +20,7 @@ project(opendlv-device-gps-pos) ################################################################################ # Defining the relevant versions of OpenDLV Standard Message Set and libcluon. set(OPENDLV_STANDARD_MESSAGE_SET opendlv-standard-message-set-v0.9.1.odvd) -set(CLUON_COMPLETE cluon-complete-v0.0.73.hpp) +set(CLUON_COMPLETE cluon-complete-v0.0.89.hpp) ################################################################################ # This project requires C++14 or newer. diff --git a/README.md b/README.md index b1386b9..40edcf4 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ the messages according to OpenDLV Standard Message Set into session 111 in Google Protobuf format, simply start it as follows: ``` -docker run --init --rm --net=host chalmersrevere/opendlv-device-gps-pos-multi:v0.0.2 opendlv-device-gps-pos --pos_ip=192.168.1.77 --pos_port=5602 --cid=111 --verbose +docker run --init --rm --net=host chalmersrevere/opendlv-device-gps-pos-multi:v0.0.3 opendlv-device-gps-pos --pos_ip=192.168.1.77 --pos_port=5602 --cid=111 --verbose ``` ## Build from sources on the example of Ubuntu 16.04 LTS diff --git a/src/cluon-complete-v0.0.73.hpp b/src/cluon-complete-v0.0.89.hpp similarity index 93% rename from src/cluon-complete-v0.0.73.hpp rename to src/cluon-complete-v0.0.89.hpp index 906fd6f..32e9437 100644 --- a/src/cluon-complete-v0.0.73.hpp +++ b/src/cluon-complete-v0.0.89.hpp @@ -1,6 +1,6 @@ // This is an auto-generated header-only single-file distribution of libcluon. -// Date: Tue, 17 Apr 2018 23:02:52 +0200 -// Version: 0.0.73 +// Date: Mon, 07 May 2018 12:05:07 +0200 +// Version: 0.0.89 // // // Implementation of N4562 std::experimental::any (merged into C++17) for C++11 compilers. @@ -264,7 +264,10 @@ class any final static void swap(storage_union& lhs, storage_union& rhs) noexcept { - std::swap(reinterpret_cast(lhs.stack), reinterpret_cast(rhs.stack)); + storage_union tmp_storage; + move(rhs, tmp_storage); + move(lhs, rhs); + move(tmp_storage, lhs); } }; @@ -4976,6 +4979,65 @@ class LIBCLUON_API MessageParser { }; } // namespace cluon +#endif +/* + * Copyright (C) 2018 Christian Berger + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef CLUON_TERMINATEHANDLER_HPP +#define CLUON_TERMINATEHANDLER_HPP + +//#include "cluon/cluon.hpp" + +#include +#include + +namespace cluon { + +class LIBCLUON_API TerminateHandler { + private: + TerminateHandler(const TerminateHandler &) = delete; + TerminateHandler(TerminateHandler &&) = delete; + TerminateHandler &operator=(const TerminateHandler &) = delete; + TerminateHandler &operator=(TerminateHandler &&) = delete; + + public: + /** + * Define singleton behavior using static initializer (cf. http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2011/n3242.pdf, Sec. 6.7.4). + * @return singleton for an instance of this class. + */ + static TerminateHandler &instance() noexcept { + static TerminateHandler instance; + return instance; + } + + ~TerminateHandler() = default; + + public: + std::atomic isTerminated{false}; + + private: + TerminateHandler() noexcept; + +#ifndef WIN32 + struct sigaction m_signalHandler {}; +#endif +}; +} // namespace cluon + #endif /* * Copyright (C) 2017-2018 Christian Berger @@ -5034,7 +5096,6 @@ namespace cluon { // clang-format off #ifdef WIN32 #include // for WSAStartUp - #include // for struct sockaddr_in #include // for SOCKET #else #include @@ -5132,7 +5193,6 @@ class LIBCLUON_API UDPSender { // clang-format off #ifdef WIN32 #include // for WSAStartUp - #include // for struct sockaddr_in #include // for SOCKET #else #include @@ -5230,13 +5290,19 @@ class LIBCLUON_API UDPReceiver { struct sockaddr_in m_receiveFromAddress {}; struct ip_mreq m_mreq {}; bool m_isMulticast{false}; + std::atomic m_readFromSocketThreadRunning{false}; std::thread m_readFromSocketThread{}; + private: + std::function m_delegate{}; + + private: std::atomic m_pipelineThreadRunning{false}; std::thread m_pipelineThread{}; std::mutex m_pipelineMutex{}; std::condition_variable m_pipelineCondition{}; + class PipelineEntry { public: std::string m_data; @@ -5244,8 +5310,6 @@ class LIBCLUON_API UDPReceiver { std::chrono::system_clock::time_point m_sampleTime; }; std::deque m_pipeline{}; - - std::function m_delegate{}; }; } // namespace cluon @@ -5275,7 +5339,6 @@ class LIBCLUON_API UDPReceiver { // clang-format off #ifdef WIN32 #include // for WSAStartUp - #include // for struct sockaddr_in #include // for SOCKET #else #include @@ -5389,8 +5452,10 @@ class LIBCLUON_API TCPConnection { mutable std::mutex m_socketMutex{}; int32_t m_socket{-1}; struct sockaddr_in m_address {}; + std::atomic m_readFromSocketThreadRunning{false}; std::thread m_readFromSocketThread{}; + std::function m_newDataDelegate{}; std::function m_connectionLostDelegate{}; }; @@ -6203,6 +6268,7 @@ class LIBCLUON_API FromJSONVisitor { #ifndef CLUON_TOJSONVISITOR_HPP #define CLUON_TOJSONVISITOR_HPP +//#include "cluon/any/any.hpp" //#include "cluon/cluon.hpp" #include @@ -6272,9 +6338,12 @@ class LIBCLUON_API ToJSONVisitor { void visit(uint32_t &id, std::string &&typeName, std::string &&name, T &value) noexcept { (void)typeName; if ((0 == m_mask.count(id)) || m_mask[id]) { - ToJSONVisitor jsonVisitor; - value.accept(jsonVisitor); - m_buffer << '\"' << name << '\"' << ':' << jsonVisitor.json() << ',' << '\n'; + try { + ToJSONVisitor jsonVisitor; + value.accept(jsonVisitor); + m_buffer << '\"' << name << '\"' << ':' << jsonVisitor.json() << ',' << '\n'; + } catch (const linb::bad_any_cast &) { // LCOV_EXCL_LINE + } } } @@ -6757,7 +6826,6 @@ class LIBCLUON_API ToMsgPackVisitor { //#include "cluon/ToProtoVisitor.hpp" //#include "cluon/cluonDataStructures.hpp" -#include #include #include #include @@ -6777,24 +6845,30 @@ namespace cluon { inline std::string serializeEnvelope(cluon::data::Envelope &&envelope) noexcept { std::string dataToSend; { + std::stringstream sstr; + cluon::ToProtoVisitor protoEncoder; envelope.accept(protoEncoder); const std::string tmp{protoEncoder.encodedData()}; uint32_t length{static_cast(tmp.size())}; + length <<= 8; length = htole32(length); - // Add OpenDaVINCI header. - std::array header; - header[0] = static_cast(0x0D); - header[1] = static_cast(0xA4); - header[2] = *(reinterpret_cast(&length) + 0); - header[3] = *(reinterpret_cast(&length) + 1); - header[4] = *(reinterpret_cast(&length) + 2); - - std::stringstream sstr; - sstr.write(header.data(), static_cast(header.size())); + // Add OD4 header. + constexpr unsigned char OD4_HEADER_BYTE0 = 0x0D; + constexpr unsigned char OD4_HEADER_BYTE1 = 0xA4; + sstr.put(static_cast(OD4_HEADER_BYTE0)); + auto posByte1 = sstr.tellp(); + sstr.write(reinterpret_cast(&length), static_cast(sizeof(uint32_t))); + auto posByte5 = sstr.tellp(); + sstr.seekp(posByte1); + sstr.put(static_cast(OD4_HEADER_BYTE1)); + sstr.seekp(posByte5); + + // Write payload. sstr.write(tmp.data(), static_cast(tmp.size())); + dataToSend = sstr.str(); } return dataToSend; @@ -7765,9 +7839,12 @@ class LIBCLUON_API OD4Session { * along with this program. If not, see . */ +// clang-format off + #ifndef CLUON_PLAYER_HPP #define CLUON_PLAYER_HPP +//#include "cluon/cluon.hpp" //#include "cluon/cluonDataStructures.hpp" #include @@ -7785,13 +7862,13 @@ namespace cluon { class LIBCLUON_API IndexEntry { public: - IndexEntry() noexcept; + IndexEntry() = default; IndexEntry(const int64_t &sampleTimeStamp, const uint64_t &filePosition) noexcept; public: - int64_t m_sampleTimeStamp; - uint64_t m_filePosition; - bool m_available; + int64_t m_sampleTimeStamp{0}; + uint64_t m_filePosition{0}; + bool m_available{0}; }; class LIBCLUON_API Player { @@ -7830,12 +7907,7 @@ class LIBCLUON_API Player { /** * @return real delay in microseconds to be waited before the next cluon::data::Envelope should be delivered. */ - uint32_t getDelay() const noexcept; - - /** - * @return delay in microseconds to be waited before the next cluon::data::Envelope should be delivered correct by the internal processing time. - */ - uint32_t getCorrectedDelay() const noexcept; + uint32_t delay() const noexcept; /** * @return true if there is more data to replay. @@ -7852,7 +7924,7 @@ class LIBCLUON_API Player { /** * @return total amount of cluon::data::Envelopes in the .rec file. */ - uint32_t getTotalNumberOfEnvelopesInRecFile() const noexcept; + uint32_t totalNumberOfEnvelopesInRecFile() const noexcept; private: // Internal methods without Lock. @@ -7929,7 +8001,6 @@ class LIBCLUON_API Player { uint64_t m_numberOfReturnedEnvelopesInTotal; uint32_t m_delay; - uint32_t m_correctedDelay; private: /** @@ -7974,6 +8045,127 @@ class LIBCLUON_API Player { #endif +// clang-format on +/* + * Copyright (C) 2018 Christian Berger + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef CLUON_SHAREDMEMORY_HPP +#define CLUON_SHAREDMEMORY_HPP + +//#include "cluon/cluon.hpp" + +// clang-format off +#ifdef WIN32 + #include +#else + #include +#endif +// clang-format on + +#include +#include +#include + +namespace cluon { + +class LIBCLUON_API SharedMemory { + private: + SharedMemory(const SharedMemory &) = delete; + SharedMemory(SharedMemory &&) = delete; + SharedMemory &operator=(const SharedMemory &) = delete; + SharedMemory &operator=(SharedMemory &&) = delete; + + public: + /** + * Constructor. + * + * @param name Name of the shared memory area; must start with / and must not + * be longer than NAME_MAX (255) on POSIX or PATH_MAX on WIN32. If the name + * is missing a leading '/' or is longer than 255, it will be adjusted accordingly. + * @param size of the shared memory area to create; if size is 0, the class tries to attach to an existing area. + */ + SharedMemory(const std::string &name, uint32_t size = 0) noexcept; + ~SharedMemory() noexcept; + + /** + * This method locks the shared memory area. + */ + void lock() noexcept; + + /** + * This method unlocks the shared memory area. + */ + void unlock() noexcept; + + /** + * This method waits for being notified from the shared condition. + */ + void wait() noexcept; + + /** + * This method notifies all threads waiting on the shared condition. + */ + void notifyAll() noexcept; + + /** + * @return Pointer to the raw shared memory or nullptr in case of invalid shared memory. + */ + char *data() noexcept; + + /** + * @return The size of the shared memory area. + */ + uint32_t size() const noexcept; + + /** + * @return Name the shared memory area. + */ + const std::string name() const noexcept; + + /** + * @return True if the shared memory area is existing and usable. + */ + bool valid() noexcept; + + private: + std::string m_name{""}; + uint32_t m_size{0}; + char *m_sharedMemory{nullptr}; + char *m_userAccessibleSharedMemory{nullptr}; + bool m_hasOnlyAttachedToSharedMemory{false}; + +#ifdef WIN32 + HANDLE __conditionEvent{nullptr}; + HANDLE __mutex{nullptr}; + HANDLE __sharedMemory{nullptr}; +#else + int32_t m_fd{-1}; + struct SharedMemoryHeader { + uint32_t __size; + pthread_mutex_t __mutex; + pthread_cond_t __condition; + }; + SharedMemoryHeader *m_sharedMemoryHeader{nullptr}; +#endif +}; +} // namespace cluon + +#endif + /* * THIS IS AN AUTO-GENERATED FILE. DO NOT MODIFY AS CHANGES MIGHT BE OVERWRITTEN! */ @@ -8622,7 +8814,7 @@ inline std::pair, MessageParser::MessageParserErrorCode std::pair, MessageParserErrorCodes> retVal{}; std::string inputWithoutComments{input}; try { - const std::string MATCH_COMMENTS_REGEX = R"((//.*)|/\*([^*]|[\r\n]|(\*+([^*/]|[\r\n])))*\*+/)"; + const std::string MATCH_COMMENTS_REGEX = R"(/\*([\s\S]*?)\*/|//.*)"; inputWithoutComments = std::regex_replace(input, std::regex(MATCH_COMMENTS_REGEX), ""); // NOLINT } catch (std::regex_error &) { // LCOV_EXCL_LINE } catch (std::bad_cast &) { // LCOV_EXCL_LINE @@ -8652,6 +8844,65 @@ inline std::pair, MessageParser::MessageParserErrorCode } return retVal; } +} // namespace cluon +/* + * Copyright (C) 2018 Christian Berger + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +//#include "cluon/TerminateHandler.hpp" + +#include +#include +#include + +namespace cluon { + +inline void cluon_handleExit() { + TerminateHandler::instance().isTerminated.store(true); +} + +inline void cluon_handleSignal(int32_t /*signal*/) { // LCOV_EXCL_LINE + TerminateHandler::instance().isTerminated.store(true); // LCOV_EXCL_LINE +} + +inline TerminateHandler::TerminateHandler() noexcept { + if (0 != std::atexit(cluon_handleExit)) { + std::cerr << "[cluon::TerminateHandler] Failed to register cluon_exitHandler()." << std::endl; // LCOV_EXCL_LINE + } + +#ifdef WIN32 + if (SIG_ERR == ::signal(SIGINT, &cluon_handleSignal)) { + std::cerr << "[cluon::TerminateHandler] Failed to register signal SIGINT." << std::endl; + } + if (SIG_ERR == ::signal(SIGTERM, &cluon_handleSignal)) { + std::cerr << "[cluon::TerminateHandler] Failed to register signal SIGTERM." << std::endl; + } +#else + std::memset(&m_signalHandler, 0, sizeof(m_signalHandler)); + m_signalHandler.sa_handler = &cluon_handleSignal; + + if (::sigaction(SIGINT, &m_signalHandler, NULL) < 0) { + std::cerr << "[cluon::TerminateHandler] Failed to register signal SIGINT." << std::endl; // LCOV_EXCL_LINE + } + if (::sigaction(SIGTERM, &m_signalHandler, NULL) < 0) { + std::cerr << "[cluon::TerminateHandler] Failed to register signal SIGTERM." << std::endl; // LCOV_EXCL_LINE + } +#endif +} + } // namespace cluon /* * Copyright (C) 2017-2018 Christian Berger @@ -8787,6 +9038,7 @@ inline std::pair UDPSender::send(std::string &&data) const noe */ //#include "cluon/UDPReceiver.hpp" +//#include "cluon/TerminateHandler.hpp" //#include "cluon/UDPPacketSizeConstraints.hpp" // clang-format off @@ -8892,7 +9144,7 @@ inline UDPReceiver::UDPReceiver(const std::string &receiveFromAddress, u_long nonBlocking = 1; m_isBlockingSocket = !(NO_ERROR == ::ioctlsocket(m_socket, FIONBIO, &nonBlocking)); #else - const int FLAGS = ::fcntl(m_socket, F_GETFL, 0); + const int FLAGS = ::fcntl(m_socket, F_GETFL, 0); m_isBlockingSocket = !(0 == ::fcntl(m_socket, F_SETFL, FLAGS | O_NONBLOCK)); #endif } @@ -8903,7 +9155,7 @@ inline UDPReceiver::UDPReceiver(const std::string &receiveFromAddress, u_long nonBlocking = 1; m_isBlockingSocket = !(NO_ERROR == ::ioctlsocket(m_socket, FIONBIO, &nonBlocking)); #else - const int FLAGS = ::fcntl(m_socket, F_GETFL, 0); + const int FLAGS = ::fcntl(m_socket, F_GETFL, 0); m_isBlockingSocket = !(0 == ::fcntl(m_socket, F_SETFL, FLAGS | O_NONBLOCK)); #endif } @@ -8917,7 +9169,7 @@ inline UDPReceiver::UDPReceiver(const std::string &receiveFromAddress, auto errorCode = WSAGetLastError(); #else auto errorCode = errno; // LCOV_EXCL_LINE -#endif // LCOV_EXCL_LINE +#endif // LCOV_EXCL_LINE std::cerr << "[cluon::UDPReceiver] Error while trying to set SO_RCVBUF to " << recvBuffer << ": " << errorCode << std::endl; // LCOV_EXCL_LINE } } @@ -9040,7 +9292,7 @@ inline void UDPReceiver::closeSocket(int errorCode) noexcept { } inline bool UDPReceiver::isRunning() const noexcept { - return m_readFromSocketThreadRunning.load(); + return (m_readFromSocketThreadRunning.load() && !TerminateHandler::instance().isTerminated.load()); } inline void UDPReceiver::processPipeline() noexcept { @@ -9050,7 +9302,7 @@ inline void UDPReceiver::processPipeline() noexcept { while (m_pipelineThreadRunning.load()) { std::unique_lock lck(m_pipelineMutex); // Wait until the thread should stop or data is available. - m_pipelineCondition.wait(lck, [this]{return (!this->m_pipelineThreadRunning.load() || !this->m_pipeline.empty());}); + m_pipelineCondition.wait(lck, [this] { return (!this->m_pipelineThreadRunning.load() || !this->m_pipeline.empty()); }); // The condition will automatically lock the mutex after waking up. // As we are locking per entry, we need to unlock the mutex first. @@ -9090,12 +9342,7 @@ inline void UDPReceiver::readFromSocket() noexcept { - static_cast(UDPPacketSizeConstraints::SIZE_UDP_HEADER); std::array buffer{}; - // Define timeout for select system call. struct timeval timeout {}; - timeout.tv_sec = 1; - timeout.tv_usec = 0; -// timeout.tv_sec = 0; -// timeout.tv_usec = 20 * 1000; // Check for new data with 50Hz. // Define file descriptor set to watch for read operations. fd_set setOfFiledescriptorsToReadFrom{}; @@ -9111,6 +9358,12 @@ inline void UDPReceiver::readFromSocket() noexcept { m_readFromSocketThreadRunning.store(true); while (m_readFromSocketThreadRunning.load()) { + // Define timeout for select system call. The timeval struct must be + // reinitialized for every select call as it might be modified containing + // the actual time slept. + timeout.tv_sec = 0; + timeout.tv_usec = 20 * 1000; // Check for new data with 50Hz. + FD_ZERO(&setOfFiledescriptorsToReadFrom); // NOLINT FD_SET(m_socket, &setOfFiledescriptorsToReadFrom); // NOLINT ::select(m_socket + 1, &setOfFiledescriptorsToReadFrom, nullptr, nullptr, &timeout); @@ -9153,8 +9406,8 @@ inline void UDPReceiver::readFromSocket() noexcept { // Create a pipeline entry to be processed concurrently. { PipelineEntry pe; - pe.m_data = std::string(buffer.data(), static_cast(bytesRead)); - pe.m_from = std::string(remoteAddress.data()) + ':' + std::to_string(RECVFROM_PORT); + pe.m_data = std::string(buffer.data(), static_cast(bytesRead)); + pe.m_from = std::string(remoteAddress.data()) + ':' + std::to_string(RECVFROM_PORT); pe.m_sampleTime = timestamp; // Store entry in queue. @@ -9166,13 +9419,9 @@ inline void UDPReceiver::readFromSocket() noexcept { totalBytesRead += bytesRead; } } while (!m_isBlockingSocket && (bytesRead > 0)); - } else { - // Let the operating system yield other threads. - using namespace std::literals::chrono_literals; // NOLINT - std::this_thread::sleep_for(1ms); } - if (totalBytesRead > 0) { + if (static_cast(totalBytesRead) > 0) { m_pipelineCondition.notify_all(); } } @@ -9196,6 +9445,7 @@ inline void UDPReceiver::readFromSocket() noexcept { */ //#include "cluon/TCPConnection.hpp" +//#include "cluon/TerminateHandler.hpp" // clang-format off #ifdef WIN32 @@ -9323,7 +9573,7 @@ inline void TCPConnection::closeSocket(int errorCode) noexcept { } inline bool TCPConnection::isRunning() const noexcept { - return m_readFromSocketThreadRunning.load(); + return (m_readFromSocketThreadRunning.load() && !TerminateHandler::instance().isTerminated.load()); } inline std::pair TCPConnection::send(std::string &&data) const noexcept { @@ -9355,10 +9605,7 @@ inline void TCPConnection::readFromSocket() noexcept { constexpr uint16_t MAX_LENGTH{65535}; std::array buffer{}; - // Define timeout for select system call. struct timeval timeout {}; - timeout.tv_sec = 0; - timeout.tv_usec = 20 * 1000; // Check for new data with 50Hz. // Define file descriptor set to watch for read operations. fd_set setOfFiledescriptorsToReadFrom{}; @@ -9368,6 +9615,12 @@ inline void TCPConnection::readFromSocket() noexcept { m_readFromSocketThreadRunning.store(true); while (m_readFromSocketThreadRunning.load()) { + // Define timeout for select system call. The timeval struct must be + // reinitialized for every select call as it might be modified containing + // the actual time slept. + timeout.tv_sec = 0; + timeout.tv_usec = 20 * 1000; // Check for new data with 50Hz. + FD_ZERO(&setOfFiledescriptorsToReadFrom); FD_SET(m_socket, &setOfFiledescriptorsToReadFrom); ::select(m_socket + 1, &setOfFiledescriptorsToReadFrom, nullptr, nullptr, &timeout); @@ -9400,10 +9653,6 @@ inline void TCPConnection::readFromSocket() noexcept { // Call newDataDelegate. m_newDataDelegate(std::string(buffer.data(), static_cast(bytesRead)), timestamp); } - } else { - // Let the operating system yield other threads. - using namespace std::literals::chrono_literals; - std::this_thread::sleep_for(1ms); } } } @@ -9621,9 +9870,6 @@ inline uint64_t ToProtoVisitor::encodeKey(uint32_t fieldIdentifier, uint8_t prot } inline std::size_t ToProtoVisitor::toVarInt(std::ostream &out, uint64_t v) noexcept { - // VarInt is little endian. - v = htole64(v); - // Minimum size is of the encoded data. std::size_t size{1}; uint8_t b{0}; @@ -9773,19 +10019,27 @@ inline uint64_t FromProtoVisitor::ProtoKeyValue::valueAsVarInt() const noexcept } inline float FromProtoVisitor::ProtoKeyValue::valueAsFloat() const noexcept { - float retVal{0}; + union FloatValue { + uint32_t uint32Value; + float floatValue{0}; + } retVal; if (!m_value.empty() && (length() == sizeof(float)) && (m_value.size() == sizeof(float)) && (type() == ProtoConstants::FOUR_BYTES)) { - std::memmove(&retVal, &m_value[0], sizeof(float)); + std::memmove(&retVal.uint32Value, &m_value[0], sizeof(float)); + retVal.uint32Value = le32toh(retVal.uint32Value); } - return retVal; + return retVal.floatValue; } inline double FromProtoVisitor::ProtoKeyValue::valueAsDouble() const noexcept { - double retVal{0}; + union DoubleValue { + uint64_t uint64Value; + double doubleValue{0}; + } retVal; if (!m_value.empty() && (length() == sizeof(double)) && (m_value.size() == sizeof(double)) && (type() == ProtoConstants::EIGHT_BYTES)) { - std::memmove(&retVal, &m_value[0], sizeof(double)); + std::memmove(&retVal.uint64Value, &m_value[0], sizeof(double)); + retVal.uint64Value = le64toh(retVal.uint64Value); } - return retVal; + return retVal.doubleValue; } inline std::string FromProtoVisitor::ProtoKeyValue::valueAsString() const noexcept { @@ -9967,8 +10221,6 @@ inline std::size_t FromProtoVisitor::fromVarInt(std::istream &in, uint64_t &valu } } - // VarInt is little endian. - value = le64toh(value); return size; } } // namespace cluon @@ -11559,22 +11811,22 @@ inline std::string ToJSONVisitor::encodeBase64(const std::string &input) const n value = static_cast(input.at(index++)) << 16; value |= static_cast(input.at(index++)) << 8; value |= static_cast(input.at(index++)); - retVal += ALPHABET.at(value >> 18 & 63); + retVal += ALPHABET.at((value >> 18) & 63); retVal += ALPHABET.at((value >> 12) & 63); retVal += ALPHABET.at((value >> 6) & 63); - retVal += ALPHABET.at((value)&63); + retVal += ALPHABET.at(value & 63); length -= 3; } if (length == 2) { value = static_cast(input.at(index++)) << 16; value |= static_cast(input.at(index++)) << 8; - retVal += ALPHABET.at(value >> 18 & 63); + retVal += ALPHABET.at((value >> 18) & 63); retVal += ALPHABET.at((value >> 12) & 63); retVal += ALPHABET.at((value >> 6) & 63); retVal += "="; } else if (length == 1) { value = static_cast(input.at(index++)) << 16; - retVal += ALPHABET.at(value >> 18 & 63); + retVal += ALPHABET.at((value >> 18) & 63); retVal += ALPHABET.at((value >> 12) & 63); retVal += "=="; } @@ -12406,6 +12658,7 @@ inline void ToMsgPackVisitor::visit(uint32_t id, std::string &&typeName, std::st //#include "cluon/OD4Session.hpp" //#include "cluon/Envelope.hpp" //#include "cluon/FromProtoVisitor.hpp" +//#include "cluon/TerminateHandler.hpp" //#include "cluon/Time.hpp" #include @@ -12420,10 +12673,10 @@ inline OD4Session::OD4Session(uint16_t CID, std::function("225.0.0." + std::to_string(CID), 12175, - [this](std::string &&data, std::string &&from, std::chrono::system_clock::time_point &&timepoint) { - this->callback(std::move(data), std::move(from), std::move(timepoint)); - }); + m_receiver = std::make_unique( + "225.0.0." + std::to_string(CID), 12175, [this](std::string &&data, std::string &&from, std::chrono::system_clock::time_point &&timepoint) { + this->callback(std::move(data), std::move(from), std::move(timepoint)); + }); } inline void OD4Session::timeTrigger(float freq, std::function delegate) noexcept { @@ -12451,7 +12704,7 @@ inline void OD4Session::timeTrigger(float freq, std::function delegate) } else { std::cerr << "[cluon::OD4Session]: time-triggered delegate violated allocated time slice." << std::endl; } - } while (delegateIsRunning); + } while (delegateIsRunning && !TerminateHandler::instance().isTerminated.load()); } } @@ -12474,7 +12727,7 @@ inline bool OD4Session::dataTrigger(int32_t messageIdentifier, std::function #include @@ -12768,7 +13022,7 @@ inline std::string EnvelopeConverter::getJSONFromEnvelope(cluon::data::Envelope try { // Catch possible linb::any exception. gm.accept(payloadToJSON); - } catch (...) {} // LCOV_EXCL_LINE + } catch (const linb::bad_any_cast &) {} // LCOV_EXCL_LINE std::string tmp{payload.messageName()}; std::replace(tmp.begin(), tmp.end(), '.', '_'); @@ -12779,7 +13033,9 @@ inline std::string EnvelopeConverter::getJSONFromEnvelope(cluon::data::Envelope return retVal; } +// clang-format off inline std::string EnvelopeConverter::getProtoEncodedEnvelopeFromJSONWithoutTimeStamps(const std::string &json, int32_t messageIdentifier, uint32_t senderStamp) noexcept { + // clang-format on std::string retVal; if (0 < m_scopeOfMetaMessages.count(messageIdentifier)) { // Get specification for message to be created. @@ -12827,6 +13083,7 @@ inline std::string EnvelopeConverter::getProtoEncodedEnvelopeFromJSONWithoutTime * along with this program. If not, see . */ +// clang-format off //#include "cluon/Envelope.hpp" //#include "cluon/Player.hpp" @@ -12844,9 +13101,6 @@ inline std::string EnvelopeConverter::getProtoEncodedEnvelopeFromJSONWithoutTime namespace cluon { -inline IndexEntry::IndexEntry() noexcept : - IndexEntry(0, 0) {} - inline IndexEntry::IndexEntry(const int64_t &sampleTimeStamp, const uint64_t &filePosition) noexcept : m_sampleTimeStamp(sampleTimeStamp), m_filePosition(filePosition), @@ -12870,7 +13124,6 @@ inline Player::Player(const std::string &file, const bool &autoRewind, const boo m_firstTimePointReturningAEnvelope(), m_numberOfReturnedEnvelopesInTotal(0), m_delay(0), - m_correctedDelay(0), m_envelopeCacheFillingThreadIsRunningMutex(), m_envelopeCacheFillingThreadIsRunning(false), m_envelopeCacheFillingThread(), @@ -12958,11 +13211,10 @@ inline void Player::resetCaches() noexcept { try { std::lock_guard lck(m_indexMutex); m_delay = 0; - m_correctedDelay = 0; m_numberOfReturnedEnvelopesInTotal = 0; m_envelopeCache.clear(); } - catch (...) {} + catch (...) {} // LCOV_EXCL_LINE } inline void Player::resetIterators() noexcept { @@ -12976,7 +13228,7 @@ inline void Player::resetIterators() noexcept { // Invalidate iterator for erasing entries point. m_previousPreviousEnvelopeAlreadyReplayed = m_index.end(); } - catch (...) {} + catch (...) {} // LCOV_EXCL_LINE } inline void Player::computeInitialCacheLevelAndFillCache() noexcept { @@ -13019,7 +13271,7 @@ inline uint32_t Player::fillEnvelopeCache(const uint32_t &maxNumberOfEntriesToRe std::lock_guard lck(m_indexMutex); m_nextEntryToReadFromRecFile->second.m_available = m_envelopeCache.emplace(std::make_pair(m_nextEntryToReadFromRecFile->second.m_filePosition, retVal.second)).second; } - catch (...) {} + catch (...) {} // LCOV_EXCL_LINE m_nextEntryToReadFromRecFile++; entriesReadFromFile++; @@ -13054,7 +13306,7 @@ inline std::pair Player::getNextEnvelopeToBeReplaye cluon::data::Envelope &nextEnvelope = m_envelopeCache[m_currentEnvelopeToReplay->second.m_filePosition]; envelopeToReturn = nextEnvelope; - m_correctedDelay = m_delay = static_cast(m_currentEnvelopeToReplay->first - m_previousEnvelopeAlreadyReplayed->first); + m_delay = static_cast(m_currentEnvelopeToReplay->first - m_previousEnvelopeAlreadyReplayed->first); // TODO: Delegate deleting into own thread. if (m_previousPreviousEnvelopeAlreadyReplayed != m_index.end()) { @@ -13080,7 +13332,7 @@ inline std::pair Player::getNextEnvelopeToBeReplaye // Store sample time stamp as int64 to avoid unnecessary copying of Envelopes. hasEnvelopeToReturn = true; } - catch (...) {} + catch (...) {} // LCOV_EXCL_LINE } return std::make_pair(hasEnvelopeToReturn, envelopeToReturn); } @@ -13093,11 +13345,11 @@ inline void Player::checkAvailabilityOfNextEnvelopeToBeReplayed() noexcept { std::lock_guard lck(m_indexMutex); numberOfEntries = m_envelopeCache.size(); } - catch (...) {} + catch (...) {} // LCOV_EXCL_LINE } if (0 == numberOfEntries) { - using namespace std::chrono_literals; - std::this_thread::sleep_for(10ms); + using namespace std::chrono_literals; // LCOV_EXCL_LINE + std::this_thread::sleep_for(10ms); // LCOV_EXCL_LINE } } while (0 == numberOfEntries); @@ -13105,23 +13357,17 @@ inline void Player::checkAvailabilityOfNextEnvelopeToBeReplayed() noexcept { //////////////////////////////////////////////////////////////////////// -inline uint32_t Player::getTotalNumberOfEnvelopesInRecFile() const noexcept { +inline uint32_t Player::totalNumberOfEnvelopesInRecFile() const noexcept { std::lock_guard lck(m_indexMutex); return static_cast(m_index.size()); } -inline uint32_t Player::getDelay() const noexcept { +inline uint32_t Player::delay() const noexcept { std::lock_guard lck(m_indexMutex); // Make sure that delay is not exceeding the specified maximum delay. return std::min(m_delay, Player::MAX_DELAY_IN_MICROSECONDS); } -inline uint32_t Player::getCorrectedDelay() const noexcept { - std::lock_guard lck(m_indexMutex); - // Make sure that delay is not exceeding the specified maximum delay. - return std::min(m_correctedDelay, Player::MAX_DELAY_IN_MICROSECONDS); -} - inline void Player::rewind() noexcept { if (m_threading) { // Stop concurrent thread. @@ -13158,13 +13404,15 @@ inline void Player::seekTo(float ratio) noexcept { std::lock_guard lck(m_indexMutex); numberOfEntriesInIndex = static_cast(m_index.size()); } - catch (...) {} + catch (...) {} // LCOV_EXCL_LINE // Fast forward. m_numberOfReturnedEnvelopesInTotal = 0; std::clog << "[cluon::Player]: Seeking to " << static_cast(numberOfEntriesInIndex)*ratio << "/" << numberOfEntriesInIndex << std::endl; - for (m_numberOfReturnedEnvelopesInTotal = 0; m_numberOfReturnedEnvelopesInTotal < static_cast(static_cast(numberOfEntriesInIndex)*ratio)-1; m_numberOfReturnedEnvelopesInTotal++) { - m_currentEnvelopeToReplay++; + if (0 < ratio) { + for (m_numberOfReturnedEnvelopesInTotal = 0; m_numberOfReturnedEnvelopesInTotal < static_cast(static_cast(numberOfEntriesInIndex)*ratio)-1; m_numberOfReturnedEnvelopesInTotal++) { + m_currentEnvelopeToReplay++; + } } m_nextEntryToReadFromRecFile = m_previousEnvelopeAlreadyReplayed @@ -13174,8 +13422,10 @@ inline void Player::seekTo(float ratio) noexcept { m_envelopeCache.clear(); fillEnvelopeCache(static_cast(static_cast(m_desiredInitialLevel)*.3f)); - // Correct iterators. - getNextEnvelopeToBeReplayed(); + // Correct iterators if not at the beginning. + if ( (0 < ratio) && (ratio < 1) ) { + getNextEnvelopeToBeReplayed(); + } std::clog << "[cluon::Player]: Seeking done." << std::endl; if (enableThreading) { @@ -13221,7 +13471,7 @@ inline void Player::manageCache() noexcept { std::lock_guard lck(m_indexMutex); numberOfEntries = static_cast(m_envelopeCache.size()); } - catch (...) {} + catch (...) {} // LCOV_EXCL_LINE // Check if refilling of the cache is needed. refillMultiplicator = checkRefillingCache(numberOfEntries, refillMultiplicator); @@ -13241,7 +13491,7 @@ inline void Player::manageCache() noexcept { numberOfReturnedEnvelopesInTotal = m_numberOfReturnedEnvelopesInTotal; totalNumberOfEnvelopes = static_cast(m_index.size()); } - catch (...) {} + catch (...) {} // LCOV_EXCL_LINE try { std::lock_guard lck(m_playerListenerMutex); @@ -13253,7 +13503,7 @@ inline void Player::manageCache() noexcept { m_playerListener(ps); } } - catch (...) {} + catch (...) {} // LCOV_EXCL_LINE statisticsCounter = 0; } @@ -13273,6 +13523,434 @@ inline float Player::checkRefillingCache(const uint32_t &numberOfEntries, float } } + +// clang-format on +/* + * Copyright (C) 2017-2018 Christian Berger + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +//#include "cluon/SharedMemory.hpp" + +// clang-format off +#ifdef WIN32 + #include +#else + #include + #include + #include + #include +#endif +// clang-format on + +#include +#include + +namespace cluon { + +inline SharedMemory::SharedMemory(const std::string &name, uint32_t size) noexcept + : m_size(size) { + if (!name.empty()) { +#ifdef WIN32 + constexpr int MAX_LENGTH_NAME{MAX_PATH}; +#else + constexpr int MAX_LENGTH_NAME{254}; +#endif + const std::string n{name.substr(0, (name.size() > MAX_LENGTH_NAME ? MAX_LENGTH_NAME : name.size()))}; + if ('/' != n[0]) { + m_name = "/"; + } + m_name += n; + if (m_name.size() > MAX_LENGTH_NAME) { + m_name = m_name.substr(0, MAX_LENGTH_NAME); + } + +#ifdef WIN32 + std::string mutexName = m_name; + if (mutexName.size() > MAX_LENGTH_NAME) { + mutexName = mutexName.substr(0, MAX_LENGTH_NAME - 6); + } + const std::string conditionEventName = mutexName + "_event"; + mutexName += "_mutex"; + + if (0 < size) { + // Create a shared memory area and semaphores. + const LONG MUTEX_INITIAL_COUNT = 1; + const LONG MUTEX_MAX_COUNT = 1; + const DWORD FLAGS = 0; // Reserved. + __mutex = CreateSemaphoreEx(NULL, MUTEX_INITIAL_COUNT, MUTEX_MAX_COUNT, mutexName.c_str(), FLAGS, SEMAPHORE_ALL_ACCESS); + if (nullptr != __mutex) { + __conditionEvent = CreateEvent( + NULL /*use default security*/, TRUE /*manually resetting event*/, FALSE /*initial state is not signaled*/, conditionEventName.c_str()); + if (nullptr != __conditionEvent) { + __sharedMemory = CreateFileMapping(INVALID_HANDLE_VALUE /*use paging file*/, + NULL /*use default security*/, + PAGE_READWRITE, + 0, + m_size + sizeof(uint32_t) /*size + size-information (uint32_t)*/, + m_name.c_str()); + if (nullptr != __sharedMemory) { + m_sharedMemory = (char *)MapViewOfFile(__sharedMemory, FILE_MAP_ALL_ACCESS, 0, 0, m_size + sizeof(uint32_t)); + if (nullptr != m_sharedMemory) { + // Provide size information at the beginning of the shared memory. + *(uint32_t *)m_sharedMemory = m_size; + m_userAccessibleSharedMemory = m_sharedMemory + sizeof(uint32_t); + } else { + std::cerr << "[cluon::SharedMemory] Failed to map shared memory '" << m_name << "': " + << " (" << GetLastError() << ")" << std::endl; + CloseHandle(__sharedMemory); + __sharedMemory = nullptr; + + CloseHandle(__conditionEvent); + __conditionEvent = nullptr; + + CloseHandle(__mutex); + __mutex = nullptr; + } + } else { + std::cerr << "[cluon::SharedMemory] Failed to request shared memory '" << m_name << "': " + << " (" << GetLastError() << ")" << std::endl; + CloseHandle(__conditionEvent); + __conditionEvent = nullptr; + + CloseHandle(__mutex); + __mutex = nullptr; + } + } else { + std::cerr << "[cluon::SharedMemory] Failed to request event '" << conditionEventName << "': " + << " (" << GetLastError() << ")" << std::endl; + CloseHandle(__conditionEvent); + __conditionEvent = nullptr; + + CloseHandle(__mutex); + __mutex = nullptr; + } + } else { + std::cerr << "[cluon::SharedMemory] Failed to create mutex '" << mutexName << "': " + << " (" << GetLastError() << ")" << std::endl; + CloseHandle(__mutex); + __mutex = nullptr; + } + } else { + // Open a shared memory area and semaphores. + m_hasOnlyAttachedToSharedMemory = true; + const BOOL INHERIT_HANDLE = FALSE; + __mutex = OpenSemaphore(SEMAPHORE_ALL_ACCESS, INHERIT_HANDLE, mutexName.c_str()); + if (nullptr != __mutex) { + __conditionEvent = OpenEvent(EVENT_ALL_ACCESS, FALSE /*do not inherit the name*/, conditionEventName.c_str()); + if (nullptr != __conditionEvent) { + __sharedMemory = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE /*do not inherit the name*/, m_name.c_str()); + if (nullptr != __sharedMemory) { + // Firstly, map only for the size of a uint32_t to read the entire size. + m_sharedMemory = (char *)MapViewOfFile(__sharedMemory, FILE_MAP_ALL_ACCESS, 0, 0, sizeof(uint32_t)); + if (nullptr != m_sharedMemory) { + // Now, read the real size... + m_size = *(uint32_t *)m_sharedMemory; + // ..unmap and re-map. + UnmapViewOfFile(m_sharedMemory); + m_sharedMemory = (char *)MapViewOfFile(__sharedMemory, FILE_MAP_ALL_ACCESS, 0, 0, m_size + sizeof(uint32_t)); + if (nullptr != m_sharedMemory) { + m_userAccessibleSharedMemory = m_sharedMemory + sizeof(uint32_t); + } else { + std::cerr << "[cluon::SharedMemory] Failed to finally map shared memory '" << m_name << "': " + << " (" << GetLastError() << ")" << std::endl; + CloseHandle(__sharedMemory); + __sharedMemory = nullptr; + + CloseHandle(__conditionEvent); + __conditionEvent = nullptr; + + CloseHandle(__mutex); + __mutex = nullptr; + } + } else { + std::cerr << "[cluon::SharedMemory] Failed to temporarily map shared memory '" << m_name << "': " + << " (" << GetLastError() << ")" << std::endl; + CloseHandle(__sharedMemory); + __sharedMemory = nullptr; + + CloseHandle(__conditionEvent); + __conditionEvent = nullptr; + + CloseHandle(__mutex); + __mutex = nullptr; + } + } else { + std::cerr << "[cluon::SharedMemory] Failed to open shared memory '" << m_name << "': " + << " (" << GetLastError() << ")" << std::endl; + CloseHandle(__conditionEvent); + __conditionEvent = nullptr; + + CloseHandle(__mutex); + __mutex = nullptr; + } + } else { + std::cerr << "[cluon::SharedMemory] Failed to open event '" << conditionEventName << "': " + << " (" << GetLastError() << ")" << std::endl; + CloseHandle(__conditionEvent); + __conditionEvent = nullptr; + + CloseHandle(__mutex); + __mutex = nullptr; + } + } else { + std::cerr << "[cluon::SharedMemory] Failed to open mutex '" << mutexName << "': " + << " (" << GetLastError() << ")" << std::endl; + CloseHandle(__mutex); + __mutex = nullptr; + } + } +#endif + +#ifndef WIN32 + // If size is greater than 0, the caller wants to create a new shared + // memory area. Otherwise, the caller wants to open an existing shared memory. + int flags = O_RDWR; + if (0 < size) { + flags |= O_CREAT | O_EXCL; + } + + m_fd = ::shm_open(m_name.c_str(), flags, S_IRUSR | S_IWUSR); + if (-1 == m_fd) { + std::cerr << "[cluon::SharedMemory] Failed to open shared memory '" << m_name << "': " << ::strerror(errno) << " (" << errno << ")" << std::endl; + // Try to remove existing shared memory segment and try again. + if ((flags & O_CREAT) == O_CREAT) { + std::clog << "[cluon::SharedMemory] Trying to remove existing shared memory '" << m_name << "' and trying again... "; + if (0 == ::shm_unlink(m_name.c_str())) { + m_fd = ::shm_open(m_name.c_str(), flags, S_IRUSR | S_IWUSR); + } + + if (-1 == m_fd) { + std::cerr << "failed: " << ::strerror(errno) << " (" << errno << ")" << std::endl; // LCOV_EXCL_LINE + } else { + std::cerr << "succeeded." << std::endl; + } + } + } + + if (-1 != m_fd) { + bool retVal{true}; + + // When creating a shared memory segment, truncate it. + if (0 < m_size) { + retVal = (0 == ::ftruncate(m_fd, static_cast(sizeof(SharedMemoryHeader) + m_size))); + if (!retVal) { + std::cerr << "[cluon::SharedMemory] Failed to truncate '" << m_name << "': " << ::strerror(errno) << " (" << errno << ")" // LCOV_EXCL_LINE + << std::endl; // LCOV_EXCL_LINE + } + } + + // Accessing shared memory segment. + if (retVal) { + // On opening (i.e., NOT creating) a shared memory segment, m_size is still 0 and we need to figure out the size first. + m_sharedMemory = static_cast(::mmap(0, sizeof(SharedMemoryHeader) + m_size, PROT_READ | PROT_WRITE, MAP_SHARED, m_fd, 0)); + if (MAP_FAILED != m_sharedMemory) { + m_sharedMemoryHeader = reinterpret_cast(m_sharedMemory); + + // On creating (i.e., NOT opening) a shared memory segment, setup the shared memory header. + if (0 < m_size) { + // Store user accessible size in shared memory. + m_sharedMemoryHeader->__size = m_size; + + // Create process-shared mutex (fastest approach, cf. Stevens & Rago: "Advanced Programming in the UNIX (R) Environment"). + pthread_mutexattr_t mutexAttribute; + ::pthread_mutexattr_init(&mutexAttribute); + ::pthread_mutexattr_setpshared(&mutexAttribute, PTHREAD_PROCESS_SHARED); // Share between unrelated processes. +#ifndef __APPLE__ + ::pthread_mutexattr_setrobust(&mutexAttribute, PTHREAD_MUTEX_ROBUST); // Allow continuation of other processes waiting for this mutex + // when the currently holding process unexpectedly terminates. +#endif + ::pthread_mutexattr_settype(&mutexAttribute, PTHREAD_MUTEX_NORMAL); // Using regular mutex with deadlock behavior. + ::pthread_mutex_init(&(m_sharedMemoryHeader->__mutex), &mutexAttribute); + ::pthread_mutexattr_destroy(&mutexAttribute); + + // Create shared condition. + pthread_condattr_t conditionAttribute; + ::pthread_condattr_init(&conditionAttribute); +#ifndef __APPLE__ + ::pthread_condattr_setclock(&conditionAttribute, CLOCK_MONOTONIC); // Use realtime clock for timed waits with non-negative jumps. +#endif + ::pthread_condattr_setpshared(&conditionAttribute, PTHREAD_PROCESS_SHARED); // Share between unrelated processes. + ::pthread_cond_init(&(m_sharedMemoryHeader->__condition), &conditionAttribute); + ::pthread_condattr_destroy(&conditionAttribute); + } else { + // Indicate that this instance is attaching to an existing shared memory segment. + m_hasOnlyAttachedToSharedMemory = true; + + // Read size as we are attaching to an existing shared memory. + m_size = m_sharedMemoryHeader->__size; + + // Now, as we know the real size, unmap the first mapping that did not know the size. + if (::munmap(m_sharedMemory, sizeof(SharedMemoryHeader))) { + std::cerr << "[cluon::SharedMemory] Failed to unmap shared memory: " << ::strerror(errno) << " (" << errno << ")" // LCOV_EXCL_LINE + << std::endl; // LCOV_EXCL_LINE + } + + // Invalidate all pointers. + m_sharedMemory = nullptr; + m_sharedMemoryHeader = nullptr; + + // Re-map with the correct size parameter. + m_sharedMemory = static_cast(::mmap(0, sizeof(SharedMemoryHeader) + m_size, PROT_READ | PROT_WRITE, MAP_SHARED, m_fd, 0)); + if (MAP_FAILED != m_sharedMemory) { + m_sharedMemoryHeader = reinterpret_cast(m_sharedMemory); + } + } + } else { // LCOV_EXCL_LINE + std::cerr << "[cluon::SharedMemory] Failed to map '" << m_name << "': " << ::strerror(errno) << " (" << errno << ")" // LCOV_EXCL_LINE + << std::endl; // LCOV_EXCL_LINE + } + + // If the shared memory segment is correctly available, store the pointer for the user data. + if (MAP_FAILED != m_sharedMemory) { + m_userAccessibleSharedMemory = m_sharedMemory + sizeof(SharedMemoryHeader); + + // Lock the shared memory into RAM for performance reasons. + if (-1 == ::mlock(m_sharedMemory, sizeof(SharedMemoryHeader) + m_size)) { + std::cerr << "[cluon::SharedMemory] Failed to mlock shared memory: " // LCOV_EXCL_LINE + << ::strerror(errno) << " (" << errno << ")" << std::endl; // LCOV_EXCL_LINE + } + } + } else { // LCOV_EXCL_LINE + if (-1 != m_fd) { // LCOV_EXCL_LINE + if (-1 == ::shm_unlink(m_name.c_str())) { // LCOV_EXCL_LINE + std::cerr << "[cluon::SharedMemory] Failed to unlink shared memory: " << ::strerror(errno) << " (" << errno << ")" // LCOV_EXCL_LINE + << std::endl; // LCOV_EXCL_LINE + } + } + m_fd = -1; // LCOV_EXCL_LINE + } + } +#endif + } +} + +inline SharedMemory::~SharedMemory() noexcept { +#ifdef WIN32 + if (nullptr != __conditionEvent) { + SetEvent(__conditionEvent); + CloseHandle(__conditionEvent); + } + if (nullptr != __mutex) { + unlock(); + CloseHandle(__mutex); + } + if (nullptr != m_sharedMemory) { + UnmapViewOfFile(m_sharedMemory); + } + if (nullptr != __sharedMemory) { + CloseHandle(__sharedMemory); + } +#else + if ((nullptr != m_sharedMemoryHeader) && (!m_hasOnlyAttachedToSharedMemory)) { + // Wake any waiting threads as we are going to end the shared memory session. + ::pthread_cond_broadcast(&(m_sharedMemoryHeader->__condition)); + ::pthread_cond_destroy(&(m_sharedMemoryHeader->__condition)); + ::pthread_mutex_destroy(&(m_sharedMemoryHeader->__mutex)); + } + if ((nullptr != m_sharedMemory) && ::munmap(m_sharedMemory, sizeof(SharedMemoryHeader) + m_size)) { + std::cerr << "[cluon::SharedMemory] Failed to unmap shared memory: " << ::strerror(errno) << " (" << errno << ")" << std::endl; // LCOV_EXCL_LINE + } + if (!m_hasOnlyAttachedToSharedMemory && (-1 != m_fd) && (-1 == ::shm_unlink(m_name.c_str()) && (ENOENT != errno))) { + std::cerr << "[cluon::SharedMemory] Failed to unlink shared memory: " << ::strerror(errno) << " (" << errno << ")" << std::endl; // LCOV_EXCL_LINE + } +#endif +} + +inline void SharedMemory::lock() noexcept { +#ifdef WIN32 + if (nullptr != __mutex) { + WaitForSingleObject(__mutex, INFINITE); + } +#else + if (nullptr != m_sharedMemoryHeader) { + if (EOWNERDEAD == ::pthread_mutex_lock(&(m_sharedMemoryHeader->__mutex))) { + std::cerr << "[cluon::SharedMemory] pthread_mutex_lock returned for EOWNERDEAD for mutex in shared memory '" << m_name // LCOV_EXCL_LINE + << "': " << ::strerror(errno) // LCOV_EXCL_LINE + << " (" << errno << ")" << std::endl; // LCOV_EXCL_LINE + } + } +#endif +} + +inline void SharedMemory::unlock() noexcept { +#ifdef WIN32 + if (nullptr != __mutex) { + const LONG RELEASE_COUNT = 1; + ReleaseSemaphore(__mutex, RELEASE_COUNT, 0); + } +#else + if (nullptr != m_sharedMemoryHeader) { + ::pthread_mutex_unlock(&(m_sharedMemoryHeader->__mutex)); + } +#endif +} + +inline void SharedMemory::wait() noexcept { +#ifdef WIN32 + if (nullptr != __conditionEvent) { + WaitForSingleObject(__conditionEvent, INFINITE); + } +#else + if (nullptr != m_sharedMemoryHeader) { + lock(); + ::pthread_cond_wait(&(m_sharedMemoryHeader->__condition), &(m_sharedMemoryHeader->__mutex)); + unlock(); + } +#endif +} + +inline void SharedMemory::notifyAll() noexcept { +#ifdef WIN32 + if (nullptr != __conditionEvent) { + SetEvent(__conditionEvent); + ResetEvent(__conditionEvent); + } +#else + if (nullptr != m_sharedMemoryHeader) { + ::pthread_cond_broadcast(&(m_sharedMemoryHeader->__condition)); + } +#endif +} + +inline char *SharedMemory::data() noexcept { + return m_userAccessibleSharedMemory; +} + +inline uint32_t SharedMemory::size() const noexcept { + return m_size; +} + +inline const std::string SharedMemory::name() const noexcept { + return m_name; +} + +inline bool SharedMemory::valid() noexcept { + bool valid{true}; +#ifndef WIN32 + valid &= (-1 != m_fd); +#endif + valid &= (nullptr != m_sharedMemory); +#ifndef WIN32 + valid &= (MAP_FAILED != m_sharedMemory); +#endif + valid &= (0 < m_size); + return valid; +} + +} // namespace cluon #endif #ifdef HAVE_CLUON_MSC /* @@ -14905,6 +15583,9 @@ void MetaMessageToProtoTransformator::visit(const MetaMessage &mm) noexcept { * along with this program. If not, see . */ +#ifndef CLUON_MSC_HPP +#define CLUON_MSC_HPP + //#include "cluon/MessageParser.hpp" //#include "cluon/MetaMessage.hpp" //#include "cluon/MetaMessageToCPPTransformator.hpp" @@ -14917,7 +15598,7 @@ void MetaMessageToProtoTransformator::visit(const MetaMessage &mm) noexcept { #include #include -int main(int argc, char **argv) { +inline int32_t cluon_msc(int32_t argc, char **argv) { const std::string PROGRAM{argv[0]}; // NOLINT argh::parser commandline(argc, argv); @@ -14990,8 +15671,8 @@ int main(int argc, char **argv) { outputFile << content << std::endl; outputFile.close(); } - else { - std::cout << content << std::endl; + else { // LCOV_EXCL_LINE + std::cout << content << std::endl; // LCOV_EXCL_LINE } } } @@ -15001,6 +15682,35 @@ int main(int argc, char **argv) { return retVal; } + +#endif +/* + * Copyright (C) 2017-2018 Christian Berger + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +// This test for a compiler definition is necessary to preserve single-file, header-only compability. +#ifndef HAVE_CLUON_MSC +#include "cluon-msc.hpp" +#endif + +#include + +int32_t main(int32_t argc, char **argv) { + return cluon_msc(argc, argv); +} #endif #ifdef HAVE_CLUON_REPLAY /* @@ -15020,6 +15730,9 @@ int main(int argc, char **argv) { * along with this program. If not, see . */ +#ifndef CLUON_REPLAY_HPP +#define CLUON_REPLAY_HPP + //#include "cluon/cluon.hpp" //#include "cluon/Envelope.hpp" //#include "cluon/OD4Session.hpp" @@ -15035,7 +15748,7 @@ int main(int argc, char **argv) { #include #include -int main(int argc, char **argv) { +inline int32_t cluon_replay(int32_t argc, char **argv, bool monitorSTDIN) { int32_t retCode{0}; const std::string PROGRAM{argv[0]}; // NOLINT auto commandlineArguments = cluon::getCommandlineArguments(argc, argv); @@ -15064,21 +15777,23 @@ int main(int argc, char **argv) { std::atomic playCommandUpdate{false}; std::mutex playerCommandMutex; cluon::data::PlayerCommand playerCommand; - std::thread t([&playCommandUpdate, &playerCommandMutex, &playerCommand](){ - while (std::cin.good()) { - auto tmp{cluon::extractEnvelope(std::cin)}; - if (tmp.first) { - if (tmp.second.dataType() == cluon::data::PlayerCommand::ID()) { - cluon::data::PlayerCommand pc = cluon::extractMessage(std::move(tmp.second)); - { - std::lock_guard lck(playerCommandMutex); - playerCommand = pc; + if (monitorSTDIN) { + std::thread t([&playCommandUpdate, &playerCommandMutex, &playerCommand](){ + while (std::cin.good()) { + auto tmp{cluon::extractEnvelope(std::cin)}; + if (tmp.first) { + if (tmp.second.dataType() == cluon::data::PlayerCommand::ID()) { + cluon::data::PlayerCommand pc = cluon::extractMessage(std::move(tmp.second)); + { + std::lock_guard lck(playerCommandMutex); + playerCommand = pc; + } + playCommandUpdate = true; } - playCommandUpdate = true; } } - } - }); + }); + } // Listen for PlayerStatus updates. std::atomic playerStatusUpdate{false}; @@ -15130,7 +15845,7 @@ int main(int argc, char **argv) { { std::string s; - playerStatus.numberOfEntries(player.getTotalNumberOfEnvelopesInRecFile()); + playerStatus.numberOfEntries(player.totalNumberOfEnvelopesInRecFile()); playerStatus.state(2); // playback file { std::lock_guard lck(playerStatusMutex); @@ -15204,20 +15919,53 @@ int main(int argc, char **argv) { std::cout << cluon::serializeEnvelope(std::move(next.second)); std::cout.flush(); } - std::this_thread::sleep_for(std::chrono::duration(player.getDelay())); + std::this_thread::sleep_for(std::chrono::duration(player.delay())); } } - else { - std::this_thread::sleep_for(std::chrono::duration(100)); + else { // LCOV_EXCL_LINE + std::this_thread::sleep_for(std::chrono::duration(100)); // LCOV_EXCL_LINE } } + retCode = 0; } else { std::cerr << PROGRAM << ": file '" << recFile << "' not found." << std::endl; + retCode = 1; } } return retCode; } + +#endif + +/* + * Copyright (C) 2018 Christian Berger + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +// This test for a compiler definition is necessary to preserve single-file, header-only compability. +#ifndef HAVE_CLUON_REPLAY +#include "cluon-replay.hpp" +#endif + +#include + +int32_t main(int32_t argc, char **argv) { + constexpr bool monitorSTDIN{true}; + return cluon_replay(argc, argv, monitorSTDIN); +} #endif #ifdef HAVE_CLUON_LIVEFEED /* @@ -15237,12 +15985,16 @@ int main(int argc, char **argv) { * along with this program. If not, see . */ +#ifndef CLUON_LIVEFEED_HPP +#define CLUON_LIVEFEED_HPP + //#include "cluon/cluon.hpp" //#include "cluon/MetaMessage.hpp" //#include "cluon/MessageParser.hpp" //#include "cluon/OD4Session.hpp" #include +#include #include #include #include @@ -15259,19 +16011,15 @@ enum Color { DEFAULT = 39, }; -void clearScreen(); -void writeText(Color c, uint8_t y, uint8_t x, const std::string &text); -std::string formatTimeStamp(const cluon::data::TimeStamp &ts); - -void clearScreen() { +inline void clearScreen() { std::cout << "\033[2J" << std::endl; } -void writeText(Color c, uint8_t y, uint8_t x, const std::string &text) { +inline void writeText(Color c, uint8_t y, uint8_t x, const std::string &text) { std::cout << "\033[" << +y << ";" << +x << "H" << "\033[0;" << +c << "m" << text << "\033[0m" << std::endl; } -std::string formatTimeStamp(const cluon::data::TimeStamp &ts) { +inline std::string formatTimeStamp(const cluon::data::TimeStamp &ts) { std::time_t temp = static_cast(ts.seconds()); std::tm* t = std::gmtime(&temp); std::stringstream sstr; @@ -15280,7 +16028,7 @@ std::string formatTimeStamp(const cluon::data::TimeStamp &ts) { return str; } -int main(int argc, char **argv) { +inline int32_t cluon_livefeed(int32_t argc, char **argv) { int retVal{1}; const std::string PROGRAM{argv[0]}; // NOLINT auto commandlineArguments = cluon::getCommandlineArguments(argc, argv); @@ -15355,15 +16103,47 @@ int main(int argc, char **argv) { } }); - using namespace std::literals::chrono_literals; // NOLINT - while (od4Session.isRunning()) { - std::this_thread::sleep_for(1s); - } + if (od4Session.isRunning()) { + using namespace std::literals::chrono_literals; // NOLINT + while (od4Session.isRunning()) { + std::this_thread::sleep_for(1s); + } - retVal = 0; + retVal = 0; + } } return retVal; } + +#endif + +/* + * Copyright (C) 2018 Christian Berger + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +// This test for a compiler definition is necessary to preserve single-file, header-only compability. +#ifndef HAVE_CLUON_LIVEFEED +#include "cluon-livefeed.hpp" +#endif + +#include + +int32_t main(int32_t argc, char **argv) { + return cluon_livefeed(argc, argv); +} #endif #ifdef HAVE_CLUON_REC2CSV /* @@ -15383,6 +16163,9 @@ int main(int argc, char **argv) { * along with this program. If not, see . */ +#ifndef CLUON_REC2CSV_HPP +#define CLUON_REC2CSV_HPP + //#include "cluon/cluon.hpp" //#include "cluon/GenericMessage.hpp" //#include "cluon/MessageParser.hpp" @@ -15400,7 +16183,7 @@ int main(int argc, char **argv) { #include #include -int32_t main(int32_t argc, char **argv) { +inline int32_t cluon_rec2csv(int32_t argc, char **argv) { int32_t retCode{0}; auto commandlineArguments = cluon::getCommandlineArguments(argc, argv); if ( (0 == commandlineArguments.count("rec")) || (0 == commandlineArguments.count("odvd")) ) { @@ -15425,7 +16208,7 @@ int32_t main(int32_t argc, char **argv) { } else { std::cerr << argv[0] << ": Message specification '" << commandlineArguments["odvd"] << "' not found." << std::endl; - return retCode; + return retCode = 1; } } @@ -15448,7 +16231,7 @@ int32_t main(int32_t argc, char **argv) { if (next.first) { { envelopeCounter++; - const int32_t percentage = static_cast((static_cast(envelopeCounter)*100.0f)/static_cast(player.getTotalNumberOfEnvelopesInRecFile())); + const int32_t percentage = static_cast((static_cast(envelopeCounter)*100.0f)/static_cast(player.totalNumberOfEnvelopesInRecFile())); if ( (percentage % 5 == 0) && (percentage != oldPercentage) ) { std::cerr << argv[0] << ": Processed " << percentage << "%." << std::endl; oldPercentage = percentage; @@ -15520,8 +16303,38 @@ int32_t main(int32_t argc, char **argv) { } else { std::cerr << argv[0] << ": Recording '" << commandlineArguments["rec"] << "' not found." << std::endl; + retCode = 1; } } return retCode; } + +#endif +/* + * Copyright (C) 2018 Christian Berger + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +// This test for a compiler definition is necessary to preserve single-file, header-only compability. +#ifndef HAVE_CLUON_REC2CSV +#include "cluon-rec2csv.hpp" +#endif + +#include + +int32_t main(int32_t argc, char **argv) { + return cluon_rec2csv(argc, argv); +} #endif