Skip to content

Commit

Permalink
Extracted impl from Seqlock.
Browse files Browse the repository at this point in the history
  • Loading branch information
nolan-veed committed Dec 27, 2023
1 parent b98c7ab commit a935aa0
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 95 deletions.
107 changes: 12 additions & 95 deletions src/internal_modules/roc_core/seqlock.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
#ifndef ROC_CORE_SEQLOCK_H_
#define ROC_CORE_SEQLOCK_H_

#include "roc_core/atomic_ops.h"
#include "roc_core/cpu_instructions.h"
#include "roc_core/noncopyable.h"
#include "roc_core/seqlock_impl.h"
#include "roc_core/stddefs.h"

namespace roc {
Expand Down Expand Up @@ -45,13 +44,13 @@ template <class T> class Seqlock : public NonCopyable<> {
//! Initialize with given value.
explicit Seqlock(T value)
: val_(value)
, ver_(0) {
, impl_(&val_, sizeof(val_)) {
}

//! Load value version.
//! Wait-free.
inline seqlock_version_t version() const {
return load_version_();
return impl_.version();
}

//! Store value.
Expand All @@ -63,13 +62,13 @@ template <class T> class Seqlock : public NonCopyable<> {
//! updated value or fail (if changes are not fully published yet).
inline bool try_store(const T& value) {
seqlock_version_t ver;
return try_store_(value, ver);
return impl_.try_store(&value, ver);
}

//! Store value.
//! Like try_store(), but also returns updated version.
inline bool try_store_ver(const T& value, seqlock_version_t& ver) {
return try_store_(value, ver);
return impl_.try_store(&value, ver);
}

//! Store value.
Expand All @@ -81,13 +80,13 @@ template <class T> class Seqlock : public NonCopyable<> {
//! updated value or fail (if changes are not fully published yet).
inline void exclusive_store(const T& value) {
seqlock_version_t ver;
exclusive_store_(value, ver);
impl_.exclusive_store(&value, ver);
}

//! Store value.
//! Like exclusive_store(), but also returns updated version.
inline void exclusive_store_ver(const T& value, seqlock_version_t& ver) {
exclusive_store_(value, ver);
impl_.exclusive_store(&value, ver);
}

//! Try to load value.
Expand All @@ -97,13 +96,13 @@ template <class T> class Seqlock : public NonCopyable<> {
//! and never spins.
inline bool try_load(T& value) const {
seqlock_version_t ver;
return try_load_repeat_(value, ver);
return impl_.try_load_repeat(&value, ver);
}

//! Try to load value and version.
//! Like try_load(), but also returns version.
inline bool try_load_ver(T& value, seqlock_version_t& ver) const {
return try_load_repeat_(value, ver);
return impl_.try_load_repeat(&value, ver);
}

//! Load value.
Expand All @@ -112,101 +111,19 @@ template <class T> class Seqlock : public NonCopyable<> {
inline T wait_load() const {
T value;
seqlock_version_t ver;
wait_load_(value, ver);
impl_.wait_load(&value, ver);
return value;
}

//! Load value and version.
//! Like wait_load(), but also returns version.
inline void wait_load_ver(T& value, seqlock_version_t& ver) const {
wait_load_(value, ver);
impl_.wait_load(&value, ver);
}

private:
inline seqlock_version_t load_version_() const {
return AtomicOps::load_seq_cst(ver_);
}

inline void exclusive_store_(const T& value, seqlock_version_t& ver) {
const seqlock_version_t ver0 = AtomicOps::load_relaxed(ver_);
AtomicOps::store_relaxed(ver_, ver0 + 1);
AtomicOps::fence_release();

volatile_copy_(val_, value);
AtomicOps::fence_seq_cst();

ver = ver0 + 2;
AtomicOps::store_relaxed(ver_, ver);
}

inline bool try_store_(const T& value, seqlock_version_t& ver) {
seqlock_version_t ver0 = AtomicOps::load_relaxed(ver_);
if (ver0 & 1) {
return false;
}

if (!AtomicOps::compare_exchange_relaxed(ver_, ver0, ver0 + 1)) {
return false;
}
AtomicOps::fence_release();

volatile_copy_(val_, value);
AtomicOps::fence_seq_cst();

ver = ver0 + 2;
AtomicOps::store_relaxed(ver_, ver);

return true;
}

inline void wait_load_(T& value, seqlock_version_t& ver) const {
while (!try_load_(value, ver)) {
cpu_relax();
}
}

// If the concurrent store is running and is not sleeping, retrying 3 times
// should be enough to succeed. This may fail if the concurrent store was
// preempted in the middle, of if there are multiple concurrent stores.
inline bool try_load_repeat_(T& value, seqlock_version_t& ver) const {
if (try_load_(value, ver)) {
return true;
}
if (try_load_(value, ver)) {
return true;
}
if (try_load_(value, ver)) {
return true;
}
return false;
}

inline bool try_load_(T& value, seqlock_version_t& ver) const {
const seqlock_version_t ver0 = AtomicOps::load_relaxed(ver_);
AtomicOps::fence_seq_cst();

volatile_copy_(value, val_);
AtomicOps::fence_acquire();

ver = AtomicOps::load_relaxed(ver_);
return ((ver0 & 1) == 0 && ver0 == ver);
}

// We use hand-rolled loop instead of memcpy() or default (trivial) copy constructor
// to be sure that the copying will be covered by our memory fences. On some
// platforms, memcpy() and copy constructor may be implemented using streaming
// instructions which may ignore memory fences.
static void volatile_copy_(volatile T& dst, const volatile T& src) {
volatile char* dst_ptr = reinterpret_cast<volatile char*>(&dst);
const volatile char* src_ptr = reinterpret_cast<const volatile char*>(&src);

for (size_t n = 0; n < sizeof(T); n++) {
dst_ptr[n] = src_ptr[n];
}
}

T val_;
seqlock_version_t ver_;
SeqlockImpl impl_;
};

} // namespace core
Expand Down
108 changes: 108 additions & 0 deletions src/internal_modules/roc_core/seqlock_impl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright (c) 2020 Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

#include "roc_core/seqlock_impl.h"

#include "roc_core/atomic_ops.h"
#include "roc_core/cpu_instructions.h"

namespace roc {
namespace core {

SeqlockImpl::SeqlockImpl(void* val, size_t val_size)
: val_(val)
, val_size_(val_size)
, ver_(0) {
}

seqlock_version_t SeqlockImpl::version() const {
return AtomicOps::load_seq_cst(ver_);
}

bool SeqlockImpl::try_store(const void* value, seqlock_version_t& ver) {
seqlock_version_t ver0 = AtomicOps::load_relaxed(ver_);
if (ver0 & 1) {
return false;
}

if (!AtomicOps::compare_exchange_relaxed(ver_, ver0, ver0 + 1)) {
return false;
}
AtomicOps::fence_release();

volatile_copy_(val_, value, val_size_);
AtomicOps::fence_seq_cst();

ver = ver0 + 2;
AtomicOps::store_relaxed(ver_, ver);

return true;
}

void SeqlockImpl::exclusive_store(const void* value, seqlock_version_t& ver) {
const seqlock_version_t ver0 = AtomicOps::load_relaxed(ver_);
AtomicOps::store_relaxed(ver_, ver0 + 1);
AtomicOps::fence_release();

volatile_copy_(val_, value, val_size_);
AtomicOps::fence_seq_cst();

ver = ver0 + 2;
AtomicOps::store_relaxed(ver_, ver);
}

// If the concurrent store is running and is not sleeping, retrying 3 times
// should be enough to succeed. This may fail if the concurrent store was
// preempted in the middle, of if there are multiple concurrent stores.
bool SeqlockImpl::try_load_repeat(void* value, seqlock_version_t& ver) const {
if (try_load_(value, ver)) {
return true;
}
if (try_load_(value, ver)) {
return true;
}
if (try_load_(value, ver)) {
return true;
}
return false;
}

void SeqlockImpl::wait_load(void* value, seqlock_version_t& ver) const {
while (!try_load_(value, ver)) {
cpu_relax();
}
}

// We use hand-rolled loop instead of memcpy() or default (trivial) copy constructor
// to be sure that the copying will be covered by our memory fences. On some
// platforms, memcpy() and copy constructor may be implemented using streaming
// instructions which may ignore memory fences.
void SeqlockImpl::volatile_copy_(volatile void* dst,
const volatile void* src,
size_t val_size) {
volatile char* dst_ptr = reinterpret_cast<volatile char*>(&dst);
const volatile char* src_ptr = reinterpret_cast<const volatile char*>(&src);

for (size_t n = 0; n < val_size; n++) {
dst_ptr[n] = src_ptr[n];
}
}

bool SeqlockImpl::try_load_(void* value, seqlock_version_t& ver) const {
const seqlock_version_t ver0 = AtomicOps::load_relaxed(ver_);
AtomicOps::fence_seq_cst();

volatile_copy_(value, val_, val_size_);
AtomicOps::fence_acquire();

ver = AtomicOps::load_relaxed(ver_);
return ((ver0 & 1) == 0 && ver0 == ver);
}

} // namespace core
} // namespace roc
57 changes: 57 additions & 0 deletions src/internal_modules/roc_core/seqlock_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2020 Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

//! @file roc_core/seqlock_impl.h
//! @brief Seqlock implementation.

#ifndef ROC_CORE_SEQLOCK_IMPL_H_
#define ROC_CORE_SEQLOCK_IMPL_H_

#include "roc_core/stddefs.h"

namespace roc {
namespace core {

typedef uint32_t seqlock_version_t;

//! Seqlock implementation.
class SeqlockImpl {
public:
//! Initialize.
SeqlockImpl(void* val, size_t val_size);

//! Load value version.
seqlock_version_t version() const;

//! Try to store value.
bool try_store(const void* value, seqlock_version_t& ver);

//! Store value.
void exclusive_store(const void* value, seqlock_version_t& ver);

//! Try to load value and version.
bool try_load_repeat(void* value, seqlock_version_t& ver) const;

//! Load value and version.
void wait_load(void* value, seqlock_version_t& ver) const;

private:
static void
volatile_copy_(volatile void* dst, const volatile void* src, size_t val_size);

bool try_load_(void* value, seqlock_version_t& ver) const;

void* val_;
size_t val_size_;
seqlock_version_t ver_;
};

} // namespace core
} // namespace roc

#endif // ROC_CORE_SEQLOCK_IMPL_H_

0 comments on commit a935aa0

Please sign in to comment.