Skip to content

Commit

Permalink
gh-599 Seqlock implementation extract
Browse files Browse the repository at this point in the history
  • Loading branch information
nolan-veed authored and gavv committed Dec 28, 2023
1 parent 9b159e8 commit 49137bc
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 96 deletions.
108 changes: 12 additions & 96 deletions src/internal_modules/roc_core/seqlock.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
#ifndef ROC_CORE_SEQLOCK_H_
#define ROC_CORE_SEQLOCK_H_

#include "roc_core/atomic_ops.h"
#include "roc_core/cpu_instructions.h"
#include "roc_core/noncopyable.h"
#include "roc_core/seqlock_impl.h"
#include "roc_core/stddefs.h"

namespace roc {
Expand Down Expand Up @@ -44,14 +43,13 @@ template <class T> class Seqlock : public NonCopyable<> {
public:
//! Initialize with given value.
explicit Seqlock(T value)
: val_(value)
, ver_(0) {
: val_(value) {
}

//! Load value version.
//! Wait-free.
inline seqlock_version_t version() const {
return load_version_();
return impl_.version();
}

//! Store value.
Expand All @@ -63,13 +61,13 @@ template <class T> class Seqlock : public NonCopyable<> {
//! updated value or fail (if changes are not fully published yet).
inline bool try_store(const T& value) {
seqlock_version_t ver;
return try_store_(value, ver);
return impl_.try_store(ver, &val_, sizeof(val_), &value);
}

//! Store value.
//! Like try_store(), but also returns updated version.
inline bool try_store_ver(const T& value, seqlock_version_t& ver) {
return try_store_(value, ver);
return impl_.try_store(ver, &val_, sizeof(val_), &value);
}

//! Store value.
Expand All @@ -81,13 +79,13 @@ template <class T> class Seqlock : public NonCopyable<> {
//! updated value or fail (if changes are not fully published yet).
inline void exclusive_store(const T& value) {
seqlock_version_t ver;
exclusive_store_(value, ver);
impl_.exclusive_store(ver, &val_, sizeof(val_), &value);
}

//! Store value.
//! Like exclusive_store(), but also returns updated version.
inline void exclusive_store_ver(const T& value, seqlock_version_t& ver) {
exclusive_store_(value, ver);
impl_.exclusive_store(ver, &val_, sizeof(val_), &value);
}

//! Try to load value.
Expand All @@ -97,13 +95,13 @@ template <class T> class Seqlock : public NonCopyable<> {
//! and never spins.
inline bool try_load(T& value) const {
seqlock_version_t ver;
return try_load_repeat_(value, ver);
return impl_.try_load_repeat(ver, &val_, sizeof(val_), &value);
}

//! Try to load value and version.
//! Like try_load(), but also returns version.
inline bool try_load_ver(T& value, seqlock_version_t& ver) const {
return try_load_repeat_(value, ver);
return impl_.try_load_repeat(ver, &val_, sizeof(val_), &value);
}

//! Load value.
Expand All @@ -112,101 +110,19 @@ template <class T> class Seqlock : public NonCopyable<> {
inline T wait_load() const {
T value;
seqlock_version_t ver;
wait_load_(value, ver);
impl_.wait_load(ver, &val_, sizeof(val_), &value);
return value;
}

//! Load value and version.
//! Like wait_load(), but also returns version.
inline void wait_load_ver(T& value, seqlock_version_t& ver) const {
wait_load_(value, ver);
impl_.wait_load(ver, &val_, sizeof(val_), &value);
}

private:
inline seqlock_version_t load_version_() const {
return AtomicOps::load_seq_cst(ver_);
}

inline void exclusive_store_(const T& value, seqlock_version_t& ver) {
const seqlock_version_t ver0 = AtomicOps::load_relaxed(ver_);
AtomicOps::store_relaxed(ver_, ver0 + 1);
AtomicOps::fence_release();

volatile_copy_(val_, value);
AtomicOps::fence_seq_cst();

ver = ver0 + 2;
AtomicOps::store_relaxed(ver_, ver);
}

inline bool try_store_(const T& value, seqlock_version_t& ver) {
seqlock_version_t ver0 = AtomicOps::load_relaxed(ver_);
if (ver0 & 1) {
return false;
}

if (!AtomicOps::compare_exchange_relaxed(ver_, ver0, ver0 + 1)) {
return false;
}
AtomicOps::fence_release();

volatile_copy_(val_, value);
AtomicOps::fence_seq_cst();

ver = ver0 + 2;
AtomicOps::store_relaxed(ver_, ver);

return true;
}

inline void wait_load_(T& value, seqlock_version_t& ver) const {
while (!try_load_(value, ver)) {
cpu_relax();
}
}

// If the concurrent store is running and is not sleeping, retrying 3 times
// should be enough to succeed. This may fail if the concurrent store was
// preempted in the middle, of if there are multiple concurrent stores.
inline bool try_load_repeat_(T& value, seqlock_version_t& ver) const {
if (try_load_(value, ver)) {
return true;
}
if (try_load_(value, ver)) {
return true;
}
if (try_load_(value, ver)) {
return true;
}
return false;
}

inline bool try_load_(T& value, seqlock_version_t& ver) const {
const seqlock_version_t ver0 = AtomicOps::load_relaxed(ver_);
AtomicOps::fence_seq_cst();

volatile_copy_(value, val_);
AtomicOps::fence_acquire();

ver = AtomicOps::load_relaxed(ver_);
return ((ver0 & 1) == 0 && ver0 == ver);
}

// We use hand-rolled loop instead of memcpy() or default (trivial) copy constructor
// to be sure that the copying will be covered by our memory fences. On some
// platforms, memcpy() and copy constructor may be implemented using streaming
// instructions which may ignore memory fences.
static void volatile_copy_(volatile T& dst, const volatile T& src) {
volatile char* dst_ptr = reinterpret_cast<volatile char*>(&dst);
const volatile char* src_ptr = reinterpret_cast<const volatile char*>(&src);

for (size_t n = 0; n < sizeof(T); n++) {
dst_ptr[n] = src_ptr[n];
}
}

T val_;
seqlock_version_t ver_;
SeqlockImpl impl_;
};

} // namespace core
Expand Down
121 changes: 121 additions & 0 deletions src/internal_modules/roc_core/seqlock_impl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (c) 2020 Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

#include "roc_core/seqlock_impl.h"
#include "roc_core/atomic_ops.h"
#include "roc_core/cpu_instructions.h"

namespace {
// We use hand-rolled loop instead of memcpy() or default (trivial) copy constructor
// to be sure that the copying will be covered by our memory fences. On some
// platforms, memcpy() and copy constructor may be implemented using streaming
// instructions which may ignore memory fences.
void volatile_copy(volatile void* dst, const volatile void* src, size_t val_size) {
volatile char* dst_ptr = reinterpret_cast<volatile char*>(dst);
const volatile char* src_ptr = reinterpret_cast<const volatile char*>(src);

for (size_t n = 0; n < val_size; n++) {
dst_ptr[n] = src_ptr[n];
}
}

} // namespace

namespace roc {
namespace core {

SeqlockImpl::SeqlockImpl()
: ver_(0) {
}

seqlock_version_t SeqlockImpl::version() const {
return AtomicOps::load_seq_cst(ver_);
}

bool SeqlockImpl::try_store(seqlock_version_t& ver,
void* current_value,
size_t value_size,
const void* new_value) {
seqlock_version_t ver0 = AtomicOps::load_relaxed(ver_);
if (ver0 & 1) {
return false;
}

if (!AtomicOps::compare_exchange_relaxed(ver_, ver0, ver0 + 1)) {
return false;
}
AtomicOps::fence_release();

volatile_copy(current_value, new_value, value_size);
AtomicOps::fence_seq_cst();

ver = ver0 + 2;
AtomicOps::store_relaxed(ver_, ver);

return true;
}

void SeqlockImpl::exclusive_store(seqlock_version_t& ver,
void* current_value,
size_t value_size,
const void* new_value) {
const seqlock_version_t ver0 = AtomicOps::load_relaxed(ver_);
AtomicOps::store_relaxed(ver_, ver0 + 1);
AtomicOps::fence_release();

volatile_copy(current_value, new_value, value_size);
AtomicOps::fence_seq_cst();

ver = ver0 + 2;
AtomicOps::store_relaxed(ver_, ver);
}

// If the concurrent store is running and is not sleeping, retrying 3 times
// should be enough to succeed. This may fail if the concurrent store was
// preempted in the middle, of if there are multiple concurrent stores.
bool SeqlockImpl::try_load_repeat(seqlock_version_t& ver,
const void* current_value,
size_t value_size,
void* return_value) const {
if (try_load_(ver, current_value, value_size, return_value)) {
return true;
}
if (try_load_(ver, current_value, value_size, return_value)) {
return true;
}
if (try_load_(ver, current_value, value_size, return_value)) {
return true;
}
return false;
}

void SeqlockImpl::wait_load(seqlock_version_t& ver,
const void* current_value,
size_t value_size,
void* return_value) const {
while (!try_load_(ver, current_value, value_size, return_value)) {
cpu_relax();
}
}

bool SeqlockImpl::try_load_(seqlock_version_t& ver,
const void* current_value,
size_t value_size,
void* return_value) const {
const seqlock_version_t ver0 = AtomicOps::load_relaxed(ver_);
AtomicOps::fence_seq_cst();

volatile_copy(return_value, current_value, value_size);
AtomicOps::fence_acquire();

ver = AtomicOps::load_relaxed(ver_);
return ((ver0 & 1) == 0 && ver0 == ver);
}

} // namespace core
} // namespace roc
67 changes: 67 additions & 0 deletions src/internal_modules/roc_core/seqlock_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2020 Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

//! @file roc_core/seqlock_impl.h
//! @brief Seqlock implementation.

#ifndef ROC_CORE_SEQLOCK_IMPL_H_
#define ROC_CORE_SEQLOCK_IMPL_H_

#include "roc_core/stddefs.h"

namespace roc {
namespace core {

typedef uint32_t seqlock_version_t;

//! Seqlock implementation.
class SeqlockImpl {
public:
//! Initialize.
SeqlockImpl();

//! Load value version.
seqlock_version_t version() const;

//! Try to store value.
bool try_store(seqlock_version_t& ver,
void* current_value,
size_t value_size,
const void* new_value);

//! Store value.
void exclusive_store(seqlock_version_t& ver,
void* current_value,
size_t value_size,
const void* new_value);

//! Try to load value and version.
bool try_load_repeat(seqlock_version_t& ver,
const void* current_value,
size_t value_size,
void* return_value) const;

//! Load value and version.
void wait_load(seqlock_version_t& ver,
const void* current_value,
size_t value_size,
void* return_value) const;

private:
bool try_load_(seqlock_version_t& ver,
const void* current_value,
size_t value_size,
void* return_value) const;

seqlock_version_t ver_;
};

} // namespace core
} // namespace roc

#endif // ROC_CORE_SEQLOCK_IMPL_H_

0 comments on commit 49137bc

Please sign in to comment.