Monday, June 8, 2015

Multi Producer, Multi Consumer queues

Last time we implemented a Single Producer, Single Consumer queue.  Now we'll look at expanding that to multiple producers and multiple consumers.

Why does our previous example fail with Multiple Producers and Consumers?
So the old code fails with multiple consumers.  But why?  Let's look at what would happen:
Assume we have two items to consume, and both thread A and thread B try to pop at the same time.  It's possible that they both read the same location, and with both reading the same location, they get a double pop.  We also have problems with the increment - a thread could get paused after reading the location, another thread could pop off several values, and then the original thread resumes and sets the read location back.

Producers have similar issues of losing writes and possibly destroying ordering of the data.
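
To make that concrete, here's a minimal sketch of a single-consumer style Pop - not the exact code from the last post, just the shape of it.  Nothing stops two consumers from reading the same m_readerLoc before either one advances it:

// Illustration only: a single-consumer style Pop with no synchronization.
T* Pop()
{
    uint32_t location = m_readerLoc;
    if (location == m_writerLoc)
    {
        return nullptr;
    }

    // With two consumers, both can read the same slot here (double pop)...
    T* found = m_data[location & m_count];
    // ...and a paused thread can later write a stale value back here,
    // moving the read location backwards.
    m_readerLoc = location + 1;
    return found;
}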

Dealing with Multiple Consumers
So how do we deal with this?  Well, first we need to make sure only one consumer gets each item.  That requires converting the T* array to an array of std::atomic<T*>.  When you go to pop an entry, you read the value and then compare and swap it for a nullptr.  If that succeeded, you know you own the result.  Then increment the reader location and return what you found.  It looks like this:

T* Pop()
{
    do
    {
        uint32_t location = m_readerLoc;
        if (location == m_writerLoc)
        {
            // The reader has caught up with the writer - the queue is empty.
            return nullptr;
        }

        T* found = m_data[location & m_count];
        if (found == nullptr)
        {
            // Another consumer has already claimed this slot but hasn't
            // advanced m_readerLoc yet - try again.
            continue;
        }

        if (m_data[location & m_count].compare_exchange_strong(found, nullptr))
        {
            // We swapped the pointer out for nullptr, so we own the item.
            m_readerLoc++;
            return found;
        }
    } while (true);
}

So, that works, right?

Accidental locks
The code as presented inadvertently introduces a lock.  The situation happens if a thread succeeds with the compare_exchange, but then gets paused before executing the increment.  Until that thread resumes execution, all other threads are stuck spinning on the same slot.  In general, any time you have a state update that depends on the result of an atomic compare, you have to be careful about running into this situation.

So how do we fix this?  What I do is make the increment of the location happen whether we succeed or fail.  Like this:

T* Pop()
{
    do
    {
        uint32_t location = m_readerLoc;
        if (location == m_writerLoc)
        {
            return nullptr;
        }

        T* found = m_data[location & m_count];
        bool success = false;
        if (found)
        {
            // Try to claim the item by swapping a nullptr into its slot.
            success = m_data[location & m_count].compare_exchange_strong(found, nullptr);
        }
        // Advance the reader location whether or not we claimed the item,
        // but only if no other thread has advanced it already.
        m_readerLoc.compare_exchange_strong(location, location + 1);
        if (success)
        {
            return found;
        }
    } while (true);
}

With this, if we have already read the value at this location, or if someone else read it, we'll move on to the next slot.  Also note that this isn't a blind increment: with many readers potentially failing to read the value, we only want one of them to increment the location.

For the Push side of things, it looks like this:
 
bool Push(T* _value)
{
    do
    {
        uint32_t location = m_writerLoc;
        if (m_readerLoc + m_count < location)
        {
            // The writer would lap the reader - the queue is full.
            return false;
        }

        // Try to claim the slot by swapping our value in over the nullptr.
        T* found = nullptr;
        bool success = m_data[location & m_count].compare_exchange_strong(found, _value);
        // Advance the writer location whether or not we claimed the slot.
        m_writerLoc.compare_exchange_strong(location, location + 1);

        if (success)
        {
            return true;
        }
    } while (true);
}

So now we have defeated the rare lock and the code appears to work, so everything's good, yes?

ABA problem part 2
We've got an ABA problem, and it's kind of a weird one.  I originally used this setup as the job queue in my personal engine for a while, and it worked until I started generating many jobs at the same time from several threads while also consuming.

The problem shows up when you have three or more threads all accessing the queue at the same time.  Specifically, threads A and B are both pushing to the queue.  Thread C is popping.

Thread A reads from m_writerLoc and then gets paused.  Thread B then performs a push operation to completion.  Thread C then pops that value and returns.  Thread A then resumes.  It sees the nullptr and thinks it's safe to push.  However, the queue has already moved past that slot as a valid write location, so we've lost the push.

So what we need to do, rather than writing back nullptr, is write an ever-changing value.  What value should that be?  What I normally opt to do is write the current location, shifted left by one with the lowest bit set, so we know it's a marker rather than a pointer that was pushed.  We'll also want to replace the std::atomic<T*> array with an std::atomic<uintptr_t> array, since we'll be accessing the data as a numeric value.
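
As a concrete illustration of the encoding (these helper names are mine - the queue code below just inlines the expressions):

#include <cstdint>

// Hypothetical helpers to show the encoding used below.
uintptr_t MakeMarker(uint32_t location)
{
    // Shift the location left by one and set the low bit.  Assuming pushed
    // pointers are at least 2-byte aligned, their low bit is never set, so
    // the bit distinguishes markers from real pointers.
    return (uintptr_t(location) << 1) | 1;
}

bool IsMarker(uintptr_t value)
{
    return (value & 1) != 0;
}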

So, when popping, we read the value and make sure the lowest bit isn't set.  If it isn't (which indicates this is a pointer we pushed), we compare and swap it with our marker, increment the offset, and return the value we read if the swap succeeded.

The code looks like this:

 
T* Pop()
{
    do
    {
        uint32_t location = m_readerLoc;
        if (location == m_writerLoc)
        {
            return nullptr;
        }

        uintptr_t found = m_data[location & m_count];
        // The marker we leave behind: the location shifted left with the low bit set.
        uintptr_t target = (uintptr_t(location) << 1) | 1;
        bool success = false;
        if ((found & 1) == 0)
        {
            // Low bit clear means this is a pointer someone pushed - try to claim it.
            success = m_data[location & m_count].compare_exchange_strong(found, target);
        }

        // Advance the reader location whether or not we claimed the item.
        m_readerLoc.compare_exchange_strong(location, location + 1);
        if (success)
        {
            return (T*)found;
        }
    } while (true);
}

When it comes to pushing, the important thing is figuring out what value we expect to read.  Rather than expecting a nullptr, we need to construct the marker that would have been written the last time this slot was popped.  That looks like this:

bool Push(T* _value)
{
    do
    {
        uint32_t location = m_writerLoc;
        if (m_readerLoc + m_count < location)
        {
            // The writer would lap the reader - the queue is full.
            return false;
        }

        // The marker this slot received when it was last popped, one full lap
        // (m_count + 1 slots) behind the current location.
        uintptr_t found = ((uintptr_t(location) - (m_count + 1)) << 1) | 1;
        bool success = m_data[location & m_count].compare_exchange_strong(found, (uintptr_t)_value);
        // Advance the writer location whether or not we claimed the slot.
        m_writerLoc.compare_exchange_strong(location, location + 1);

        if (success)
        {
            return true;
        }
    } while (true);
}

The only remaining step is the constructor, where we need to make sure we initialize the slots to the markers that would have been generated one lap before.

MPMCQueue(uint32_t _count)
{
    // _count is a power-of-two exponent; the queue holds 1 << _count entries.
    m_count = 1 << _count;
    m_data = new std::atomic<uintptr_t>[m_count];
    for (uint32_t i = 0; i < m_count; ++i)
    {
        // Pretend slot i was popped one lap ago, at location i - capacity.
        m_data[i] = ((uintptr_t(i) - m_count) << 1) | 1;
    }
    // From here on, m_count is used as an index mask, so store capacity - 1.
    m_count -= 1;
    m_readerLoc = 0;
    m_writerLoc = 0;
}
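
For completeness, here's a sketch of the members the code above relies on.  The post doesn't show the full class, so this is inferred from usage - I'm assuming the class is a template on T and that the destructor simply frees the array:

#include <atomic>
#include <cstdint>

template <typename T>
class MPMCQueue
{
public:
    MPMCQueue(uint32_t _count);        // as defined above
    ~MPMCQueue() { delete[] m_data; }

    bool Push(T* _value);              // as defined above
    T* Pop();                          // as defined above

private:
    std::atomic<uintptr_t>* m_data;    // slot storage: pushed pointers or markers
    uint32_t m_count;                  // capacity - 1, used as an index mask
    std::atomic<uint32_t> m_readerLoc; // next location to pop
    std::atomic<uint32_t> m_writerLoc; // next location to push
};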

And that is basically my current lockless queue implementation.  To my knowledge, there's no weakness in this implementation.  I've been using it as the basis for a job system, which has several producers and consumers pushing and popping at the same time, and I've not detected any double pop or failed push.  Next I think I'll look at my own job system implementation.  It's by no means the best, but it's got some nice features, and I'd like to think it's fairly straightforward.
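
As a quick usage sketch, assuming the class layout above - the Job type, thread counts, and job counts are made up for illustration:

#include <thread>
#include <vector>

struct Job { int id; };

void Example()
{
    MPMCQueue<Job> queue(10);   // 1 << 10 = 1024 slots

    std::vector<std::thread> threads;
    // Four producers, each pushing 1000 jobs.
    for (int p = 0; p < 4; ++p)
    {
        threads.emplace_back([&queue, p]
        {
            for (int i = 0; i < 1000; ++i)
            {
                Job* job = new Job{ p * 1000 + i };
                while (!queue.Push(job)) { }   // spin while the queue is full
            }
        });
    }
    // Four consumers, each popping 1000 jobs.
    for (int c = 0; c < 4; ++c)
    {
        threads.emplace_back([&queue]
        {
            for (int consumed = 0; consumed < 1000; ++consumed)
            {
                Job* job = nullptr;
                while ((job = queue.Pop()) == nullptr) { }   // spin while empty
                delete job;
            }
        });
    }
    for (auto& t : threads) { t.join(); }
}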
