Tuesday, December 16, 2014

Slight, yet Important Change to Williams Parallel Quicksort Algorithm

After reading Anthony Williams' excellent book on concurrency and multithreaded programming in C++, C++ Concurrency In Action, I wanted to take a closer look at the parallel quicksort algorithm developed throughout its chapters, which is later given a finishing touch by submitting work to a thread pool.

The only subtle change I'd like to make to this quicksort algorithm, so that it handles whatever list it is given more robustly, is to this line:

result.splice(result.begin(),chunk_data,chunk_data.begin());

which feeds the pivot element to the next line:

T const& partition_val = *result.begin();

Although this code is correct, it does not exploit the expected-time cost of quicksort that comes from picking the pivot at random. Additionally, in the unfortunate case where the list is passed in sorted in reverse order (e.g. 10, 9, 8, 7, 6, 5, 4, 3, 2, 1), picking the first element as the pivot yields a recursion depth of roughly \(N-1\) and \(O(N^{2})\) total work. This also has the effect of segfaulting my Linux virtual machine with a list of decent size. Implementing the pivot point as

result.splice(result.begin(), chunk_data, std::next(chunk_data.begin(),std::rand() % chunk_data.size()));

(and, obviously, seeding rand()) reduces the recursion depth to an expected \(O(\log N)\), with an expected running time of \(O(N \log N)\), so the code can handle large lists with far less chance of segfaulting. Although std::next is an \(O(N)\) operation for anything weaker than a random-access iterator (which std::list does not provide), sacrificing some performance here is worth the added robustness. Further modifications could handle std::vector<T> instead of std::list<T>; that will be the topic of a future blog post.
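As an aside, rand() combined with the modulo operator is slightly biased and is not required to be thread-safe; a minimal sketch of the same pivot selection using the <random> facilities (my own variation, not code from the book) could look like this:

#include <iterator>
#include <list>
#include <random>

// Sketch: return an iterator to a uniformly chosen element of a non-empty list.
// A thread_local engine avoids sharing generator state between sorting threads.
template<typename T>
typename std::list<T>::iterator random_pivot(std::list<T>& chunk_data)
{
 static thread_local std::mt19937 engine{std::random_device{}()};
 std::uniform_int_distribution<std::size_t> pick(0, chunk_data.size() - 1);
 return std::next(chunk_data.begin(), pick(engine));
}

The splice above would then read result.splice(result.begin(), chunk_data, random_pivot(chunk_data)); for the rest of this post I'll stick with rand() to stay close to the original code.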

The final sorting program:


#include <future>
#include <stack>
#include <vector>
#include <queue>
#include <iostream>
#include <mutex>
#include <algorithm>
#include <functional>
#include <list>
#include <memory>
#include <condition_variable>
#include <thread>   // std::thread
#include <atomic>   // std::atomic<bool>
#include <chrono>   // std::chrono::milliseconds
#include <cstdlib>  // std::rand, std::srand
#include <ctime>    // std::time


template<typename T>
class thread_safe_stack
{
private:
 std::unique_ptr<std::stack<T>> internal_stack;
 std::mutex mut;
 std::condition_variable cond_var;
public:

 template<typename ... Args>
 thread_safe_stack(Args&& ... args)
 {
  internal_stack.reset(new std::stack<T>(std::forward<Args>(args)...));
 }

 int size()
 {
  std::lock_guard<std::mutex> lock(mut);
  return internal_stack->size();
 }
 bool wait_and_pop(T& data)
 {
  std::unique_lock<std::mutex> lock(mut);
  cond_var.wait(lock, [this](){ return !internal_stack->empty(); }); // check the member directly; calling size() here would re-lock mut
  data = std::move(internal_stack->top());
  internal_stack->pop();
  return true;
 }

 std::shared_ptr<T> wait_and_pop()
 {
  std::unique_lock<std::mutex> lock(mut);
  cond_var.wait(lock, [this](){ return !internal_stack->empty(); });
  auto ptr = std::make_shared<T>(std::move(internal_stack->top()));
  internal_stack->pop();
  return ptr;
 }

 bool try_pop(T& data)
 {
  std::lock_guard<std::mutex> lock(mut);
  if(internal_stack->empty())
     return false;
  data = std::move(internal_stack->top());
  internal_stack->pop(); // remove the element we just moved out
  return true;
 }

 std::shared_ptr<T> try_pop()
 {
  std::lock_guard<std::mutex> lock(mut);
  if(internal_stack->empty())
     return nullptr;
  auto ptr = std::make_shared<T>(std::move(internal_stack->top()));
  internal_stack->pop();
  return ptr;
 }

 void push(const T& val)
 {
  std::lock_guard<std::mutex> lock(mut);
  internal_stack->push(val);
  cond_var.notify_one(); // wake a thread blocked in wait_and_pop
 }

 void push( T&& val)
 {
  std::lock_guard<std::mutex> lock(mut);
  internal_stack->push(std::move(val));
  cond_var.notify_one();
 }
};

template<typename T>
struct sorter
{

 // nice struct containing sorting details
 struct chunk_to_sort
 {
  std::list<T> data; // to sort
  std::promise<std::list<T>> promise;

 };

 thread_safe_stack<chunk_to_sort> chunks;
 std::vector<std::thread> threads;
 unsigned const max_thread_count;
 std::atomic<bool> end_of_data;

 sorter() :
  max_thread_count(std::thread::hardware_concurrency() > 1 ? std::thread::hardware_concurrency() - 1 : 1), // hardware_concurrency() may return 0
  end_of_data(false)
 {
  std::srand(static_cast<unsigned>(std::time(nullptr)));
 }

 ~sorter()
 {

  end_of_data = true;
  for (unsigned i = 0; i < threads.size(); ++i)
  {
   if (threads[i].joinable()) threads[i].join();
  }

 }

 void try_sort_chunk()
 {

  std::shared_ptr<chunk_to_sort> chunk = chunks.try_pop(); // get some data to sort, try sorting it
  if (chunk)
  {
   sort_chunk(chunk);
  }
 }

 std::list<T> do_sort(std::list<T>& chunk_data)
 {

  if (chunk_data.empty()) // nothing to do? fine!
   return chunk_data;

  std::list<T> result;
  result.splice(result.begin(), chunk_data, std::next(chunk_data.begin(),std::rand() % chunk_data.size())); // remove pivot element from chunk_data and insert into result
  T const& partition_point = *result.begin();


  // now the chunk_data is partitioned
  auto divide_point = std::partition(chunk_data.begin(), chunk_data.end(), [&](T const& other){ return other < partition_point; });

  // sort the lower half
  chunk_to_sort new_lower_chunk;
  new_lower_chunk.data.splice(new_lower_chunk.data.end(), chunk_data, chunk_data.begin(), divide_point);

  // get the future for the lower half. As soon as a thread gets the lower chunk, it will set its promise
  auto new_lower = new_lower_chunk.promise.get_future();

  // push it 
  chunks.push(std::move(new_lower_chunk));


  // can we get a new thread to work without oversubscribing?
  if (threads.size() < max_thread_count)
  {
   threads.push_back(std::thread(&sorter<T>::sort_thread, this));
  }

  
  std::list<T> new_higher(do_sort(chunk_data)); // sort the higher data


  // result already contains the partition point. So we have to insert the sorted upper half after it
  result.splice(result.end(), new_higher); // get upper sorted and insert here

  // you stallin' pimpin!
  while (new_lower.wait_for(std::chrono::milliseconds(0)) != std::future_status::ready)
  {
   try_sort_chunk(); // do work in the meantime
  }

  // lower half has been sorted
  result.splice(result.begin(), new_lower.get());

  return result;

 }

 void sort_chunk(std::shared_ptr<chunk_to_sort> const & chunk)
 {
  chunk->promise.set_value(do_sort(chunk->data));  // set the promise
 }
 void sort_thread()
 {
  while (!end_of_data)
  {
   try_sort_chunk();
   std::this_thread::yield(); // let me back off and let another thread come in
  }
 }
};

template<typename T> 
std::list<T> parallel_quicksort(std::list<T> input)
{
 if (input.empty())
 {
  return input;
 }
 sorter<T> s;
 return s.do_sort(input);
}
int main()
{


 std::list<int> my_list;
 for (int i = 3000; i >= 0; --i)
 {
  my_list.push_back(i);
 }

        // The below commented-out code is a primitive way for shuffling the contents of a std::list. Useful for testing 
        // the functionality of the code above without picking a random pivot.

 // std::vector<int> vtor(my_list.size());
 // std::copy(my_list.begin(),my_list.end(),vtor.begin());
 // std::random_shuffle(vtor.begin(),vtor.end());
 // std::copy(vtor.begin(),vtor.end(),my_list.begin());

 auto sorted = parallel_quicksort(my_list);
 for (auto i = sorted.begin(); i != sorted.end(); ++i)
 {
  std::cout << *i << "\n";
 }
}
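
For reference, the program needs a C++11 compiler; on Linux something along these lines should work (the file name is just an example):

g++ -std=c++11 -pthread parallel_quicksort.cpp -o parallel_quicksort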

The thread_safe_stack is implemented with simple std::mutex-based locks for demonstration purposes. A lock-free implementation built on std::atomic would also be viable, but contention is low in this example, so the locks suffice. Although the change I wanted to make (choosing a random pivot) is small, it makes a real difference in running time and recursion depth, both of which directly affect usability. Granted, it doesn't solve every problem regarding recursion depth and segfaults, but it mitigates some of the problems I've encountered when implementing this code. I hope this has been informative!
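
If you want to convince yourself that the random pivot copes with the reverse-sorted case, a quick check (my own test harness, not part of the program above) is to swap in a main() that feeds parallel_quicksort a much larger descending list and verifies the result:

#include <cassert>

// Replacement main(), assuming the definitions above are in the same translation unit.
int main()
{
 std::list<int> big;
 for (int i = 100000; i >= 0; --i) // reverse-sorted input: the worst case for a first-element pivot
  big.push_back(i);

 auto sorted = parallel_quicksort(big);
 assert(std::is_sorted(sorted.begin(), sorted.end()));
 std::cout << "sorted " << sorted.size() << " elements\n";
}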
