diff --git a/Rx/v2/src/rxcpp/subjects/rx-behavior.hpp b/Rx/v2/src/rxcpp/subjects/rx-behavior.hpp index ff3becb638..fc46a5d7ca 100644 --- a/Rx/v2/src/rxcpp/subjects/rx-behavior.hpp +++ b/Rx/v2/src/rxcpp/subjects/rx-behavior.hpp @@ -21,7 +21,7 @@ class behavior_observer : public detail::multicast_observer class behavior_observer_state : public std::enable_shared_from_this { - mutable std::mutex lock; + mutable std::recursive_mutex lock; mutable T value; public: @@ -31,13 +31,19 @@ class behavior_observer : public detail::multicast_observer } void reset(T v) const { - std::unique_lock guard(lock); + std::lock_guard guard(lock); value = std::move(v); } + T get() const { - std::unique_lock guard(lock); + std::lock_guard guard(lock); return value; } + + std::recursive_mutex& get_lock() const + { + return lock; + } }; std::shared_ptr state; @@ -62,6 +68,11 @@ class behavior_observer : public detail::multicast_observer state->reset(v); base_type::on_next(std::move(v)); } + + std::recursive_mutex& get_state_lock() const + { + return state->get_lock(); + } }; } @@ -92,10 +103,11 @@ class behavior observable get_observable() const { auto keepAlive = s; return make_observable_dynamic([=](subscriber o){ + std::lock_guard guard(keepAlive.get_state_lock()); if (keepAlive.get_subscription().is_subscribed()) { - o.on_next(get_value()); + o.on_next(keepAlive.get_value()); } - keepAlive.add(s.get_subscriber(), std::move(o)); + keepAlive.add(keepAlive.get_subscriber(), std::move(o)); }); } };