Sunday, July 12, 2015

Job systems

Video games and threading
For the longest time, video games pretty much only had a single core to work with.  As far as I'm aware, it wasn't until the Sega Saturn that a console shipped with multiple programmable processors for games to use.  PCs caught up with dual cores in the early 2000s, but it wasn't until the last console generation that multiple cores played a much bigger role.

Most engines have two primary threads: a render thread and an engine thread.  In addition, there are usually several other threads, such as audio threads, streaming threads, and possibly a bunch of other specialized threads, but they rarely use much of the cores they run on.

The goal of a job system is to take portions of the update off the engine and render threads and get them to execute on those other, less utilized cores.

Jobs
Right, so a job represents a bit of code.  Since I'm not focusing on supporting the PS3, I can use very generic code to create and manage jobs.  For me, a job is just a basic virtual class with an execute function, so it looks like this:


class Job
{
public:
  virtual void Execute() = 0;
  virtual ~Job() = default;
  void UnsetAutoFree() { m_flags |= 0x80000000; }
private:
  std::atomic<uint32_t>* m_modifiedOnCompletion = nullptr;
  uint32_t m_flags = 0;
  friend class JobManager;
};

The job manager then allows you to add a job, be notified when the job is completed, and assist with job execution.  A very simple job manager might look like this:


class JobManager
{
public:
  JobManager(uint32_t _queueSize) : m_jobList(_queueSize) {}
  bool TryAddJob(Job* _job, std::atomic<uint32_t>* _toMod = nullptr);
  void AddJob(Job* _job, std::atomic<uint32_t>* _toMod = nullptr)
  {
    while (!TryAddJob(_job, _toMod));
  }
  bool ProcessAJob();
private:
  Lockless::MPMCQueue<Job*> m_jobList;
};

Then, in the .cpp, the implementation looks like this:

bool JobManager::TryAddJob(Job* _job, std::atomic<uint32_t>* _toMod)
{
  if (_toMod)
  {
    _toMod->fetch_add(1);
  }
  _job->m_modifiedOnCompletion = _toMod;
  bool pushed = m_jobList.Push(_job);
  if (!pushed && _toMod)
  {
    _toMod->fetch_sub(1);
  }
  return pushed;
}

bool JobManager::ProcessAJob()
{
  Job* job = m_jobList.Pop();
  if (!job)
  {
    return false;
  }
  job->Execute();
  if (job->m_modifiedOnCompletion)
  {
    job->m_modifiedOnCompletion->fetch_sub(1);
  }
  if ((job->m_flags & 0x80000000) == 0) // auto-free unless UnsetAutoFree was called
  {
    delete job;
  }
  return true;
}

Now, any thread that wants to help with the work can just call ProcessAJob and assist with jobs.

This provides a nice, basic setup for submitting jobs.  We're using the lockless queue to support multiple threads pushing and popping jobs at the same time.

Improving job control
Currently, if you've got a series of tasks that you need to ensure complete before kicking off more jobs, you'd have to add the jobs and then wait for the atomic value to return to 0, helping out by calling ProcessAJob.  That works, but it's not the best.  I used that approach for a while, but adding the ability to automatically run custom code, or to add jobs to the queue on the thread that just completed the last operation in a group, can greatly simplify job sequences.
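That wait-and-help pattern looks roughly like this.  The sketch below is a simplified, self-contained illustration: it swaps the lockless MPMCQueue for a mutex-guarded std::deque, and CountJob, RunBatch, and the public completion pointer are my own stand-in names, not the classes above:

```cpp
#include <atomic>
#include <cstdint>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

class Job
{
public:
  virtual void Execute() = 0;
  virtual ~Job() = default;
  std::atomic<uint32_t>* m_modifiedOnCompletion = nullptr; // public for brevity
};

class JobManager
{
public:
  void AddJob(Job* _job, std::atomic<uint32_t>* _toMod = nullptr)
  {
    if (_toMod) _toMod->fetch_add(1);
    _job->m_modifiedOnCompletion = _toMod;
    std::lock_guard<std::mutex> lock(m_mutex);
    m_jobs.push_back(_job);
  }
  bool ProcessAJob()
  {
    Job* job = nullptr;
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      if (m_jobs.empty()) return false;
      job = m_jobs.front();
      m_jobs.pop_front();
    }
    job->Execute();
    if (job->m_modifiedOnCompletion) job->m_modifiedOnCompletion->fetch_sub(1);
    delete job;
    return true;
  }
private:
  std::mutex m_mutex;
  std::deque<Job*> m_jobs;
};

// Trivial job that bumps a shared counter so we can see it ran.
struct CountJob : Job
{
  explicit CountJob(std::atomic<uint32_t>* _hits) : m_hits(_hits) {}
  void Execute() override { m_hits->fetch_add(1); }
  std::atomic<uint32_t>* m_hits;
};

// Kick a batch of jobs, then wait for them all, helping out in the meantime.
uint32_t RunBatch(uint32_t _jobCount, unsigned _helperCount)
{
  JobManager manager;
  std::atomic<uint32_t> pending{0};
  std::atomic<uint32_t> hits{0};
  for (uint32_t i = 0; i < _jobCount; ++i)
    manager.AddJob(new CountJob(&hits), &pending);

  // Helper threads drain jobs alongside the submitting thread.
  std::atomic<bool> done{false};
  std::vector<std::thread> helpers;
  for (unsigned i = 0; i < _helperCount; ++i)
    helpers.emplace_back([&] { while (!done.load()) manager.ProcessAJob(); });

  // The "wait" is really a busy-help loop on the submitting thread.
  while (pending.load() != 0)
    manager.ProcessAJob();

  done.store(true);
  for (auto& t : helpers) t.join();
  return hits.load();
}
```

Because pending is only decremented after a job's Execute returns, seeing it hit 0 guarantees every job in the batch has finished, not just been dequeued.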

For this, I create a JobGroup, which stores a list of Jobs and can be added to a JobManager.


class Job
{
  <...>
private:
  class JobGroup* m_owningGroup;
};

class JobGroup
{
public:
  virtual ~JobGroup() = default;
  virtual void Finalize() = 0;
  void AddJob(Job* _job) { m_jobs.push_back(_job); }
  void UnsetAutoFree() { m_flags |= 0x80000000; }
private:
  std::vector<Job*> m_jobs;
  std::atomic<uint32_t> m_jobCount;
  std::atomic<uint32_t>* m_toFinalize = nullptr;
  uint32_t m_flags = 0;
  friend class JobManager;
};

class JobManager
{
  <...>
  void AddJobGroup(JobGroup* _group, std::atomic<uint32_t>* _toMod = nullptr);
};

Then, in the JobManager code...

bool JobManager::TryAddJob(Job* _job, std::atomic<uint32_t>* _toMod)
{
  _job->m_owningGroup = nullptr;
  <...>
}

void JobManager::AddJobGroup(JobGroup* _group, std::atomic<uint32_t>* _toMod)
{
  _group->m_jobCount = _group->m_jobs.size();
  _group->m_toFinalize = _toMod;
  if (_toMod)
  {
    _toMod->fetch_add(1);
  }
  for (auto job : _group->m_jobs)
  {
    job->m_owningGroup = _group;
    job->m_modifiedOnCompletion = nullptr;
    while (!m_jobList.Push(job))
    {
    }
  }
}

bool JobManager::ProcessAJob()
{
  Job* job = m_jobList.Pop();
  if (!job)
  {
    return false;
  }
  job->Execute();
  if (job->m_modifiedOnCompletion)
  {
    job->m_modifiedOnCompletion->fetch_sub(1);
  }
  JobGroup* group = job->m_owningGroup;
  if (group && group->m_jobCount.fetch_sub(1) == 1)
  {
    group->Finalize();
    if (group->m_toFinalize)
    {
      group->m_toFinalize->fetch_sub(1);
    }
    if ((group->m_flags & 0x80000000) == 0)
    {
      delete group;
    }
  }
  if ((job->m_flags & 0x80000000) == 0)
  {
    delete job;
  }
  return true;
}

So, when a job is executed, if it has an owning group, it'll decrement the atomic count tied to that group.  Once the group's count hits 0, the thread calls Finalize and decrements the atomic that was associated with the group when it was added.  This Finalize function allows sequential operations to immediately start execution of the next sequence, without waiting for another thread to spin the sequence around.

With this, we've got a good start, but this post was getting long and was long in the making.  Next time I'll post about thread spawning, job and thread specialization, and attaching a limited access resource to those specializations so that you can guarantee there will never be more than a given number of those jobs in flight.

4 comments:

  1. Well, I need to look here more often :) Interesting system (I need to think about some improvements in my job system, so this may be handy :D), but I think there may be a small problem when m_jobList is full. Then, when we call AddJob, the value of _toMod may be incremented a few times but decremented only once, am I right?

    Replies
    1. Ah, yeah. I've fixed that up. The version I'm actually using correctly decrements if it failed to push (which the post does now). That version is what I'll end up posting once I finish the post, but I've been too distracted with adding DX12 support.

    2. Hehe :] How do you like DX12? Personally I'm still waiting for Vulkan :)

    3. So it's coming along fairly well. The samples they provide are very specific and minimalistic, so they don't really demo how many engines will have to set things up, like the Root Signature and how you bind resources to be used in shaders, so I ended up looking at how UE4 manages some of that to figure it out.

      I'm definitely planning on supporting Vulkan as well; it's just that I've got my hands on DX12 first. It'll be nice to have an OpenGL implementation available that maps to how I'm wanting to use the GPU.

      One of the more advanced features I really want to play around with for both of them is the execute indirect stuff. I want to see how much of the render thread's work I can get the GPU to do for me.
