mirror of https://gitlab.com/obbart/universal_robots_ros_driver.git

make queue match naming conventions

This commit is contained in:
Felix Mauch
2019-06-06 10:03:18 +02:00
committed by Tristan Schnell
parent 212ce85dfe
commit 9788504f93
3 changed files with 290 additions and 289 deletions

View File

@@ -191,7 +191,7 @@ public:
bool getLatestProduct(std::unique_ptr<URPackage<HeaderT>>& product, std::chrono::milliseconds timeout)
{
return queue_.wait_dequeue_timed(product, timeout);
return queue_.waitDequeTimed(product, timeout);
}
private:
@@ -268,7 +268,7 @@ private:
for (auto& p : products)
{
if (!queue_.try_enqueue(std::move(p)))
if (!queue_.tryEnqueue(std::move(p)))
{
LOG_ERROR("Pipeline producer overflowed! <%s>", name_.c_str());
}
@@ -294,7 +294,7 @@ private:
// at roughly 125hz (every 8ms) and have to update
// the controllers (i.e. the consumer) with *at least* 125Hz
// So we update the consumer more frequently via onTimeout
if (!queue_.wait_dequeue_timed(product, std::chrono::milliseconds(8)))
if (!queue_.waitDequeTimed(product, std::chrono::milliseconds(8)))
{
consumer_->onTimeout();
continue;
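
Not part of the diff: a minimal sketch of the consumer loop shown in the hunk above, using the renamed BlockingReaderWriterQueue API (waitDequeTimed). The Product and IConsumer types, the consumerLoop function, and the include path are placeholders for illustration, not the driver's actual types.

#include <atomic>
#include <chrono>
#include <memory>
#include <utility>
#include "readerwriterqueue.h"

struct Product
{
};  // stand-in for URPackage<HeaderT>

struct IConsumer
{
  virtual ~IConsumer() = default;
  virtual void consume(std::unique_ptr<Product> product) = 0;
  virtual void onTimeout() = 0;
};

void consumerLoop(moodycamel::BlockingReaderWriterQueue<std::unique_ptr<Product>>& queue, IConsumer& consumer,
                  const std::atomic<bool>& keep_running)
{
  std::unique_ptr<Product> product;
  while (keep_running)
  {
    // The robot publishes at roughly 125 Hz (every 8 ms), so wait at most
    // 8 ms before giving the consumer a chance to react to the gap.
    if (!queue.waitDequeTimed(product, std::chrono::milliseconds(8)))
    {
      consumer.onTimeout();
      continue;
    }
    consumer.consume(std::move(product));
  }
}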

View File

@@ -1,4 +1,4 @@
// ©2013-2016 Cameron Desrochers.
// ©2013-2016 Cameron Desrochers.
// Distributed under the simplified BSD license (see the license file that
// should have come with this header).
// Uses Jeff Preshing's semaphore implementation (under the terms of its
@@ -108,7 +108,7 @@ enum memory_order
namespace moodycamel
{
AE_FORCEINLINE void compiler_fence(memory_order order)
AE_FORCEINLINE void compilerFence(memory_order order)
{
switch (order)
{
@@ -199,7 +199,7 @@ AE_FORCEINLINE void fence(memory_order order)
namespace moodycamel
{
AE_FORCEINLINE void compiler_fence(memory_order order)
AE_FORCEINLINE void compilerFence(memory_order order)
{
switch (order)
{
@@ -265,29 +265,29 @@ AE_FORCEINLINE void fence(memory_order order)
namespace moodycamel
{
template <typename T>
class weak_atomic
class WeakAtomic
{
public:
weak_atomic()
WeakAtomic()
{
}
#ifdef AE_VCPP
#pragma warning(disable : 4100) // Get rid of (erroneous) 'unreferenced formal parameter' warning
#endif
template <typename U>
weak_atomic(U&& x) : value(std::forward<U>(x))
WeakAtomic(U&& x) : value_(std::forward<U>(x))
{
}
#ifdef __cplusplus_cli
// Work around bug with universal reference/nullptr combination that only appears when /clr is on
weak_atomic(nullptr_t) : value(nullptr)
WeakAtomic(nullptr_t) : value_(nullptr)
{
}
#endif
weak_atomic(weak_atomic const& other) : value(other.value)
WeakAtomic(WeakAtomic const& other) : value_(other.value_)
{
}
weak_atomic(weak_atomic&& other) : value(std::move(other.value))
WeakAtomic(WeakAtomic&& other) : value_(std::move(other.value_))
{
}
#ifdef AE_VCPP
@@ -301,80 +301,80 @@ public:
#ifndef AE_USE_STD_ATOMIC_FOR_WEAK_ATOMIC
template <typename U>
AE_FORCEINLINE weak_atomic const& operator=(U&& x)
AE_FORCEINLINE WeakAtomic const& operator=(U&& x)
{
value = std::forward<U>(x);
value_ = std::forward<U>(x);
return *this;
}
AE_FORCEINLINE weak_atomic const& operator=(weak_atomic const& other)
AE_FORCEINLINE WeakAtomic const& operator=(WeakAtomic const& other)
{
value = other.value;
value_ = other.value_;
return *this;
}
AE_FORCEINLINE T load() const
{
return value;
return value_;
}
AE_FORCEINLINE T fetch_add_acquire(T increment)
AE_FORCEINLINE T fetchAddAcquire(T increment)
{
#if defined(AE_ARCH_X64) || defined(AE_ARCH_X86)
if (sizeof(T) == 4)
return _InterlockedExchangeAdd((long volatile*)&value, (long)increment);
return _InterlockedExchangeAdd((long volatile*)&value_, (long)increment);
#if defined(_M_AMD64)
else if (sizeof(T) == 8)
return _InterlockedExchangeAdd64((long long volatile*)&value, (long long)increment);
return _InterlockedExchangeAdd64((long long volatile*)&value_, (long long)increment);
#endif
#else
#error Unsupported platform
#endif
assert(false && "T must be either a 32 or 64 bit type");
return value;
return value_;
}
AE_FORCEINLINE T fetch_add_release(T increment)
AE_FORCEINLINE T fetchAddRelease(T increment)
{
#if defined(AE_ARCH_X64) || defined(AE_ARCH_X86)
if (sizeof(T) == 4)
return _InterlockedExchangeAdd((long volatile*)&value, (long)increment);
return _InterlockedExchangeAdd((long volatile*)&value_, (long)increment);
#if defined(_M_AMD64)
else if (sizeof(T) == 8)
return _InterlockedExchangeAdd64((long long volatile*)&value, (long long)increment);
return _InterlockedExchangeAdd64((long long volatile*)&value_, (long long)increment);
#endif
#else
#error Unsupported platform
#endif
assert(false && "T must be either a 32 or 64 bit type");
return value;
return value_;
}
#else
template <typename U>
AE_FORCEINLINE weak_atomic const& operator=(U&& x)
AE_FORCEINLINE WeakAtomic const& operator=(U&& x)
{
value.store(std::forward<U>(x), std::memory_order_relaxed);
value_.store(std::forward<U>(x), std::memory_order_relaxed);
return *this;
}
AE_FORCEINLINE weak_atomic const& operator=(weak_atomic const& other)
AE_FORCEINLINE WeakAtomic const& operator=(WeakAtomic const& other)
{
value.store(other.value.load(std::memory_order_relaxed), std::memory_order_relaxed);
value_.store(other.value_.load(std::memory_order_relaxed), std::memory_order_relaxed);
return *this;
}
AE_FORCEINLINE T load() const
{
return value.load(std::memory_order_relaxed);
return value_.load(std::memory_order_relaxed);
}
AE_FORCEINLINE T fetch_add_acquire(T increment)
AE_FORCEINLINE T fetchAddAcquire(T increment)
{
return value.fetch_add(increment, std::memory_order_acquire);
return value_.fetch_add(increment, std::memory_order_acquire);
}
AE_FORCEINLINE T fetch_add_release(T increment)
AE_FORCEINLINE T fetchAddRelease(T increment)
{
return value.fetch_add(increment, std::memory_order_release);
return value_.fetch_add(increment, std::memory_order_release);
}
#endif
@@ -382,9 +382,9 @@ private:
#ifndef AE_USE_STD_ATOMIC_FOR_WEAK_ATOMIC
// No std::atomic support, but still need to circumvent compiler optimizations.
// `volatile` will make memory access slow, but is guaranteed to be reliable.
volatile T value;
volatile T value_;
#else
std::atomic<T> value;
std::atomic<T> value_;
#endif
};
@@ -465,13 +465,13 @@ public:
WaitForSingleObject(m_hSema, infinite);
}
bool try_wait()
bool tryWait()
{
const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
return WaitForSingleObject(m_hSema, 0) != RC_WAIT_TIMEOUT;
}
bool timed_wait(std::uint64_t usecs)
bool timedWait(std::uint64_t usecs)
{
const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) != RC_WAIT_TIMEOUT;
@@ -490,7 +490,7 @@ public:
class Semaphore
{
private:
semaphore_t m_sema;
semaphore_t sema_;
Semaphore(const Semaphore& other);
Semaphore& operator=(const Semaphore& other);
@@ -499,25 +499,25 @@ public:
Semaphore(int initialCount = 0)
{
assert(initialCount >= 0);
semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
semaphore_create(mach_task_self(), &sema_, SYNC_POLICY_FIFO, initialCount);
}
~Semaphore()
{
semaphore_destroy(mach_task_self(), m_sema);
semaphore_destroy(mach_task_self(), sema_);
}
void wait()
{
semaphore_wait(m_sema);
semaphore_wait(sema_);
}
bool try_wait()
bool tryWait()
{
return timed_wait(0);
return timedWait(0);
}
bool timed_wait(std::int64_t timeout_usecs)
bool timedWait(std::int64_t timeout_usecs)
{
mach_timespec_t ts;
ts.tv_sec = timeout_usecs / 1000000;
@@ -525,21 +525,21 @@ public:
// added in OSX 10.10:
// https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html
kern_return_t rc = semaphore_timedwait(m_sema, ts);
kern_return_t rc = semaphore_timedwait(sema_, ts);
return rc != KERN_OPERATION_TIMED_OUT;
}
void signal()
{
semaphore_signal(m_sema);
semaphore_signal(sema_);
}
void signal(int count)
{
while (count-- > 0)
{
semaphore_signal(m_sema);
semaphore_signal(sema_);
}
}
};
@@ -550,7 +550,7 @@ public:
class Semaphore
{
private:
sem_t m_sema;
sem_t sema_;
Semaphore(const Semaphore& other);
Semaphore& operator=(const Semaphore& other);
@@ -559,12 +559,12 @@ public:
Semaphore(int initialCount = 0)
{
assert(initialCount >= 0);
sem_init(&m_sema, 0, initialCount);
sem_init(&sema_, 0, initialCount);
}
~Semaphore()
{
sem_destroy(&m_sema);
sem_destroy(&sema_);
}
void wait()
@@ -573,21 +573,21 @@ public:
int rc;
do
{
rc = sem_wait(&m_sema);
rc = sem_wait(&sema_);
} while (rc == -1 && errno == EINTR);
}
bool try_wait()
bool tryWait()
{
int rc;
do
{
rc = sem_trywait(&m_sema);
rc = sem_trywait(&sema_);
} while (rc == -1 && errno == EINTR);
return !(rc == -1 && errno == EAGAIN);
}
bool timed_wait(std::uint64_t usecs)
bool timedWait(std::uint64_t usecs)
{
struct timespec ts;
const int usecs_in_1_sec = 1000000;
@@ -606,21 +606,21 @@ public:
int rc;
do
{
rc = sem_timedwait(&m_sema, &ts);
rc = sem_timedwait(&sema_, &ts);
} while (rc == -1 && errno == EINTR);
return !(rc == -1 && errno == ETIMEDOUT);
}
void signal()
{
sem_post(&m_sema);
sem_post(&sema_);
}
void signal(int count)
{
while (count-- > 0)
{
sem_post(&m_sema);
sem_post(&sema_);
}
}
};
@@ -637,34 +637,34 @@ public:
typedef std::make_signed<std::size_t>::type ssize_t;
private:
weak_atomic<ssize_t> m_count;
Semaphore m_sema;
WeakAtomic<ssize_t> count_;
Semaphore sema_;
bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1)
{
ssize_t oldCount;
ssize_t old_count;
// Is there a better way to set the initial spin count?
// If we lower it to 1000, testBenaphore becomes 15x slower on my Core i7-5930K Windows PC,
// as threads start hitting the kernel semaphore.
int spin = 10000;
while (--spin >= 0)
{
if (m_count.load() > 0)
if (count_.load() > 0)
{
m_count.fetch_add_acquire(-1);
count_.fetchAddAcquire(-1);
return true;
}
compiler_fence(memory_order_acquire); // Prevent the compiler from collapsing the loop.
compilerFence(memory_order_acquire); // Prevent the compiler from collapsing the loop.
}
oldCount = m_count.fetch_add_acquire(-1);
if (oldCount > 0)
old_count = count_.fetchAddAcquire(-1);
if (old_count > 0)
return true;
if (timeout_usecs < 0)
{
m_sema.wait();
sema_.wait();
return true;
}
if (m_sema.timed_wait(timeout_usecs))
if (sema_.timedWait(timeout_usecs))
return true;
// At this point, we've timed out waiting for the semaphore, but the
// count is still decremented indicating we may still be waiting on
@@ -673,27 +673,27 @@ private:
// need to release the semaphore too.
while (true)
{
oldCount = m_count.fetch_add_release(1);
if (oldCount < 0)
old_count = count_.fetchAddRelease(1);
if (old_count < 0)
return false; // successfully restored things to the way they were
// Oh, the producer thread just signaled the semaphore after all. Try again:
oldCount = m_count.fetch_add_acquire(-1);
if (oldCount > 0 && m_sema.try_wait())
old_count = count_.fetchAddAcquire(-1);
if (old_count > 0 && sema_.tryWait())
return true;
}
}
public:
LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount)
LightweightSemaphore(ssize_t initialCount = 0) : count_(initialCount)
{
assert(initialCount >= 0);
}
bool tryWait()
{
if (m_count.load() > 0)
if (count_.load() > 0)
{
m_count.fetch_add_acquire(-1);
count_.fetchAddAcquire(-1);
return true;
}
return false;
@@ -713,17 +713,17 @@ public:
void signal(ssize_t count = 1)
{
assert(count >= 0);
ssize_t oldCount = m_count.fetch_add_release(count);
assert(oldCount >= -1);
if (oldCount < 0)
ssize_t old_count = count_.fetchAddRelease(count);
assert(old_count >= -1);
if (old_count < 0)
{
m_sema.signal(1);
sema_.signal(1);
}
}
ssize_t availableApprox() const
{
ssize_t count = m_count.load();
ssize_t count = count_.load();
return count > 0 ? count : 0;
}
};
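
Not part of the diff: a minimal sketch of the benaphore idea that LightweightSemaphore implements above (an atomic count absorbs uncontended waits; the kernel semaphore is touched only when a thread actually has to block), written against std::atomic and C++20 std::counting_semaphore instead of the vendored WeakAtomic/Semaphore types. TinyBenaphore is a hypothetical name for illustration only.

#include <atomic>
#include <cassert>
#include <semaphore>

class TinyBenaphore
{
public:
  explicit TinyBenaphore(long initial_count = 0) : count_(initial_count), sema_(0)
  {
    assert(initial_count >= 0);
  }

  bool tryWait()
  {
    // Claim a slot only if the count is visibly positive; never blocks.
    long old_count = count_.load(std::memory_order_relaxed);
    return old_count > 0 && count_.compare_exchange_strong(old_count, old_count - 1, std::memory_order_acquire);
  }

  void wait()
  {
    // Fast path: if the previous count was positive we own a slot and never
    // touch the kernel semaphore.
    if (count_.fetch_sub(1, std::memory_order_acquire) > 0)
      return;
    // Slow path: the count went negative, so park on the kernel semaphore
    // until a signaler releases us.
    sema_.acquire();
  }

  void signal(long count = 1)
  {
    long old_count = count_.fetch_add(count, std::memory_order_release);
    // Every waiter that drove the count negative is parked; wake as many of
    // them as this signal covers.
    long parked = old_count < 0 ? -old_count : 0;
    long to_release = parked < count ? parked : count;
    if (to_release > 0)
      sema_.release(to_release);
  }

private:
  std::atomic<long> count_;
  std::counting_semaphore<> sema_;
};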

View File

@@ -67,7 +67,7 @@ class ReaderWriterQueue
// thread owns all the front indices/pointers. Both threads read each
// other's variables, but only the owning thread updates them. E.g. After
// the consumer reads the producer's tail, the tail may change before the
// consumer is done dequeuing an object, but the consumer knows the tail
// consumer is done dequeuing_ an object, but the consumer knows the tail
// will never go backwards, only forwards.
// If there is no room to enqueue an object, an additional block (of
// equal size to the last block) is added. Blocks are never removed.
@@ -77,19 +77,19 @@ public:
// allocations. If more than MAX_BLOCK_SIZE elements are requested,
// then several blocks of MAX_BLOCK_SIZE each are reserved (including
// at least one extra buffer block).
explicit ReaderWriterQueue(size_t maxSize = 15)
explicit ReaderWriterQueue(size_t max_size = 15)
#ifndef NDEBUG
: enqueuing(false), dequeuing(false)
: enqueuing_(false), dequeuing_(false)
#endif
{
assert(maxSize > 0);
assert(max_size > 0);
assert(MAX_BLOCK_SIZE == ceilToPow2(MAX_BLOCK_SIZE) && "MAX_BLOCK_SIZE must be a power of 2");
assert(MAX_BLOCK_SIZE >= 2 && "MAX_BLOCK_SIZE must be at least 2");
Block* firstBlock = nullptr;
Block* first_block = nullptr;
largestBlockSize = ceilToPow2(maxSize + 1); // We need a spare slot to fit maxSize elements in the block
if (largestBlockSize > MAX_BLOCK_SIZE * 2)
largest_block_size_ = ceilToPow2(max_size + 1); // We need a spare slot to fit maxSize elements in the block
if (largest_block_size_ > MAX_BLOCK_SIZE * 2)
{
// We need a spare block in case the producer is writing to a different block the consumer is reading from, and
// wants to enqueue the maximum number of elements. We also need a spare element in each block to avoid the
@@ -97,12 +97,12 @@ public:
// between front == tail meaning "empty" and "full".
// So the effective number of slots that are guaranteed to be usable at any time is the block size - 1 times the
// number of blocks - 1. Solving for maxSize and applying a ceiling to the division gives us (after simplifying):
size_t initialBlockCount = (maxSize + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1);
largestBlockSize = MAX_BLOCK_SIZE;
Block* lastBlock = nullptr;
for (size_t i = 0; i != initialBlockCount; ++i)
size_t initial_block_count = (max_size + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1);
largest_block_size_ = MAX_BLOCK_SIZE;
Block* last_block = nullptr;
for (size_t i = 0; i != initial_block_count; ++i)
{
auto block = make_block(largestBlockSize);
auto block = makeBlock(largest_block_size_);
if (block == nullptr)
{
#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
@@ -111,22 +111,22 @@ public:
abort();
#endif
}
if (firstBlock == nullptr)
if (first_block == nullptr)
{
firstBlock = block;
first_block = block;
}
else
{
lastBlock->next = block;
last_block->next = block;
}
lastBlock = block;
block->next = firstBlock;
last_block = block;
block->next = first_block;
}
}
else
{
firstBlock = make_block(largestBlockSize);
if (firstBlock == nullptr)
first_block = makeBlock(largest_block_size_);
if (first_block == nullptr)
{
#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
throw std::bad_alloc();
@@ -134,10 +134,10 @@ public:
abort();
#endif
}
firstBlock->next = firstBlock;
first_block->next = first_block;
}
frontBlock = firstBlock;
tailBlock = firstBlock;
front_block_ = first_block;
tail_block_ = first_block;
// Make sure the reader/writer threads will have the initialized memory setup above:
fence(memory_order_sync);
@@ -151,42 +151,42 @@ public:
fence(memory_order_sync);
// Destroy any remaining objects in queue and free memory
Block* frontBlock_ = frontBlock;
Block* block = frontBlock_;
Block* front_block = front_block_;
Block* block = front_block;
do
{
Block* nextBlock = block->next;
size_t blockFront = block->front;
size_t blockTail = block->tail;
Block* next_block = block->next;
size_t block_front = block->front;
size_t block_tail = block->tail;
for (size_t i = blockFront; i != blockTail; i = (i + 1) & block->sizeMask)
for (size_t i = block_front; i != block_tail; i = (i + 1) & block->sizeMask)
{
auto element = reinterpret_cast<T*>(block->data + i * sizeof(T));
element->~T();
(void)element;
}
auto rawBlock = block->rawThis;
auto raw_block = block->rawThis;
block->~Block();
std::free(rawBlock);
block = nextBlock;
} while (block != frontBlock_);
std::free(raw_block);
block = next_block;
} while (block != front_block);
}
// Enqueues a copy of element if there is room in the queue.
// Returns true if the element was enqueued, false otherwise.
// Does not allocate memory.
AE_FORCEINLINE bool try_enqueue(T const& element)
AE_FORCEINLINE bool tryEnqueue(T const& element)
{
return inner_enqueue<CannotAlloc>(element);
return innerEnqueue<CannotAlloc>(element);
}
// Enqueues a moved copy of element if there is room in the queue.
// Returns true if the element was enqueued, false otherwise.
// Does not allocate memory.
AE_FORCEINLINE bool try_enqueue(T&& element)
AE_FORCEINLINE bool tryEnqueue(T&& element)
{
return inner_enqueue<CannotAlloc>(std::forward<T>(element));
return innerEnqueue<CannotAlloc>(std::forward<T>(element));
}
// Enqueues a copy of element on the queue.
@@ -194,7 +194,7 @@ public:
// Only fails (returns false) if memory allocation fails.
AE_FORCEINLINE bool enqueue(T const& element)
{
return inner_enqueue<CanAlloc>(element);
return innerEnqueue<CanAlloc>(element);
}
// Enqueues a moved copy of element on the queue.
@@ -202,17 +202,17 @@ public:
// Only fails (returns false) if memory allocation fails.
AE_FORCEINLINE bool enqueue(T&& element)
{
return inner_enqueue<CanAlloc>(std::forward<T>(element));
return innerEnqueue<CanAlloc>(std::forward<T>(element));
}
// Attempts to dequeue an element; if the queue is empty,
// returns false instead. If the queue has at least one element,
// moves front to result using operator=, then returns true.
template <typename U>
bool try_dequeue(U& result)
bool tryDequeue(U& result)
{
#ifndef NDEBUG
ReentrantGuard guard(this->dequeuing);
ReentrantGuard guard(this->dequeuing_);
#endif
// High-level pseudocode:
@@ -232,70 +232,70 @@ public:
// then re-read the front block and check if it's not empty again, then check if the tail
// block has advanced.
Block* frontBlock_ = frontBlock.load();
size_t blockTail = frontBlock_->localTail;
size_t blockFront = frontBlock_->front.load();
Block* front_block = front_block_.load();
size_t block_tail = front_block->localTail;
size_t block_front = front_block->front.load();
if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load()))
if (block_front != block_tail || block_front != (front_block->localTail = front_block->tail.load()))
{
fence(memory_order_acquire);
non_empty_front_block:
// Front block not empty, dequeue from here
auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
auto element = reinterpret_cast<T*>(front_block->data + block_front * sizeof(T));
result = std::move(*element);
element->~T();
blockFront = (blockFront + 1) & frontBlock_->sizeMask;
block_front = (block_front + 1) & front_block->sizeMask;
fence(memory_order_release);
frontBlock_->front = blockFront;
front_block->front = block_front;
}
else if (frontBlock_ != tailBlock.load())
else if (front_block != tail_block_.load())
{
fence(memory_order_acquire);
frontBlock_ = frontBlock.load();
blockTail = frontBlock_->localTail = frontBlock_->tail.load();
blockFront = frontBlock_->front.load();
front_block = front_block_.load();
block_tail = front_block->localTail = front_block->tail.load();
block_front = front_block->front.load();
fence(memory_order_acquire);
if (blockFront != blockTail)
if (block_front != block_tail)
{
// Oh look, the front block isn't empty after all
goto non_empty_front_block;
}
// Front block is empty but there's another block ahead, advance to it
Block* nextBlock = frontBlock_->next;
// Don't need an acquire fence here since next can only ever be set on the tailBlock,
// and we're not the tailBlock, and we did an acquire earlier after reading tailBlock which
// ensures next is up-to-date on this CPU in case we recently were at tailBlock.
Block* next_block = front_block->next;
// Don't need an acquire fence here since next can only ever be set on the tail_block,
// and we're not the tail_block, and we did an acquire earlier after reading tail_block which
// ensures next is up-to-date on this CPU in case we recently were at tail_block.
size_t nextBlockFront = nextBlock->front.load();
size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
size_t next_block_front = next_block->front.load();
size_t next_block_tail = next_block->localTail = next_block->tail.load();
fence(memory_order_acquire);
// Since the tailBlock is only ever advanced after being written to,
// Since the tail_block is only ever advanced after being written to,
// we know there's for sure an element to dequeue on it
assert(nextBlockFront != nextBlockTail);
AE_UNUSED(nextBlockTail);
assert(next_block_front != next_block_tail);
AE_UNUSED(next_block_tail);
// We're done with this block, let the producer use it if it needs
fence(memory_order_release); // Expose possibly pending changes to frontBlock->front from last dequeue
frontBlock = frontBlock_ = nextBlock;
fence(memory_order_release); // Expose possibly pending changes to front_block->front from last dequeue
front_block_ = front_block = next_block;
compiler_fence(memory_order_release); // Not strictly needed
compilerFence(memory_order_release); // Not strictly needed
auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
auto element = reinterpret_cast<T*>(front_block->data + next_block_front * sizeof(T));
result = std::move(*element);
element->~T();
nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
next_block_front = (next_block_front + 1) & front_block->sizeMask;
fence(memory_order_release);
frontBlock_->front = nextBlockFront;
front_block->front = next_block_front;
}
else
{
@@ -307,47 +307,47 @@ public:
}
// Returns a pointer to the front element in the queue (the one that
// would be removed next by a call to `try_dequeue` or `pop`). If the
// would be removed next by a call to `tryDequeue` or `pop`). If the
// queue appears empty at the time the method is called, nullptr is
// returned instead.
// Must be called only from the consumer thread.
T* peek()
{
#ifndef NDEBUG
ReentrantGuard guard(this->dequeuing);
ReentrantGuard guard(this->dequeuing_);
#endif
// See try_dequeue() for reasoning
// See tryDequeue() for reasoning
Block* frontBlock_ = frontBlock.load();
size_t blockTail = frontBlock_->localTail;
size_t blockFront = frontBlock_->front.load();
Block* front_block = front_block_.load();
size_t block_tail = front_block->localTail;
size_t block_front = front_block->front.load();
if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load()))
if (block_front != block_tail || block_front != (front_block->localTail = front_block->tail.load()))
{
fence(memory_order_acquire);
non_empty_front_block:
return reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
return reinterpret_cast<T*>(front_block->data + block_front * sizeof(T));
}
else if (frontBlock_ != tailBlock.load())
else if (front_block != tail_block_.load())
{
fence(memory_order_acquire);
frontBlock_ = frontBlock.load();
blockTail = frontBlock_->localTail = frontBlock_->tail.load();
blockFront = frontBlock_->front.load();
front_block = front_block_.load();
block_tail = front_block->localTail = front_block->tail.load();
block_front = front_block->front.load();
fence(memory_order_acquire);
if (blockFront != blockTail)
if (block_front != block_tail)
{
goto non_empty_front_block;
}
Block* nextBlock = frontBlock_->next;
Block* next_block = front_block->next;
size_t nextBlockFront = nextBlock->front.load();
size_t next_block_front = next_block->front.load();
fence(memory_order_acquire);
assert(nextBlockFront != nextBlock->tail.load());
return reinterpret_cast<T*>(nextBlock->data + nextBlockFront * sizeof(T));
assert(next_block_front != next_block->tail.load());
return reinterpret_cast<T*>(next_block->data + next_block_front * sizeof(T));
}
return nullptr;
@@ -359,62 +359,62 @@ public:
bool pop()
{
#ifndef NDEBUG
ReentrantGuard guard(this->dequeuing);
ReentrantGuard guard(this->dequeuing_);
#endif
// See try_dequeue() for reasoning
// See tryDequeue() for reasoning
Block* frontBlock_ = frontBlock.load();
size_t blockTail = frontBlock_->localTail;
size_t blockFront = frontBlock_->front.load();
Block* front_block = front_block_.load();
size_t block_tail = front_block->localTail;
size_t block_front = front_block->front.load();
if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load()))
if (block_front != block_tail || block_front != (front_block->localTail = front_block->tail.load()))
{
fence(memory_order_acquire);
non_empty_front_block:
auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
auto element = reinterpret_cast<T*>(front_block->data + block_front * sizeof(T));
element->~T();
blockFront = (blockFront + 1) & frontBlock_->sizeMask;
block_front = (block_front + 1) & front_block->sizeMask;
fence(memory_order_release);
frontBlock_->front = blockFront;
front_block->front = block_front;
}
else if (frontBlock_ != tailBlock.load())
else if (front_block != tail_block_.load())
{
fence(memory_order_acquire);
frontBlock_ = frontBlock.load();
blockTail = frontBlock_->localTail = frontBlock_->tail.load();
blockFront = frontBlock_->front.load();
front_block = front_block_.load();
block_tail = front_block->localTail = front_block->tail.load();
block_front = front_block->front.load();
fence(memory_order_acquire);
if (blockFront != blockTail)
if (block_front != block_tail)
{
goto non_empty_front_block;
}
// Front block is empty but there's another block ahead, advance to it
Block* nextBlock = frontBlock_->next;
Block* next_block = front_block->next;
size_t nextBlockFront = nextBlock->front.load();
size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
size_t next_block_front = next_block->front.load();
size_t next_block_tail = next_block->localTail = next_block->tail.load();
fence(memory_order_acquire);
assert(nextBlockFront != nextBlockTail);
AE_UNUSED(nextBlockTail);
assert(next_block_front != next_block_tail);
AE_UNUSED(next_block_tail);
fence(memory_order_release);
frontBlock = frontBlock_ = nextBlock;
front_block_ = front_block = next_block;
compiler_fence(memory_order_release);
compilerFence(memory_order_release);
auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
auto element = reinterpret_cast<T*>(front_block->data + next_block_front * sizeof(T));
element->~T();
nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
next_block_front = (next_block_front + 1) & front_block->sizeMask;
fence(memory_order_release);
frontBlock_->front = nextBlockFront;
front_block->front = next_block_front;
}
else
{
@@ -427,19 +427,19 @@ public:
// Returns the approximate number of items currently in the queue.
// Safe to call from both the producer and consumer threads.
inline size_t size_approx() const
inline size_t sizeApprox() const
{
size_t result = 0;
Block* frontBlock_ = frontBlock.load();
Block* block = frontBlock_;
Block* front_block = front_block_.load();
Block* block = front_block;
do
{
fence(memory_order_acquire);
size_t blockFront = block->front.load();
size_t blockTail = block->tail.load();
result += (blockTail - blockFront) & block->sizeMask;
size_t block_front = block->front.load();
size_t block_tail = block->tail.load();
result += (block_tail - block_front) & block->sizeMask;
block = block->next.load();
} while (block != frontBlock_);
} while (block != front_block);
return result;
}
@@ -451,10 +451,10 @@ private:
};
template <AllocationMode canAlloc, typename U>
bool inner_enqueue(U&& element)
bool innerEnqueue(U&& element)
{
#ifndef NDEBUG
ReentrantGuard guard(this->enqueuing);
ReentrantGuard guard(this->enqueuing_);
#endif
// High-level pseudocode (assuming we're allowed to alloc a new block):
@@ -464,80 +464,80 @@ private:
// Else create a new block and enqueue there
// Advance tail to the block we just enqueued to
Block* tailBlock_ = tailBlock.load();
size_t blockFront = tailBlock_->localFront;
size_t blockTail = tailBlock_->tail.load();
Block* tail_block = tail_block_.load();
size_t block_front = tail_block->localFront;
size_t block_tail = tail_block->tail.load();
size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask;
if (nextBlockTail != blockFront || nextBlockTail != (tailBlock_->localFront = tailBlock_->front.load()))
size_t next_block_tail = (block_tail + 1) & tail_block->sizeMask;
if (next_block_tail != block_front || next_block_tail != (tail_block->localFront = tail_block->front.load()))
{
fence(memory_order_acquire);
// This block has room for at least one more element
char* location = tailBlock_->data + blockTail * sizeof(T);
char* location = tail_block->data + block_tail * sizeof(T);
new (location) T(std::forward<U>(element));
fence(memory_order_release);
tailBlock_->tail = nextBlockTail;
tail_block->tail = next_block_tail;
}
else
{
fence(memory_order_acquire);
if (tailBlock_->next.load() != frontBlock)
if (tail_block->next.load() != front_block_)
{
// Note that the reason we can't advance to the frontBlock and start adding new entries there
// Note that the reason we can't advance to the front_block and start adding new entries there
// is because if we did, then dequeue would stay in that block, eventually reading the new values,
// instead of advancing to the next full block (whose values were enqueued first and so should be
// consumed first).
fence(memory_order_acquire); // Ensure we get latest writes if we got the latest frontBlock
fence(memory_order_acquire); // Ensure we get latest writes if we got the latest front_block
// tailBlock is full, but there's a free block ahead, use it
Block* tailBlockNext = tailBlock_->next.load();
size_t nextBlockFront = tailBlockNext->localFront = tailBlockNext->front.load();
nextBlockTail = tailBlockNext->tail.load();
// tail_block is full, but there's a free block ahead, use it
Block* tail_block_next = tail_block->next.load();
size_t next_block_front = tail_block_next->localFront = tail_block_next->front.load();
next_block_tail = tail_block_next->tail.load();
fence(memory_order_acquire);
// This block must be empty since it's not the head block and we
// go through the blocks in a circle
assert(nextBlockFront == nextBlockTail);
tailBlockNext->localFront = nextBlockFront;
assert(next_block_front == next_block_tail);
tail_block_next->localFront = next_block_front;
char* location = tailBlockNext->data + nextBlockTail * sizeof(T);
char* location = tail_block_next->data + next_block_tail * sizeof(T);
new (location) T(std::forward<U>(element));
tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask;
tail_block_next->tail = (next_block_tail + 1) & tail_block_next->sizeMask;
fence(memory_order_release);
tailBlock = tailBlockNext;
tail_block_ = tail_block_next;
}
else if (canAlloc == CanAlloc)
{
// tailBlock is full and there's no free block ahead; create a new block
auto newBlockSize = largestBlockSize >= MAX_BLOCK_SIZE ? largestBlockSize : largestBlockSize * 2;
auto newBlock = make_block(newBlockSize);
if (newBlock == nullptr)
// tail_block is full and there's no free block ahead; create a new block
auto new_block_size = largest_block_size_ >= MAX_BLOCK_SIZE ? largest_block_size_ : largest_block_size_ * 2;
auto new_block = makeBlock(new_block_size);
if (new_block == nullptr)
{
// Could not allocate a block!
return false;
}
largestBlockSize = newBlockSize;
largest_block_size_ = new_block_size;
new (newBlock->data) T(std::forward<U>(element));
new (new_block->data) T(std::forward<U>(element));
assert(newBlock->front == 0);
newBlock->tail = newBlock->localTail = 1;
assert(new_block->front == 0);
new_block->tail = new_block->localTail = 1;
newBlock->next = tailBlock_->next.load();
tailBlock_->next = newBlock;
new_block->next = tail_block->next.load();
tail_block->next = new_block;
// Might be possible for the dequeue thread to see the new tailBlock->next
// *without* seeing the new tailBlock value, but this is OK since it can't
// advance to the next block until tailBlock is set anyway (because the only
// case where it could try to read the next is if it's already at the tailBlock,
// and it won't advance past tailBlock in any circumstance).
// Might be possible for the dequeue thread to see the new tail_block->next
// *without* seeing the new tail_block value, but this is OK since it can't
// advance to the next block until tail_block is set anyway (because the only
// case where it could try to read the next is if it's already at the tail_block,
// and it won't advance past tail_block in any circumstance).
fence(memory_order_release);
tailBlock = newBlock;
tail_block_ = new_block;
}
else if (canAlloc == CannotAlloc)
{
@@ -580,7 +580,7 @@ private:
}
template <typename U>
static AE_FORCEINLINE char* align_for(char* ptr)
static AE_FORCEINLINE char* alignFor(char* ptr)
{
const std::size_t alignment = std::alignment_of<U>::value;
return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
@@ -590,37 +590,38 @@ private:
#ifndef NDEBUG
struct ReentrantGuard
{
ReentrantGuard(bool& _inSection) : inSection(_inSection)
ReentrantGuard(bool& in_section) : in_section_(in_section)
{
assert(!inSection && "ReaderWriterQueue does not support enqueuing or dequeuing elements from other elements' "
"ctors and dtors");
inSection = true;
assert(!in_section_ && "ReaderWriterQueue does not support enqueuing_ or dequeuing_ elements from other "
"elements' "
"ctors and dtors");
in_section_ = true;
}
~ReentrantGuard()
{
inSection = false;
in_section_ = false;
}
private:
ReentrantGuard& operator=(ReentrantGuard const&);
private:
bool& inSection;
bool& in_section_;
};
#endif
struct Block
{
// Avoid false-sharing by putting highly contended variables on their own cache lines
weak_atomic<size_t> front; // (Atomic) Elements are read from here
size_t localTail; // An uncontended shadow copy of tail, owned by the consumer
WeakAtomic<size_t> front; // (Atomic) Elements are read from here
size_t localTail; // An uncontended shadow copy of tail, owned by the consumer
char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)];
weak_atomic<size_t> tail; // (Atomic) Elements are enqueued here
char cachelineFiller0_[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(WeakAtomic<size_t>) - sizeof(size_t)];
WeakAtomic<size_t> tail; // (Atomic) Elements are enqueued here
size_t localFront;
char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)]; // next isn't
char cachelineFiller1_[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(WeakAtomic<size_t>) - sizeof(size_t)]; // next isn't
// very
// contended, but
// we don't want
@@ -628,7 +629,7 @@ private:
// cache line as
// tail (which
// is)
weak_atomic<Block*> next; // (Atomic)
WeakAtomic<Block*> next; // (Atomic)
char* data; // Contents (on heap) are aligned to T's alignment
@@ -655,33 +656,33 @@ private:
char* rawThis;
};
static Block* make_block(size_t capacity)
static Block* makeBlock(size_t capacity)
{
// Allocate enough memory for the block itself, as well as all the elements it will contain
auto size = sizeof(Block) + std::alignment_of<Block>::value - 1;
size += sizeof(T) * capacity + std::alignment_of<T>::value - 1;
auto newBlockRaw = static_cast<char*>(std::malloc(size));
if (newBlockRaw == nullptr)
auto new_block_raw = static_cast<char*>(std::malloc(size));
if (new_block_raw == nullptr)
{
return nullptr;
}
auto newBlockAligned = align_for<Block>(newBlockRaw);
auto newBlockData = align_for<T>(newBlockAligned + sizeof(Block));
return new (newBlockAligned) Block(capacity, newBlockRaw, newBlockData);
auto new_block_aligned = alignFor<Block>(new_block_raw);
auto new_block_data = alignFor<T>(new_block_aligned + sizeof(Block));
return new (new_block_aligned) Block(capacity, new_block_raw, new_block_data);
}
private:
weak_atomic<Block*> frontBlock; // (Atomic) Elements are enqueued to this block
WeakAtomic<Block*> front_block_; // (Atomic) Elements are enqueued to this block
char cachelineFiller[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<Block*>)];
weak_atomic<Block*> tailBlock; // (Atomic) Elements are dequeued from this block
char cachelineFiller_[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(WeakAtomic<Block*>)];
WeakAtomic<Block*> tail_block_; // (Atomic) Elements are dequeued from this block
size_t largestBlockSize;
size_t largest_block_size_;
#ifndef NDEBUG
bool enqueuing;
bool dequeuing;
bool enqueuing_;
bool dequeuing_;
#endif
};
@@ -693,18 +694,18 @@ private:
typedef ::moodycamel::ReaderWriterQueue<T, MAX_BLOCK_SIZE> ReaderWriterQueue;
public:
explicit BlockingReaderWriterQueue(size_t maxSize = 15) : inner(maxSize)
explicit BlockingReaderWriterQueue(size_t maxSize = 15) : inner_(maxSize)
{
}
// Enqueues a copy of element if there is room in the queue.
// Returns true if the element was enqueued, false otherwise.
// Does not allocate memory.
AE_FORCEINLINE bool try_enqueue(T const& element)
AE_FORCEINLINE bool tryEnqueue(T const& element)
{
if (inner.try_enqueue(element))
if (inner_.tryEnqueue(element))
{
sema.signal();
sema_.signal();
return true;
}
return false;
@@ -713,11 +714,11 @@ public:
// Enqueues a moved copy of element if there is room in the queue.
// Returns true if the element was enqueued, false otherwise.
// Does not allocate memory.
AE_FORCEINLINE bool try_enqueue(T&& element)
AE_FORCEINLINE bool tryEnqueue(T&& element)
{
if (inner.try_enqueue(std::forward<T>(element)))
if (inner_.tryEnqueue(std::forward<T>(element)))
{
sema.signal();
sema_.signal();
return true;
}
return false;
@@ -728,9 +729,9 @@ public:
// Only fails (returns false) if memory allocation fails.
AE_FORCEINLINE bool enqueue(T const& element)
{
if (inner.enqueue(element))
if (inner_.enqueue(element))
{
sema.signal();
sema_.signal();
return true;
}
return false;
@@ -741,9 +742,9 @@ public:
// Only fails (returns false) if memory allocation fails.
AE_FORCEINLINE bool enqueue(T&& element)
{
if (inner.enqueue(std::forward<T>(element)))
if (inner_.enqueue(std::forward<T>(element)))
{
sema.signal();
sema_.signal();
return true;
}
return false;
@@ -753,11 +754,11 @@ public:
// returns false instead. If the queue has at least one element,
// moves front to result using operator=, then returns true.
template <typename U>
bool try_dequeue(U& result)
bool tryDequeue(U& result)
{
if (sema.tryWait())
if (sema_.tryWait())
{
bool success = inner.try_dequeue(result);
bool success = inner_.tryDequeue(result);
assert(success);
AE_UNUSED(success);
return true;
@@ -768,10 +769,10 @@ public:
// Attempts to dequeue an element; if the queue is empty,
// waits until an element is available, then dequeues it.
template <typename U>
void wait_dequeue(U& result)
void waitDequeue(U& result)
{
sema.wait();
bool success = inner.try_dequeue(result);
sema_.wait();
bool success = inner_.tryDequeue(result);
AE_UNUSED(result);
assert(success);
AE_UNUSED(success);
@@ -782,15 +783,15 @@ public:
// then dequeues it and returns true, or returns false if the timeout
// expires before an element can be dequeued.
// Using a negative timeout indicates an indefinite timeout,
// and is thus functionally equivalent to calling wait_dequeue.
// and is thus functionally equivalent to calling waitDequeue.
template <typename U>
bool wait_dequeue_timed(U& result, std::int64_t timeout_usecs)
bool waitDequeTimed(U& result, std::int64_t timeout_usecs)
{
if (!sema.wait(timeout_usecs))
if (!sema_.wait(timeout_usecs))
{
return false;
}
bool success = inner.try_dequeue(result);
bool success = inner_.tryDequeue(result);
AE_UNUSED(result);
assert(success);
AE_UNUSED(success);
@@ -803,22 +804,22 @@ public:
// then dequeues it and returns true, or returns false if the timeout
// expires before an element can be dequeued.
// Using a negative timeout indicates an indefinite timeout,
// and is thus functionally equivalent to calling wait_dequeue.
// and is thus functionally equivalent to calling waitDequeue.
template <typename U, typename Rep, typename Period>
inline bool wait_dequeue_timed(U& result, std::chrono::duration<Rep, Period> const& timeout)
inline bool waitDequeTimed(U& result, std::chrono::duration<Rep, Period> const& timeout)
{
return wait_dequeue_timed(result, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
return waitDequeTimed(result, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
}
#endif
// Returns a pointer to the front element in the queue (the one that
// would be removed next by a call to `try_dequeue` or `pop`). If the
// would be removed next by a call to `tryDequeue` or `pop`). If the
// queue appears empty at the time the method is called, nullptr is
// returned instead.
// Must be called only from the consumer thread.
AE_FORCEINLINE T* peek()
{
return inner.peek();
return inner_.peek();
}
// Removes the front element from the queue, if any, without returning it.
@@ -826,9 +827,9 @@ public:
// `pop` was called.
AE_FORCEINLINE bool pop()
{
if (sema.tryWait())
if (sema_.tryWait())
{
bool result = inner.pop();
bool result = inner_.pop();
assert(result);
AE_UNUSED(result);
return true;
@@ -838,9 +839,9 @@ public:
// Returns the approximate number of items currently in the queue.
// Safe to call from both the producer and consumer threads.
AE_FORCEINLINE size_t size_approx() const
AE_FORCEINLINE size_t sizeApprox() const
{
return sema.availableApprox();
return sema_.availableApprox();
}
private:
@@ -853,8 +854,8 @@ private:
}
private:
ReaderWriterQueue inner;
spsc_sema::LightweightSemaphore sema;
ReaderWriterQueue inner_;
spsc_sema::LightweightSemaphore sema_;
};
} // end namespace moodycamel
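
Not part of the diff: a minimal end-to-end usage sketch of the renamed API (tryEnqueue / enqueue / waitDequeTimed / sizeApprox), assuming the header above is reachable as "readerwriterqueue.h". One producer thread and one consumer thread, as the queue requires.

#include <chrono>
#include <iostream>
#include <thread>
#include "readerwriterqueue.h"

int main()
{
  // Pre-size the queue for 64 elements; enqueue() may still grow it.
  moodycamel::BlockingReaderWriterQueue<int> queue(64);

  std::thread producer([&queue]() {
    for (int i = 0; i < 100; ++i)
    {
      // tryEnqueue() never allocates; fall back to enqueue() if it reports
      // that the queue is full.
      if (!queue.tryEnqueue(i))
      {
        queue.enqueue(i);
      }
    }
  });

  std::thread consumer([&queue]() {
    int value;
    int received = 0;
    while (received < 100)
    {
      // Wake at least every 8 ms even when nothing arrives, mirroring the
      // pipeline consumer in the first file of this commit.
      if (queue.waitDequeTimed(value, std::chrono::milliseconds(8)))
      {
        ++received;
      }
    }
  });

  producer.join();
  consumer.join();
  std::cout << "items left (approx): " << queue.sizeApprox() << std::endl;
  return 0;
}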