From 9788504f937acbf6e234f372fd1234f95aa233a3 Mon Sep 17 00:00:00 2001 From: Felix Mauch Date: Thu, 6 Jun 2019 10:03:18 +0200 Subject: [PATCH] make queue match naming conventions --- .../include/ur_rtde_driver/comm/pipeline.h | 6 +- .../include/ur_rtde_driver/queue/atomicops.h | 154 +++---- .../ur_rtde_driver/queue/readerwriterqueue.h | 419 +++++++++--------- 3 files changed, 290 insertions(+), 289 deletions(-) diff --git a/ur_rtde_driver/include/ur_rtde_driver/comm/pipeline.h b/ur_rtde_driver/include/ur_rtde_driver/comm/pipeline.h index 12ee28f..6a85056 100644 --- a/ur_rtde_driver/include/ur_rtde_driver/comm/pipeline.h +++ b/ur_rtde_driver/include/ur_rtde_driver/comm/pipeline.h @@ -191,7 +191,7 @@ public: bool getLatestProduct(std::unique_ptr>& product, std::chrono::milliseconds timeout) { - return queue_.wait_dequeue_timed(product, timeout); + return queue_.waitDequeTimed(product, timeout); } private: @@ -268,7 +268,7 @@ private: for (auto& p : products) { - if (!queue_.try_enqueue(std::move(p))) + if (!queue_.tryEnqueue(std::move(p))) { LOG_ERROR("Pipeline producer overflowed! <%s>", name_.c_str()); } @@ -294,7 +294,7 @@ private: // at roughly 125hz (every 8ms) and have to update // the controllers (i.e. the consumer) with *at least* 125Hz // So we update the consumer more frequently via onTimeout - if (!queue_.wait_dequeue_timed(product, std::chrono::milliseconds(8))) + if (!queue_.waitDequeTimed(product, std::chrono::milliseconds(8))) { consumer_->onTimeout(); continue; diff --git a/ur_rtde_driver/include/ur_rtde_driver/queue/atomicops.h b/ur_rtde_driver/include/ur_rtde_driver/queue/atomicops.h index 8cbf463..aa4590f 100644 --- a/ur_rtde_driver/include/ur_rtde_driver/queue/atomicops.h +++ b/ur_rtde_driver/include/ur_rtde_driver/queue/atomicops.h @@ -1,4 +1,4 @@ -// ©2013-2016 Cameron Desrochers. +// ©2013-2016 Cameron Desrochers. // Distributed under the simplified BSD license (see the license file that // should have come with this header). 
// Uses Jeff Preshing's semaphore implementation (under the terms of its @@ -108,7 +108,7 @@ enum memory_order namespace moodycamel { -AE_FORCEINLINE void compiler_fence(memory_order order) +AE_FORCEINLINE void compilerFence(memory_order order) { switch (order) { @@ -199,7 +199,7 @@ AE_FORCEINLINE void fence(memory_order order) namespace moodycamel { -AE_FORCEINLINE void compiler_fence(memory_order order) +AE_FORCEINLINE void compilerFence(memory_order order) { switch (order) { @@ -265,29 +265,29 @@ AE_FORCEINLINE void fence(memory_order order) namespace moodycamel { template -class weak_atomic +class WeakAtomic { public: - weak_atomic() + WeakAtomic() { } #ifdef AE_VCPP #pragma warning(disable : 4100) // Get rid of (erroneous) 'unreferenced formal parameter' warning #endif template - weak_atomic(U&& x) : value(std::forward(x)) + WeakAtomic(U&& x) : value_(std::forward(x)) { } #ifdef __cplusplus_cli // Work around bug with universal reference/nullptr combination that only appears when /clr is on - weak_atomic(nullptr_t) : value(nullptr) + WeakAtomic(nullptr_t) : value_(nullptr) { } #endif - weak_atomic(weak_atomic const& other) : value(other.value) + WeakAtomic(WeakAtomic const& other) : value_(other.value_) { } - weak_atomic(weak_atomic&& other) : value(std::move(other.value)) + WeakAtomic(WeakAtomic&& other) : value_(std::move(other.value_)) { } #ifdef AE_VCPP @@ -301,80 +301,80 @@ public: #ifndef AE_USE_STD_ATOMIC_FOR_WEAK_ATOMIC template - AE_FORCEINLINE weak_atomic const& operator=(U&& x) + AE_FORCEINLINE WeakAtomic const& operator=(U&& x) { - value = std::forward(x); + value_ = std::forward(x); return *this; } - AE_FORCEINLINE weak_atomic const& operator=(weak_atomic const& other) + AE_FORCEINLINE WeakAtomic const& operator=(WeakAtomic const& other) { - value = other.value; + value_ = other.value_; return *this; } AE_FORCEINLINE T load() const { - return value; + return value_; } - AE_FORCEINLINE T fetch_add_acquire(T increment) + AE_FORCEINLINE T 
fetchAddAcquire(T increment) { #if defined(AE_ARCH_X64) || defined(AE_ARCH_X86) if (sizeof(T) == 4) - return _InterlockedExchangeAdd((long volatile*)&value, (long)increment); + return _InterlockedExchangeAdd((long volatile*)&value_, (long)increment); #if defined(_M_AMD64) else if (sizeof(T) == 8) - return _InterlockedExchangeAdd64((long long volatile*)&value, (long long)increment); + return _InterlockedExchangeAdd64((long long volatile*)&value_, (long long)increment); #endif #else #error Unsupported platform #endif assert(false && "T must be either a 32 or 64 bit type"); - return value; + return value_; } - AE_FORCEINLINE T fetch_add_release(T increment) + AE_FORCEINLINE T fetchAddRelease(T increment) { #if defined(AE_ARCH_X64) || defined(AE_ARCH_X86) if (sizeof(T) == 4) - return _InterlockedExchangeAdd((long volatile*)&value, (long)increment); + return _InterlockedExchangeAdd((long volatile*)&value_, (long)increment); #if defined(_M_AMD64) else if (sizeof(T) == 8) - return _InterlockedExchangeAdd64((long long volatile*)&value, (long long)increment); + return _InterlockedExchangeAdd64((long long volatile*)&value_, (long long)increment); #endif #else #error Unsupported platform #endif assert(false && "T must be either a 32 or 64 bit type"); - return value; + return value_; } #else template - AE_FORCEINLINE weak_atomic const& operator=(U&& x) + AE_FORCEINLINE WeakAtomic const& operator=(U&& x) { - value.store(std::forward(x), std::memory_order_relaxed); + value_.store(std::forward(x), std::memory_order_relaxed); return *this; } - AE_FORCEINLINE weak_atomic const& operator=(weak_atomic const& other) + AE_FORCEINLINE WeakAtomic const& operator=(WeakAtomic const& other) { - value.store(other.value.load(std::memory_order_relaxed), std::memory_order_relaxed); + value_.store(other.value_.load(std::memory_order_relaxed), std::memory_order_relaxed); return *this; } AE_FORCEINLINE T load() const { - return value.load(std::memory_order_relaxed); + return 
value_.load(std::memory_order_relaxed); } - AE_FORCEINLINE T fetch_add_acquire(T increment) + AE_FORCEINLINE T fetchAddAcquire(T increment) { - return value.fetch_add(increment, std::memory_order_acquire); + return value_.fetch_add(increment, std::memory_order_acquire); } - AE_FORCEINLINE T fetch_add_release(T increment) + AE_FORCEINLINE T fetchAddRelease(T increment) { - return value.fetch_add(increment, std::memory_order_release); + return value_.fetch_add(increment, std::memory_order_release); } #endif @@ -382,9 +382,9 @@ private: #ifndef AE_USE_STD_ATOMIC_FOR_WEAK_ATOMIC // No std::atomic support, but still need to circumvent compiler optimizations. // `volatile` will make memory access slow, but is guaranteed to be reliable. - volatile T value; + volatile T value_; #else - std::atomic value; + std::atomic value_; #endif }; @@ -465,13 +465,13 @@ public: WaitForSingleObject(m_hSema, infinite); } - bool try_wait() + bool tryWait() { const unsigned long RC_WAIT_TIMEOUT = 0x00000102; return WaitForSingleObject(m_hSema, 0) != RC_WAIT_TIMEOUT; } - bool timed_wait(std::uint64_t usecs) + bool timedWait(std::uint64_t usecs) { const unsigned long RC_WAIT_TIMEOUT = 0x00000102; return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) != RC_WAIT_TIMEOUT; @@ -490,7 +490,7 @@ public: class Semaphore { private: - semaphore_t m_sema; + semaphore_t sema_; Semaphore(const Semaphore& other); Semaphore& operator=(const Semaphore& other); @@ -499,25 +499,25 @@ public: Semaphore(int initialCount = 0) { assert(initialCount >= 0); - semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount); + semaphore_create(mach_task_self(), &sema_, SYNC_POLICY_FIFO, initialCount); } ~Semaphore() { - semaphore_destroy(mach_task_self(), m_sema); + semaphore_destroy(mach_task_self(), sema_); } void wait() { - semaphore_wait(m_sema); + semaphore_wait(sema_); } - bool try_wait() + bool tryWait() { - return timed_wait(0); + return timedWait(0); } - bool timed_wait(std::int64_t 
timeout_usecs) + bool timedWait(std::int64_t timeout_usecs) { mach_timespec_t ts; ts.tv_sec = timeout_usecs / 1000000; @@ -525,21 +525,21 @@ public: // added in OSX 10.10: // https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html - kern_return_t rc = semaphore_timedwait(m_sema, ts); + kern_return_t rc = semaphore_timedwait(sema_, ts); return rc != KERN_OPERATION_TIMED_OUT; } void signal() { - semaphore_signal(m_sema); + semaphore_signal(sema_); } void signal(int count) { while (count-- > 0) { - semaphore_signal(m_sema); + semaphore_signal(sema_); } } }; @@ -550,7 +550,7 @@ public: class Semaphore { private: - sem_t m_sema; + sem_t sema_; Semaphore(const Semaphore& other); Semaphore& operator=(const Semaphore& other); @@ -559,12 +559,12 @@ public: Semaphore(int initialCount = 0) { assert(initialCount >= 0); - sem_init(&m_sema, 0, initialCount); + sem_init(&sema_, 0, initialCount); } ~Semaphore() { - sem_destroy(&m_sema); + sem_destroy(&sema_); } void wait() @@ -573,21 +573,21 @@ public: int rc; do { - rc = sem_wait(&m_sema); + rc = sem_wait(&sema_); } while (rc == -1 && errno == EINTR); } - bool try_wait() + bool tryWait() { int rc; do { - rc = sem_trywait(&m_sema); + rc = sem_trywait(&sema_); } while (rc == -1 && errno == EINTR); return !(rc == -1 && errno == EAGAIN); } - bool timed_wait(std::uint64_t usecs) + bool timedWait(std::uint64_t usecs) { struct timespec ts; const int usecs_in_1_sec = 1000000; @@ -606,21 +606,21 @@ public: int rc; do { - rc = sem_timedwait(&m_sema, &ts); + rc = sem_timedwait(&sema_, &ts); } while (rc == -1 && errno == EINTR); return !(rc == -1 && errno == ETIMEDOUT); } void signal() { - sem_post(&m_sema); + sem_post(&sema_); } void signal(int count) { while (count-- > 0) { - sem_post(&m_sema); + sem_post(&sema_); } } }; @@ -637,34 +637,34 @@ public: typedef std::make_signed::type ssize_t; private: - weak_atomic m_count; - Semaphore m_sema; + WeakAtomic count_; + 
Semaphore sema_; bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1) { - ssize_t oldCount; + ssize_t old_count; // Is there a better way to set the initial spin count? // If we lower it to 1000, testBenaphore becomes 15x slower on my Core i7-5930K Windows PC, // as threads start hitting the kernel semaphore. int spin = 10000; while (--spin >= 0) { - if (m_count.load() > 0) + if (count_.load() > 0) { - m_count.fetch_add_acquire(-1); + count_.fetchAddAcquire(-1); return true; } - compiler_fence(memory_order_acquire); // Prevent the compiler from collapsing the loop. + compilerFence(memory_order_acquire); // Prevent the compiler from collapsing the loop. } - oldCount = m_count.fetch_add_acquire(-1); - if (oldCount > 0) + old_count = count_.fetchAddAcquire(-1); + if (old_count > 0) return true; if (timeout_usecs < 0) { - m_sema.wait(); + sema_.wait(); return true; } - if (m_sema.timed_wait(timeout_usecs)) + if (sema_.timedWait(timeout_usecs)) return true; // At this point, we've timed out waiting for the semaphore, but the // count is still decremented indicating we may still be waiting on @@ -673,27 +673,27 @@ private: // need to release the semaphore too. while (true) { - oldCount = m_count.fetch_add_release(1); - if (oldCount < 0) + old_count = count_.fetchAddRelease(1); + if (old_count < 0) return false; // successfully restored things to the way they were // Oh, the producer thread just signaled the semaphore after all. 
Try again: - oldCount = m_count.fetch_add_acquire(-1); - if (oldCount > 0 && m_sema.try_wait()) + old_count = count_.fetchAddAcquire(-1); + if (old_count > 0 && sema_.tryWait()) return true; } } public: - LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount) + LightweightSemaphore(ssize_t initialCount = 0) : count_(initialCount) { assert(initialCount >= 0); } bool tryWait() { - if (m_count.load() > 0) + if (count_.load() > 0) { - m_count.fetch_add_acquire(-1); + count_.fetchAddAcquire(-1); return true; } return false; @@ -713,17 +713,17 @@ public: void signal(ssize_t count = 1) { assert(count >= 0); - ssize_t oldCount = m_count.fetch_add_release(count); - assert(oldCount >= -1); - if (oldCount < 0) + ssize_t old_count = count_.fetchAddRelease(count); + assert(old_count >= -1); + if (old_count < 0) { - m_sema.signal(1); + sema_.signal(1); } } ssize_t availableApprox() const { - ssize_t count = m_count.load(); + ssize_t count = count_.load(); return count > 0 ? count : 0; } }; diff --git a/ur_rtde_driver/include/ur_rtde_driver/queue/readerwriterqueue.h b/ur_rtde_driver/include/ur_rtde_driver/queue/readerwriterqueue.h index fdfcdf8..e56e444 100644 --- a/ur_rtde_driver/include/ur_rtde_driver/queue/readerwriterqueue.h +++ b/ur_rtde_driver/include/ur_rtde_driver/queue/readerwriterqueue.h @@ -67,7 +67,7 @@ class ReaderWriterQueue // thread owns all the front indices/pointers. Both threads read each // other's variables, but only the owning thread updates them. E.g. After // the consumer reads the producer's tail, the tail may change before the - // consumer is done dequeuing an object, but the consumer knows the tail + // consumer is done dequeuing_ an object, but the consumer knows the tail // will never go backwards, only forwards. // If there is no room to enqueue an object, an additional block (of // equal size to the last block) is added. Blocks are never removed. @@ -77,19 +77,19 @@ public: // allocations. 
If more than MAX_BLOCK_SIZE elements are requested, // then several blocks of MAX_BLOCK_SIZE each are reserved (including // at least one extra buffer block). - explicit ReaderWriterQueue(size_t maxSize = 15) + explicit ReaderWriterQueue(size_t max_size = 15) #ifndef NDEBUG - : enqueuing(false), dequeuing(false) + : enqueuing_(false), dequeuing_(false) #endif { - assert(maxSize > 0); + assert(max_size > 0); assert(MAX_BLOCK_SIZE == ceilToPow2(MAX_BLOCK_SIZE) && "MAX_BLOCK_SIZE must be a power of 2"); assert(MAX_BLOCK_SIZE >= 2 && "MAX_BLOCK_SIZE must be at least 2"); - Block* firstBlock = nullptr; + Block* first_block = nullptr; - largestBlockSize = ceilToPow2(maxSize + 1); // We need a spare slot to fit maxSize elements in the block - if (largestBlockSize > MAX_BLOCK_SIZE * 2) + largest_block_size_ = ceilToPow2(max_size + 1); // We need a spare slot to fit maxSize elements in the block + if (largest_block_size_ > MAX_BLOCK_SIZE * 2) { // We need a spare block in case the producer is writing to a different block the consumer is reading from, and // wants to enqueue the maximum number of elements. We also need a spare element in each block to avoid the @@ -97,12 +97,12 @@ public: // between front == tail meaning "empty" and "full". // So the effective number of slots that are guaranteed to be usable at any time is the block size - 1 times the // number of blocks - 1. 
Solving for maxSize and applying a ceiling to the division gives us (after simplifying): - size_t initialBlockCount = (maxSize + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1); - largestBlockSize = MAX_BLOCK_SIZE; - Block* lastBlock = nullptr; - for (size_t i = 0; i != initialBlockCount; ++i) + size_t initial_block_count = (max_size + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1); + largest_block_size_ = MAX_BLOCK_SIZE; + Block* last_block = nullptr; + for (size_t i = 0; i != initial_block_count; ++i) { - auto block = make_block(largestBlockSize); + auto block = makeBlock(largest_block_size_); if (block == nullptr) { #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED @@ -111,22 +111,22 @@ public: abort(); #endif } - if (firstBlock == nullptr) + if (first_block == nullptr) { - firstBlock = block; + first_block = block; } else { - lastBlock->next = block; + last_block->next = block; } - lastBlock = block; - block->next = firstBlock; + last_block = block; + block->next = first_block; } } else { - firstBlock = make_block(largestBlockSize); - if (firstBlock == nullptr) + first_block = makeBlock(largest_block_size_); + if (first_block == nullptr) { #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED throw std::bad_alloc(); @@ -134,10 +134,10 @@ public: abort(); #endif } - firstBlock->next = firstBlock; + first_block->next = first_block; } - frontBlock = firstBlock; - tailBlock = firstBlock; + front_block_ = first_block; + tail_block_ = first_block; // Make sure the reader/writer threads will have the initialized memory setup above: fence(memory_order_sync); @@ -151,42 +151,42 @@ public: fence(memory_order_sync); // Destroy any remaining objects in queue and free memory - Block* frontBlock_ = frontBlock; - Block* block = frontBlock_; + Block* front_block = front_block_; + Block* block = front_block; do { - Block* nextBlock = block->next; - size_t blockFront = block->front; - size_t blockTail = block->tail; + Block* next_block = block->next; + size_t block_front = block->front; + size_t block_tail = 
block->tail; - for (size_t i = blockFront; i != blockTail; i = (i + 1) & block->sizeMask) + for (size_t i = block_front; i != block_tail; i = (i + 1) & block->sizeMask) { auto element = reinterpret_cast(block->data + i * sizeof(T)); element->~T(); (void)element; } - auto rawBlock = block->rawThis; + auto raw_block = block->rawThis; block->~Block(); - std::free(rawBlock); - block = nextBlock; - } while (block != frontBlock_); + std::free(raw_block); + block = next_block; + } while (block != front_block); } // Enqueues a copy of element if there is room in the queue. // Returns true if the element was enqueued, false otherwise. // Does not allocate memory. - AE_FORCEINLINE bool try_enqueue(T const& element) + AE_FORCEINLINE bool tryEnqueue(T const& element) { - return inner_enqueue(element); + return innerEnqueue(element); } // Enqueues a moved copy of element if there is room in the queue. // Returns true if the element was enqueued, false otherwise. // Does not allocate memory. - AE_FORCEINLINE bool try_enqueue(T&& element) + AE_FORCEINLINE bool tryEnqueue(T&& element) { - return inner_enqueue(std::forward(element)); + return innerEnqueue(std::forward(element)); } // Enqueues a copy of element on the queue. @@ -194,7 +194,7 @@ public: // Only fails (returns false) if memory allocation fails. AE_FORCEINLINE bool enqueue(T const& element) { - return inner_enqueue(element); + return innerEnqueue(element); } // Enqueues a moved copy of element on the queue. @@ -202,17 +202,17 @@ public: // Only fails (returns false) if memory allocation fails. AE_FORCEINLINE bool enqueue(T&& element) { - return inner_enqueue(std::forward(element)); + return innerEnqueue(std::forward(element)); } // Attempts to dequeue an element; if the queue is empty, // returns false instead. If the queue has at least one element, // moves front to result using operator=, then returns true. 
template - bool try_dequeue(U& result) + bool tryDequeue(U& result) { #ifndef NDEBUG - ReentrantGuard guard(this->dequeuing); + ReentrantGuard guard(this->dequeuing_); #endif // High-level pseudocode: @@ -232,70 +232,70 @@ public: // then re-read the front block and check if it's not empty again, then check if the tail // block has advanced. - Block* frontBlock_ = frontBlock.load(); - size_t blockTail = frontBlock_->localTail; - size_t blockFront = frontBlock_->front.load(); + Block* front_block = front_block_.load(); + size_t block_tail = front_block->localTail; + size_t block_front = front_block->front.load(); - if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) + if (block_front != block_tail || block_front != (front_block->localTail = front_block->tail.load())) { fence(memory_order_acquire); non_empty_front_block: // Front block not empty, dequeue from here - auto element = reinterpret_cast(frontBlock_->data + blockFront * sizeof(T)); + auto element = reinterpret_cast(front_block->data + block_front * sizeof(T)); result = std::move(*element); element->~T(); - blockFront = (blockFront + 1) & frontBlock_->sizeMask; + block_front = (block_front + 1) & front_block->sizeMask; fence(memory_order_release); - frontBlock_->front = blockFront; + front_block->front = block_front; } - else if (frontBlock_ != tailBlock.load()) + else if (front_block != tail_block_.load()) { fence(memory_order_acquire); - frontBlock_ = frontBlock.load(); - blockTail = frontBlock_->localTail = frontBlock_->tail.load(); - blockFront = frontBlock_->front.load(); + front_block = front_block_.load(); + block_tail = front_block->localTail = front_block->tail.load(); + block_front = front_block->front.load(); fence(memory_order_acquire); - if (blockFront != blockTail) + if (block_front != block_tail) { // Oh look, the front block isn't empty after all goto non_empty_front_block; } // Front block is empty but there's another block ahead, advance to it - 
Block* nextBlock = frontBlock_->next; - // Don't need an acquire fence here since next can only ever be set on the tailBlock, - // and we're not the tailBlock, and we did an acquire earlier after reading tailBlock which - // ensures next is up-to-date on this CPU in case we recently were at tailBlock. + Block* next_block = front_block->next; + // Don't need an acquire fence here since next can only ever be set on the tail_block, + // and we're not the tail_block, and we did an acquire earlier after reading tail_block which + // ensures next is up-to-date on this CPU in case we recently were at tail_block. - size_t nextBlockFront = nextBlock->front.load(); - size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load(); + size_t next_block_front = next_block->front.load(); + size_t next_block_tail = next_block->localTail = next_block->tail.load(); fence(memory_order_acquire); - // Since the tailBlock is only ever advanced after being written to, + // Since the tail_block is only ever advanced after being written to, // we know there's for sure an element to dequeue on it - assert(nextBlockFront != nextBlockTail); - AE_UNUSED(nextBlockTail); + assert(next_block_front != next_block_tail); + AE_UNUSED(next_block_tail); // We're done with this block, let the producer use it if it needs - fence(memory_order_release); // Expose possibly pending changes to frontBlock->front from last dequeue - frontBlock = frontBlock_ = nextBlock; + fence(memory_order_release); // Expose possibly pending changes to front_block->front from last dequeue + front_block_ = front_block = next_block; - compiler_fence(memory_order_release); // Not strictly needed + compilerFence(memory_order_release); // Not strictly needed - auto element = reinterpret_cast(frontBlock_->data + nextBlockFront * sizeof(T)); + auto element = reinterpret_cast(front_block->data + next_block_front * sizeof(T)); result = std::move(*element); element->~T(); - nextBlockFront = (nextBlockFront + 1) & 
frontBlock_->sizeMask; + next_block_front = (next_block_front + 1) & front_block->sizeMask; fence(memory_order_release); - frontBlock_->front = nextBlockFront; + front_block->front = next_block_front; } else { @@ -307,47 +307,47 @@ public: } // Returns a pointer to the front element in the queue (the one that - // would be removed next by a call to `try_dequeue` or `pop`). If the + // would be removed next by a call to `tryDequeue` or `pop`). If the // queue appears empty at the time the method is called, nullptr is // returned instead. // Must be called only from the consumer thread. T* peek() { #ifndef NDEBUG - ReentrantGuard guard(this->dequeuing); + ReentrantGuard guard(this->dequeuing_); #endif - // See try_dequeue() for reasoning + // See tryDequeue() for reasoning - Block* frontBlock_ = frontBlock.load(); - size_t blockTail = frontBlock_->localTail; - size_t blockFront = frontBlock_->front.load(); + Block* front_block = front_block_.load(); + size_t block_tail = front_block->localTail; + size_t block_front = front_block->front.load(); - if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) + if (block_front != block_tail || block_front != (front_block->localTail = front_block->tail.load())) { fence(memory_order_acquire); non_empty_front_block: - return reinterpret_cast(frontBlock_->data + blockFront * sizeof(T)); + return reinterpret_cast(front_block->data + block_front * sizeof(T)); } - else if (frontBlock_ != tailBlock.load()) + else if (front_block != tail_block_.load()) { fence(memory_order_acquire); - frontBlock_ = frontBlock.load(); - blockTail = frontBlock_->localTail = frontBlock_->tail.load(); - blockFront = frontBlock_->front.load(); + front_block = front_block_.load(); + block_tail = front_block->localTail = front_block->tail.load(); + block_front = front_block->front.load(); fence(memory_order_acquire); - if (blockFront != blockTail) + if (block_front != block_tail) { goto non_empty_front_block; } - 
Block* nextBlock = frontBlock_->next; + Block* next_block = front_block->next; - size_t nextBlockFront = nextBlock->front.load(); + size_t next_block_front = next_block->front.load(); fence(memory_order_acquire); - assert(nextBlockFront != nextBlock->tail.load()); - return reinterpret_cast(nextBlock->data + nextBlockFront * sizeof(T)); + assert(next_block_front != next_block->tail.load()); + return reinterpret_cast(next_block->data + next_block_front * sizeof(T)); } return nullptr; @@ -359,62 +359,62 @@ public: bool pop() { #ifndef NDEBUG - ReentrantGuard guard(this->dequeuing); + ReentrantGuard guard(this->dequeuing_); #endif - // See try_dequeue() for reasoning + // See tryDequeue() for reasoning - Block* frontBlock_ = frontBlock.load(); - size_t blockTail = frontBlock_->localTail; - size_t blockFront = frontBlock_->front.load(); + Block* front_block = front_block_.load(); + size_t block_tail = front_block->localTail; + size_t block_front = front_block->front.load(); - if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) + if (block_front != block_tail || block_front != (front_block->localTail = front_block->tail.load())) { fence(memory_order_acquire); non_empty_front_block: - auto element = reinterpret_cast(frontBlock_->data + blockFront * sizeof(T)); + auto element = reinterpret_cast(front_block->data + block_front * sizeof(T)); element->~T(); - blockFront = (blockFront + 1) & frontBlock_->sizeMask; + block_front = (block_front + 1) & front_block->sizeMask; fence(memory_order_release); - frontBlock_->front = blockFront; + front_block->front = block_front; } - else if (frontBlock_ != tailBlock.load()) + else if (front_block != tail_block_.load()) { fence(memory_order_acquire); - frontBlock_ = frontBlock.load(); - blockTail = frontBlock_->localTail = frontBlock_->tail.load(); - blockFront = frontBlock_->front.load(); + front_block = front_block_.load(); + block_tail = front_block->localTail = front_block->tail.load(); 
+ block_front = front_block->front.load(); fence(memory_order_acquire); - if (blockFront != blockTail) + if (block_front != block_tail) { goto non_empty_front_block; } // Front block is empty but there's another block ahead, advance to it - Block* nextBlock = frontBlock_->next; + Block* next_block = front_block->next; - size_t nextBlockFront = nextBlock->front.load(); - size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load(); + size_t next_block_front = next_block->front.load(); + size_t next_block_tail = next_block->localTail = next_block->tail.load(); fence(memory_order_acquire); - assert(nextBlockFront != nextBlockTail); - AE_UNUSED(nextBlockTail); + assert(next_block_front != next_block_tail); + AE_UNUSED(next_block_tail); fence(memory_order_release); - frontBlock = frontBlock_ = nextBlock; + front_block_ = front_block = next_block; - compiler_fence(memory_order_release); + compilerFence(memory_order_release); - auto element = reinterpret_cast(frontBlock_->data + nextBlockFront * sizeof(T)); + auto element = reinterpret_cast(front_block->data + next_block_front * sizeof(T)); element->~T(); - nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask; + next_block_front = (next_block_front + 1) & front_block->sizeMask; fence(memory_order_release); - frontBlock_->front = nextBlockFront; + front_block->front = next_block_front; } else { @@ -427,19 +427,19 @@ public: // Returns the approximate number of items currently in the queue. // Safe to call from both the producer and consumer threads. 
- inline size_t size_approx() const + inline size_t sizeApprox() const { size_t result = 0; - Block* frontBlock_ = frontBlock.load(); - Block* block = frontBlock_; + Block* front_block = front_block_.load(); + Block* block = front_block; do { fence(memory_order_acquire); - size_t blockFront = block->front.load(); - size_t blockTail = block->tail.load(); - result += (blockTail - blockFront) & block->sizeMask; + size_t block_front = block->front.load(); + size_t block_tail = block->tail.load(); + result += (block_tail - block_front) & block->sizeMask; block = block->next.load(); - } while (block != frontBlock_); + } while (block != front_block); return result; } @@ -451,10 +451,10 @@ private: }; template - bool inner_enqueue(U&& element) + bool innerEnqueue(U&& element) { #ifndef NDEBUG - ReentrantGuard guard(this->enqueuing); + ReentrantGuard guard(this->enqueuing_); #endif // High-level pseudocode (assuming we're allowed to alloc a new block): @@ -464,80 +464,80 @@ private: // Else create a new block and enqueue there // Advance tail to the block we just enqueued to - Block* tailBlock_ = tailBlock.load(); - size_t blockFront = tailBlock_->localFront; - size_t blockTail = tailBlock_->tail.load(); + Block* tail_block = tail_block_.load(); + size_t block_front = tail_block->localFront; + size_t block_tail = tail_block->tail.load(); - size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask; - if (nextBlockTail != blockFront || nextBlockTail != (tailBlock_->localFront = tailBlock_->front.load())) + size_t next_block_tail = (block_tail + 1) & tail_block->sizeMask; + if (next_block_tail != block_front || next_block_tail != (tail_block->localFront = tail_block->front.load())) { fence(memory_order_acquire); // This block has room for at least one more element - char* location = tailBlock_->data + blockTail * sizeof(T); + char* location = tail_block->data + block_tail * sizeof(T); new (location) T(std::forward(element)); fence(memory_order_release); - tailBlock_->tail 
= nextBlockTail; + tail_block->tail = next_block_tail; } else { fence(memory_order_acquire); - if (tailBlock_->next.load() != frontBlock) + if (tail_block->next.load() != front_block_) { - // Note that the reason we can't advance to the frontBlock and start adding new entries there + // Note that the reason we can't advance to the front_block and start adding new entries there // is because if we did, then dequeue would stay in that block, eventually reading the new values, // instead of advancing to the next full block (whose values were enqueued first and so should be // consumed first). - fence(memory_order_acquire); // Ensure we get latest writes if we got the latest frontBlock + fence(memory_order_acquire); // Ensure we get latest writes if we got the latest front_block - // tailBlock is full, but there's a free block ahead, use it - Block* tailBlockNext = tailBlock_->next.load(); - size_t nextBlockFront = tailBlockNext->localFront = tailBlockNext->front.load(); - nextBlockTail = tailBlockNext->tail.load(); + // tail_block is full, but there's a free block ahead, use it + Block* tail_block_next = tail_block->next.load(); + size_t next_block_front = tail_block_next->localFront = tail_block_next->front.load(); + next_block_tail = tail_block_next->tail.load(); fence(memory_order_acquire); // This block must be empty since it's not the head block and we // go through the blocks in a circle - assert(nextBlockFront == nextBlockTail); - tailBlockNext->localFront = nextBlockFront; + assert(next_block_front == next_block_tail); + tail_block_next->localFront = next_block_front; - char* location = tailBlockNext->data + nextBlockTail * sizeof(T); + char* location = tail_block_next->data + next_block_tail * sizeof(T); new (location) T(std::forward(element)); - tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask; + tail_block_next->tail = (next_block_tail + 1) & tail_block_next->sizeMask; fence(memory_order_release); - tailBlock = tailBlockNext; + 
tail_block_ = tail_block_next; } else if (canAlloc == CanAlloc) { - // tailBlock is full and there's no free block ahead; create a new block - auto newBlockSize = largestBlockSize >= MAX_BLOCK_SIZE ? largestBlockSize : largestBlockSize * 2; - auto newBlock = make_block(newBlockSize); - if (newBlock == nullptr) + // tail_block is full and there's no free block ahead; create a new block + auto new_block_size = largest_block_size_ >= MAX_BLOCK_SIZE ? largest_block_size_ : largest_block_size_ * 2; + auto new_block = makeBlock(new_block_size); + if (new_block == nullptr) { // Could not allocate a block! return false; } - largestBlockSize = newBlockSize; + largest_block_size_ = new_block_size; - new (newBlock->data) T(std::forward(element)); + new (new_block->data) T(std::forward(element)); - assert(newBlock->front == 0); - newBlock->tail = newBlock->localTail = 1; + assert(new_block->front == 0); + new_block->tail = new_block->localTail = 1; - newBlock->next = tailBlock_->next.load(); - tailBlock_->next = newBlock; + new_block->next = tail_block->next.load(); + tail_block->next = new_block; - // Might be possible for the dequeue thread to see the new tailBlock->next - // *without* seeing the new tailBlock value, but this is OK since it can't - // advance to the next block until tailBlock is set anyway (because the only - // case where it could try to read the next is if it's already at the tailBlock, - // and it won't advance past tailBlock in any circumstance). + // Might be possible for the dequeue thread to see the new tail_block->next + // *without* seeing the new tail_block value, but this is OK since it can't + // advance to the next block until tail_block is set anyway (because the only + // case where it could try to read the next is if it's already at the tail_block, + // and it won't advance past tail_block in any circumstance). 
fence(memory_order_release); - tailBlock = newBlock; + tail_block_ = new_block; } else if (canAlloc == CannotAlloc) { @@ -580,7 +580,7 @@ private: } template - static AE_FORCEINLINE char* align_for(char* ptr) + static AE_FORCEINLINE char* alignFor(char* ptr) { const std::size_t alignment = std::alignment_of::value; return ptr + (alignment - (reinterpret_cast(ptr) % alignment)) % alignment; @@ -590,37 +590,38 @@ private: #ifndef NDEBUG struct ReentrantGuard { - ReentrantGuard(bool& _inSection) : inSection(_inSection) + ReentrantGuard(bool& in_section) : in_section_(in_section) { - assert(!inSection && "ReaderWriterQueue does not support enqueuing or dequeuing elements from other elements' " - "ctors and dtors"); - inSection = true; + assert(!in_section_ && "ReaderWriterQueue does not support enqueuing or dequeuing elements from other " + "elements' " + "ctors and dtors"); + in_section_ = true; } ~ReentrantGuard() { - inSection = false; + in_section_ = false; } private: ReentrantGuard& operator=(ReentrantGuard const&); private: - bool& inSection; + bool& in_section_; }; #endif struct Block { // Avoid false-sharing by putting highly contended variables on their own cache lines - weak_atomic front; // (Atomic) Elements are read from here - size_t localTail; // An uncontended shadow copy of tail, owned by the consumer + WeakAtomic front; // (Atomic) Elements are read from here + size_t localTail; // An uncontended shadow copy of tail, owned by the consumer - char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic) - sizeof(size_t)]; - weak_atomic tail; // (Atomic) Elements are enqueued here + char cachelineFiller0_[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(WeakAtomic) - sizeof(size_t)]; + WeakAtomic tail; // (Atomic) Elements are enqueued here size_t localFront; - char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic) - sizeof(size_t)]; // next isn't + char cachelineFiller1_[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(WeakAtomic) - sizeof(size_t)]; // 
next isn't // very // contended, but // we don't want @@ -628,7 +629,7 @@ private: // cache line as // tail (which // is) - weak_atomic next; // (Atomic) + WeakAtomic next; // (Atomic) char* data; // Contents (on heap) are aligned to T's alignment @@ -655,33 +656,33 @@ private: char* rawThis; }; - static Block* make_block(size_t capacity) + static Block* makeBlock(size_t capacity) { // Allocate enough memory for the block itself, as well as all the elements it will contain auto size = sizeof(Block) + std::alignment_of::value - 1; size += sizeof(T) * capacity + std::alignment_of::value - 1; - auto newBlockRaw = static_cast(std::malloc(size)); - if (newBlockRaw == nullptr) + auto new_block_raw = static_cast(std::malloc(size)); + if (new_block_raw == nullptr) { return nullptr; } - auto newBlockAligned = align_for(newBlockRaw); - auto newBlockData = align_for(newBlockAligned + sizeof(Block)); - return new (newBlockAligned) Block(capacity, newBlockRaw, newBlockData); + auto new_block_aligned = alignFor(new_block_raw); + auto new_block_data = alignFor(new_block_aligned + sizeof(Block)); + return new (new_block_aligned) Block(capacity, new_block_raw, new_block_data); } private: - weak_atomic frontBlock; // (Atomic) Elements are enqueued to this block + WeakAtomic front_block_; // (Atomic) Elements are dequeued from this block - char cachelineFiller[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic)]; - weak_atomic tailBlock; // (Atomic) Elements are dequeued from this block + char cachelineFiller_[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(WeakAtomic)]; + WeakAtomic tail_block_; // (Atomic) Elements are enqueued to this block - size_t largestBlockSize; + size_t largest_block_size_; #ifndef NDEBUG - bool enqueuing; - bool dequeuing; + bool enqueuing_; + bool dequeuing_; #endif }; @@ -693,18 +694,18 @@ private: typedef ::moodycamel::ReaderWriterQueue ReaderWriterQueue; public: - explicit 
BlockingReaderWriterQueue(size_t maxSize = 15) : inner_(maxSize) { } // Enqueues a copy of element if there is room in the queue. // Returns true if the element was enqueued, false otherwise. // Does not allocate memory. - AE_FORCEINLINE bool try_enqueue(T const& element) + AE_FORCEINLINE bool tryEnqueue(T const& element) { - if (inner.try_enqueue(element)) + if (inner_.tryEnqueue(element)) { - sema.signal(); + sema_.signal(); return true; } return false; @@ -713,11 +714,11 @@ public: // Enqueues a moved copy of element if there is room in the queue. // Returns true if the element was enqueued, false otherwise. // Does not allocate memory. - AE_FORCEINLINE bool try_enqueue(T&& element) + AE_FORCEINLINE bool tryEnqueue(T&& element) { - if (inner.try_enqueue(std::forward(element))) + if (inner_.tryEnqueue(std::forward(element))) { - sema.signal(); + sema_.signal(); return true; } return false; @@ -728,9 +729,9 @@ public: // Only fails (returns false) if memory allocation fails. AE_FORCEINLINE bool enqueue(T const& element) { - if (inner.enqueue(element)) + if (inner_.enqueue(element)) { - sema.signal(); + sema_.signal(); return true; } return false; @@ -741,9 +742,9 @@ public: // Only fails (returns false) if memory allocation fails. AE_FORCEINLINE bool enqueue(T&& element) { - if (inner.enqueue(std::forward(element))) + if (inner_.enqueue(std::forward(element))) { - sema.signal(); + sema_.signal(); return true; } return false; @@ -753,11 +754,11 @@ public: // returns false instead. If the queue has at least one element, // moves front to result using operator=, then returns true. 
template - bool try_dequeue(U& result) + bool tryDequeue(U& result) { - if (sema.tryWait()) + if (sema_.tryWait()) { - bool success = inner.try_dequeue(result); + bool success = inner_.tryDequeue(result); assert(success); AE_UNUSED(success); return true; @@ -768,10 +769,10 @@ public: // Attempts to dequeue an element; if the queue is empty, // waits until an element is available, then dequeues it. template - void wait_dequeue(U& result) + void waitDequeue(U& result) { - sema.wait(); - bool success = inner.try_dequeue(result); + sema_.wait(); + bool success = inner_.tryDequeue(result); AE_UNUSED(result); assert(success); AE_UNUSED(success); @@ -782,15 +783,15 @@ public: // then dequeues it and returns true, or returns false if the timeout // expires before an element can be dequeued. // Using a negative timeout indicates an indefinite timeout, - // and is thus functionally equivalent to calling wait_dequeue. + // and is thus functionally equivalent to calling waitDequeue. template - bool wait_dequeue_timed(U& result, std::int64_t timeout_usecs) + bool waitDequeTimed(U& result, std::int64_t timeout_usecs) { - if (!sema.wait(timeout_usecs)) + if (!sema_.wait(timeout_usecs)) { return false; } - bool success = inner.try_dequeue(result); + bool success = inner_.tryDequeue(result); AE_UNUSED(result); assert(success); AE_UNUSED(success); @@ -803,22 +804,22 @@ public: // then dequeues it and returns true, or returns false if the timeout // expires before an element can be dequeued. // Using a negative timeout indicates an indefinite timeout, - // and is thus functionally equivalent to calling wait_dequeue. + // and is thus functionally equivalent to calling waitDequeue. 
template - inline bool wait_dequeue_timed(U& result, std::chrono::duration const& timeout) + inline bool waitDequeTimed(U& result, std::chrono::duration const& timeout) { - return wait_dequeue_timed(result, std::chrono::duration_cast(timeout).count()); + return waitDequeTimed(result, std::chrono::duration_cast(timeout).count()); } #endif // Returns a pointer to the front element in the queue (the one that - // would be removed next by a call to `try_dequeue` or `pop`). If the + // would be removed next by a call to `tryDequeue` or `pop`). If the // queue appears empty at the time the method is called, nullptr is // returned instead. // Must be called only from the consumer thread. AE_FORCEINLINE T* peek() { - return inner.peek(); + return inner_.peek(); } // Removes the front element from the queue, if any, without returning it. @@ -826,9 +827,9 @@ public: // `pop` was called. AE_FORCEINLINE bool pop() { - if (sema.tryWait()) + if (sema_.tryWait()) { - bool result = inner.pop(); + bool result = inner_.pop(); assert(result); AE_UNUSED(result); return true; @@ -838,9 +839,9 @@ public: // Returns the approximate number of items currently in the queue. // Safe to call from both the producer and consumer threads. - AE_FORCEINLINE size_t size_approx() const + AE_FORCEINLINE size_t sizeApprox() const { - return sema.availableApprox(); + return sema_.availableApprox(); } private: @@ -853,8 +854,8 @@ private: } private: - ReaderWriterQueue inner; - spsc_sema::LightweightSemaphore sema; + ReaderWriterQueue inner_; + spsc_sema::LightweightSemaphore sema_; }; } // end namespace moodycamel