Thursday, May 28, 2015

Lockless Queues

Last time, we implemented a lock-less stack.  Let's move on to a slightly more useful structure, a FIFO queue.  But first, we need to talk about how lock-less these data structures are.

Producer consumer counts
When talking about lock-less data structures for transferring data between threads, the number of producers and consumers expected to use the data structure at the same time matters.  So far, the lock-less stacks have been multi-producer, multi-consumer, which means they've supported multiple threads pushing to and popping from the stack at the same time.

That was managed relatively easily, but for FIFO queues we're going to start with a weaker requirement to make things easier initially: single producer, single consumer.  This means we expect the queue to be pushed to by only one thread, and popped from by only one other thread.

SPSCQueue - single producer, single consumer queue
Starting this queue, we're going to focus on a fixed-size queue.
The basic design in use here is, when pushing, we write the data out to an internal buffer, then increment our write offset.  When popping, we check that our read offset is different from our write offset, and if it is, we read the value and then increment our read offset.

The code looks like this:

#include <atomic>
#include <cstdint>

template <class T>
class SPSCQueue
{
public:
 // Holds up to (1 << _powerOfTwo) pointers.
 SPSCQueue(uint32_t _powerOfTwo)
  : m_readerLoc(0)
  , m_writerLoc(0)
 {
  m_count = 1u << _powerOfTwo;
  m_data = new T*[m_count];
  m_count -= 1; // from here on, m_count is used as an index mask
 }

 ~SPSCQueue()
 {
  delete[] m_data;
 }

 bool Push(T* _value)
 {
  uint32_t location = m_writerLoc;
  // Unsigned subtraction stays correct when the 32-bit offsets wrap around.
  if (location - m_readerLoc > m_count)
  {
   return false; // queue is full
  }

  m_data[location & m_count] = _value;
  m_writerLoc = location + 1;
  return true;
 }

 T* Pop()
 {
  uint32_t location = m_readerLoc;
  if (location == m_writerLoc)
  {
   return nullptr; // queue is empty
  }

  T* result = m_data[location & m_count];
  m_readerLoc = location + 1;
  return result;
 }

private:
 T** m_data;
 uint32_t m_count;
 std::atomic<uint32_t> m_readerLoc;
 std::atomic<uint32_t> m_writerLoc;
};

A note on the count:  I require a power of 2 size to avoid expensive divides, so I have the user pass in the power of two desired.  Also, Push can fail!  Be sure to handle that situation.  If I expect it to not fail, I'll use a verify macro, which includes an assert in debug builds to verify the results.
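
As a rough sketch of what handling a failed Push could look like: the VERIFY macro, TinyQueue and PushAll below are all made-up names for illustration, not the macro or queue I actually use.  TinyQueue is a single-threaded stand-in whose only job is to have a Push that can fail.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical VERIFY macro: the expression is always evaluated, and debug
// builds additionally assert that it was true. A plain assert() would drop
// the Push call entirely when NDEBUG is defined.
#ifdef NDEBUG
#define VERIFY(expr) ((void)(expr))
#else
#define VERIFY(expr) assert(expr)
#endif

// Single-threaded stand-in for a bounded queue whose Push can fail.
struct TinyQueue
{
    static constexpr uint32_t kCapacity = 2;
    int* slots[kCapacity] = {};
    uint32_t count = 0;

    bool Push(int* value)
    {
        if (count == kCapacity)
        {
            return false; // full: the caller has to decide what to do
        }
        slots[count++] = value;
        return true;
    }
};

// Pushes values until the queue refuses one; returns how many were accepted.
inline uint32_t PushAll(TinyQueue& queue, int* values, uint32_t valueCount)
{
    uint32_t accepted = 0;
    while (accepted < valueCount && queue.Push(&values[accepted]))
    {
        ++accepted;
    }
    return accepted;
}
```

When a full queue is a real possibility, check the return value like PushAll does and retry or drop the item.  When a full queue would be a bug, VERIFY(queue.Push(&value)) documents that assumption without removing the call in release builds.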

Do we need to use atomics here?
If you notice, we won't have multi threaded contention with how our code is written.  We write the value to m_data, and then increment the writer location.  When reading, if the reader and writer locations are different, there must be a value in the m_data where we were reading from, right?

Right?

 

Memory access ordering
So there are two ways this would fail without using the C++ atomics as we're using them.

Both of the ways are due to memory access ordering.  For C++11 (and ARM and PPC), all memory accesses will appear from a single thread's perspective as consistent.  Specifically, if you write to an address, and then read from that address on the same thread, you'll always get the value you wrote.

But when dealing with multiple threads, we can run into issues.  For example, under the C++11 memory model, the optimizer is allowed to reorder writes to different non-atomic locations.  In our case, if we weren't protecting m_writerLoc with atomics, the optimizer could change the code so that m_writerLoc gets written before m_data does.  The reader could then see the new offset and pop whatever stale pointer is still sitting in that slot, which shows up as a rare double pop, but only in optimized builds.

On CPUs with weak memory models (ARM, PPC and others) we end up with the same double pop issue, even in debug builds.  This is because the CPU will queue up reads and writes to complete once their cache line is available.  So even in an unoptimized build, where the CPU executes the write of the value to the m_data array and then the write to m_writerLoc in program order, the two can become visible out of order.  If the cache line holding m_writerLoc is already in cache but the one holding m_data isn't, the m_writerLoc write will finalize before the m_data write does, resulting in the same double-pop issue possibly showing up.


Platform specific fixes

When dealing with the optimizer issue, we need to insert a compiler barrier.  This makes it so that the optimizer cannot move memory operations across where that barrier was.  This fixes the issue where the optimizer could rearrange the writes.

When dealing with the CPU issue, these platforms include specialized instructions to ensure specific memory operations complete in the right order.  Typically two notable ordering instructions are made available, one to drain the read buffer, one to drain the read and write buffers.  The compiler should recognize the intrinsic version of the instruction as a compiler barrier as well.  With this, you'd write the pointer out, execute the write sync instruction, and then update the offset.
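
As a portable stand-in for those platform instructions, here's a minimal sketch using the C++11 fences.  std::atomic_thread_fence emits whatever sync instruction the target needs and also acts as a compiler barrier, while std::atomic_signal_fence is the compiler-only version.  The Mailbox type is made up purely for illustration and only holds one value.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical single-slot mailbox, used only to show where the barriers go.
struct Mailbox
{
    int* slot = nullptr;                   // plain, non-atomic payload
    std::atomic<uint32_t> writeOffset{0};  // 0 = empty, 1 = published

    // Called by the writing thread.
    void Publish(int* value)
    {
        assert(value != nullptr);
        slot = value;                                         // write the pointer out
        std::atomic_thread_fence(std::memory_order_release);  // the "write sync"
        writeOffset.store(1, std::memory_order_relaxed);      // then update the offset
    }

    // Called by the reading thread; returns nullptr if nothing was published.
    int* TryConsume()
    {
        if (writeOffset.load(std::memory_order_relaxed) == 0)
        {
            return nullptr;
        }
        // Pairs with the release fence in Publish, so the slot write is visible.
        std::atomic_thread_fence(std::memory_order_acquire);
        return slot;
    }
};
```

The order in Publish is the same as described above: payload first, barrier second, offset last.  The reader mirrors it by checking the offset first and only touching the payload after its own barrier.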


C++11 atomic memory models

The C++11 atomic system allows you to specify a memory access order for the atomics.  The important orders are as follows:
  • Relaxed - No memory ordering is expected, we only want the atomicness of this action.
  • Acquire - No memory operations that follow this load may appear to be reordered before it.
  • Release - No memory operations that precede this store may appear to be reordered after it.
  • Acquire/release - Used for read-modify-write operations, no memory operations may appear to be reordered around this atomic operation assuming it succeeded.
  • Sequentially Consistent - The default access, similar to acquire/release, may impose additional restrictions.
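So we could use these orders to optimize our SPSC queue.  In our Push, we could replace
m_writerLoc = location + 1;
with
m_writerLoc.store(location + 1, std::memory_order_release);

and then in Pop, replace
if (location == m_writerLoc)
with
if (location == m_writerLoc.load(std::memory_order_acquire))

The acquire has to go on the load of m_writerLoc, since that is the value the producer stored with release; that pairing is what guarantees the write to m_data is visible before we read the slot.  The same applies in the other direction: Pop's store to m_readerLoc should be a release and Push's load of m_readerLoc an acquire, so the producer never reuses a slot before the consumer has finished reading it.  Each thread can load its own offset with relaxed ordering, since it is the only one writing it.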

This probably won't change much on x86 platforms, since the strong memory model that x86 implements already gives plain loads and stores acquire and release behavior (only the sequentially consistent store costs extra there).  Both ARM and PPC should generate better code.
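
Putting those orders together, a sketch of the whole queue might look like the following.  SPSCQueueOrdered is a made-up name to keep it apart from the class above, and the std::unique_ptr storage and unsigned-subtraction full check are my choices for this sketch rather than anything required by the design.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>

template <class T>
class SPSCQueueOrdered
{
public:
    // Holds up to (1 << powerOfTwo) pointers.
    explicit SPSCQueueOrdered(uint32_t powerOfTwo)
        : m_mask((1u << powerOfTwo) - 1),
          m_data(new T*[m_mask + 1]())
    {
        assert(powerOfTwo < 32);
    }

    // Producer thread only.
    bool Push(T* value)
    {
        // Only the producer writes m_writerLoc, so relaxed is enough here.
        const uint32_t writer = m_writerLoc.load(std::memory_order_relaxed);
        // Acquire pairs with the release store in Pop: the slot we are about
        // to overwrite has already been read by the consumer.
        const uint32_t reader = m_readerLoc.load(std::memory_order_acquire);
        if (writer - reader > m_mask)
        {
            return false; // full
        }
        m_data[writer & m_mask] = value;
        // Release publishes the slot write before the new offset.
        m_writerLoc.store(writer + 1, std::memory_order_release);
        return true;
    }

    // Consumer thread only.
    T* Pop()
    {
        const uint32_t reader = m_readerLoc.load(std::memory_order_relaxed);
        // Acquire pairs with the release store in Push.
        if (reader == m_writerLoc.load(std::memory_order_acquire))
        {
            return nullptr; // empty
        }
        T* result = m_data[reader & m_mask];
        m_readerLoc.store(reader + 1, std::memory_order_release);
        return result;
    }

private:
    uint32_t m_mask;               // size - 1, declared before m_data on purpose
    std::unique_ptr<T*[]> m_data;
    std::atomic<uint32_t> m_readerLoc{0};
    std::atomic<uint32_t> m_writerLoc{0};
};
```

In use, exactly one thread calls Push and exactly one other thread calls Pop.  Nothing in the class enforces that, so it is on the caller to get it right.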

Next time we'll modify the code to support multiple consumers, and multiple producers.

Saturday, May 2, 2015

Non-intrusive lockless stack


Issues with Intrusive Lock-less structures
So last time, we implemented an intrusive lock-less stack.  But there are some pretty strong limitations when it comes to intrusive lock-less structures.  Primarily, objects cannot be deleted so long as the lock-less structure is in use.  There's a large set of situations where this can be safe, specifically systems that maintain a pool of objects that can be popped from and pushed to from multiple threads.

But let's implement a more general case lock-less stack that doesn't have this weakness.

Fixed size Lock-less stack
 
template <class T>
class FixedSizeLocklessStack
{
 public:
 FixedSizeLocklessStack(uint32_t count)
 {
  m_buffer = new StackData[count];
  for (uint32_t i = 0; i < count; ++i)
  {
   m_freeSlots.Push(&m_buffer[i]);
  } 
 }

 ~FixedSizeLocklessStack()
 {
  delete[] m_buffer; 
 }

 bool Push(T* _value)
 {
  StackData* slot = m_freeSlots.Pop();
  if (!slot)
  {
   return false; // no free slots left
  }
  slot->pointer = _value;
  m_valueStack.Push(slot);
  return true;
 }

 T* Pop()
 {
  StackData* slot = m_valueStack.Pop();
  if (!slot)
  {
   return nullptr;
  }
  T* retValue = slot->pointer;
  m_freeSlots.Push(slot); // recycle the slot for a later Push
  return retValue;
 }

private:
 struct StackData
 {
  std::atomic<StackData*>& GetIntrusiveMember() { return intrusiveValue; }
  std::atomic<StackData*> intrusiveValue;
  T* pointer;
 };
 LocklessStack<StackData> m_freeSlots;
 LocklessStack<StackData> m_valueStack;
 StackData* m_buffer;
};
So this is a relatively simple extension on top of the base Lockless Stack.  The stack owns and maintains the slots that point to the data being pushed and popped, so the intrusive links never live inside the user's objects.  This means the data being pushed in can be managed without worrying about whether other threads are still accessing it.

Performance of this stack may be improved by forcing the StackData to be cache-line aligned.  This way threads working on different slots avoid stomping on each other's cache lines.
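
A minimal sketch of that alignment, assuming a 64-byte cache line (that number is an assumption, check your target).  AlignedStackData and DemoSlotLayout are made-up names; the struct mirrors StackData but is pulled out of the class so it stands on its own.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Assumed cache line size; common on x86 and many ARM cores, but not universal.
constexpr std::size_t kCacheLineSize = 64;

// Same layout as StackData, but each instance starts on its own cache line.
template <class T>
struct alignas(kCacheLineSize) AlignedStackData
{
    std::atomic<AlignedStackData*> intrusiveValue{nullptr};
    T* pointer = nullptr;
};

// The compiler pads the struct so that array elements never share a line.
static_assert(alignof(AlignedStackData<int>) == kCacheLineSize,
              "slot must be cache-line aligned");
static_assert(sizeof(AlignedStackData<int>) % kCacheLineSize == 0,
              "slot must fill whole cache lines");

// Checks that neighbouring slots in an array land on separate cache lines.
inline void DemoSlotLayout()
{
    AlignedStackData<int> slots[2];
    const auto first = reinterpret_cast<std::uintptr_t>(&slots[0]);
    const auto second = reinterpret_cast<std::uintptr_t>(&slots[1]);
    assert(first % kCacheLineSize == 0);
    assert(second - first >= kCacheLineSize);
}
```

The cost is memory: every slot now takes a full cache line instead of two pointers.  Also note that before C++17, new[] is not guaranteed to honor alignments this large, so the buffer may need an aligned allocation of its own.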

But why fixed sized?
This is implemented as a fixed sized stack, but the internal stack data being managed acts as a pointer, so surely we could just allocate a new slot instead of returning false when pushing, right?

The problem with that, though, is that we lose the lock-free guarantee.  This happens with any memory allocation.  Nearly all allocators that support multiple threads require locks.  Even allocators designed to avoid locks on the common path (such as tcmalloc or a lock-free allocator) frequently fall back to a lock somewhere, for example when refilling from a shared pool or requesting more memory from the OS.  Due to this, when working with lock-less structures, I intentionally avoid automatic memory management.

But, expanding this to an expandable lock-less stack is straightforward.

Expandable lock-less stack

The main change is that rather than allocating a single large buffer, we allocate a bunch of small entries, and when pushing, if we fail to pop a free slot, we allocate a new entry on the spot.  Then, in the destructor, we delete all of the allocated entries.

template<class T>
class ExpandableLocklessStack
{
public:
 ExpandableLocklessStack(uint32_t _count)
 {
  for (uint32_t i = 0; i < _count; ++i)
  {
    m_freeSlots.Push(new StackData());
  }
 }

 ~ExpandableLocklessStack()
 {
  StackData* data;
  while ((data = m_freeSlots.Pop()) != nullptr)
  {
   delete data;
  }
  while ((data = m_valueStack.Pop()) != nullptr)
  {
   delete data;
  }
 }

 void Push(T* _value)
 {
  StackData* slot = m_freeSlots.Pop();
  if (!slot)
  {
   slot = new StackData(); // no free slot: allocate (this may take a lock)
  }
  slot->pointer = _value;
  m_valueStack.Push(slot);
 }

 T* Pop()
 {
  StackData* slot = m_valueStack.Pop();
  if (!slot)
  {
   return nullptr;
  }
  T* retValue = slot->pointer;
  m_freeSlots.Push(slot); // recycle the slot for a later Push
  return retValue;
 }

private:
 struct StackData
 {
  std::atomic<StackData*>& GetIntrusiveMember() { return intrusiveValue; }
  std::atomic<StackData*> intrusiveValue;
  T* pointer; 
 };
 LocklessStack<StackData> m_freeSlots;
 LocklessStack<StackData> m_valueStack;
};

Load-Link/Store-Conditional Lock-less Stack
Last time, I indicated that a stack built on Load-Link/Store-Conditional doesn't need tagging.  Let's take a quick look at what this might look like.

I'll be using a made-up namespace called platform.  In it are 3 important functions:

//Loads a value from a source location using the load link instruction
template <class T>
T* LoadLink(T** src);

//Stores the value to destination, but only if no other core has written to the cache line the destination is in.
template <class T>
bool StoreConditional(T** dest, T* value);

//Writes a value, and then ensures that it will be visible to other cores before continuing execution.  Also acts as a compiler barrier.
//This is needed because the platforms that support these instructions also tend to have weaker memory models, which would require synchronization here.
template <class T>
void WriteAndSync(T** dest, T* value);

There is also an important macro, called CACHE_ALIGN.  This adds compiler specific instructions to ensure the member is aligned to a cache line.  This does increase the size of this structure from the size of a pointer to the size of a cache line.  This is important though, because if the structure we're pushing ends up in the same cache line as the stack, we would never be able to push that value to the stack.
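
Since CACHE_ALIGN is made up, here's one way it could be defined using the standard alignas specifier.  The 64-byte line size is an assumption, and Node, HeadHolder and DemoHeadHolder are hypothetical names used only to show the effect on size and alignment.

```cpp
#include <cassert>
#include <cstdint>

// Assumed cache line size; the real value depends on the target CPU.
#define CACHE_LINE_SIZE 64

// Hypothetical definition: declares `type name` aligned to a cache line.
#define CACHE_ALIGN(type, name) alignas(CACHE_LINE_SIZE) type name

struct Node
{
    Node* next;
};

// Stand-in for the stack class: a single cache-aligned head pointer.
class HeadHolder
{
public:
    HeadHolder() : m_head(nullptr) {}
    Node* Head() const { return m_head; }

private:
    CACHE_ALIGN(Node*, m_head);
};

// The holder grows from one pointer to a full cache line.
static_assert(sizeof(HeadHolder) == CACHE_LINE_SIZE, "head owns a whole line");
static_assert(alignof(HeadHolder) == CACHE_LINE_SIZE, "head is line aligned");

// Checks that an instance really starts on a cache line boundary.
inline void DemoHeadHolder()
{
    HeadHolder holder;
    assert(holder.Head() == nullptr);
    assert(reinterpret_cast<std::uintptr_t>(&holder) % CACHE_LINE_SIZE == 0);
}
```

Because the head fills its whole line, nothing else can share that line with it, which is the property the store-conditional relies on.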

struct LocklessTester
{
 LocklessTester*& GetIntrusiveMember() { return intrusiveValue; }
 LocklessTester* intrusiveValue;
 uint32_t a;
 uint32_t b;
};

template <class T>
class LocklessStack
{
public:
 LocklessStack() : m_head(nullptr) {} 
 void Push(T* _value)
 {
  while (true)
  {
   T* head = platform::LoadLink(&m_head);
   platform::WriteAndSync(&_value->GetIntrusiveMember(), head);
   if (platform::StoreConditional(&m_head, _value))
   {
    return;
   }
  }
 }
 T* Pop()
 {
  while (true)
  {
   T* head = platform::LoadLink(&m_head);
   if (!head)
   {
    return nullptr;
   } 
   T* nextHead = head->GetIntrusiveMember();
   if (platform::StoreConditional(&m_head, nextHead))
   {
    return head;
   }
  }
 }
private:
 CACHE_ALIGN(T*, m_head);
}; 
 
With this, we've mainly gone back to our original setup.  However, now we can be confident that our stack will be completely safe from ABA, not just partly, because we know that no one wrote to the cache line that contains our m_head value after we loaded from it.

Please note, this is currently untested, since I haven't set up the Android NDK for testing the ARM version of this.