## Tuesday, December 16, 2014

### Slight, yet Important Change to Williams Parallel Quicksort Algorithm

After reading Anthony Williams' excellent book on concurrency and multithreaded programming in C++, C++ Concurrency in Action, I wanted to take a closer look at the parallel quicksort algorithm that is developed throughout its chapters and later given its finishing touch by submitting work to a thread pool.

The one subtle change I'd like to make, so that the algorithm copes better with whatever lists it may be handed, is to this line:

result.splice(result.begin(),chunk_data,chunk_data.begin());


Which feeds the pivot element to the next line,

T const& partition_val = *result.begin();


Although this code is correct, it gives up the expected-time guarantee that quicksort gets from choosing its pivot at random. Additionally, in the unfortunate case of a list passed in sorted in reverse order (e.g. 10,9,8,7,6,5,4,3,2,1), always picking the first element puts every remaining element in the lower partition, so the recursion goes roughly $N-1$ levels deep and the total work grows as $O(N^{2})$. This also has the effect of segfaulting my Linux virtual machine with a list of decent size. Implementing the pivot point as

result.splice(result.begin(), chunk_data, std::next(chunk_data.begin(),std::rand() % chunk_data.size()));


(and obviously seeding rand()) brings the expected recursion depth down to $O(\log N)$ and the expected running time to $O(N \log N)$, which handles large lists while reducing the chances of segfaulting. Although std::next is an O(N) operation for anything weaker than a random-access iterator (which std::list does not provide), the partition step that follows is already linear, so the extra traversal does not change the asymptotic cost; sacrifices must be made for functionality before performance. Further modifications can be made to deal with std::vector<T> as opposed to std::list<T>, and will be the topic of a future blog post.
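As a side note on the pivot choice itself: whether std::rand() is safe to call from several threads is implementation-defined, and `% size` is slightly biased. A minimal sketch of an alternative, assuming the `<random>` facilities are acceptable here, could look like the following. The names `random_index` and `take_random_pivot` are hypothetical helpers for illustration and are not part of the book's code or of the listing in this post.

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <list>
#include <random>

// Hypothetical helper: uniform index in [0, n), drawn from a per-thread engine
// so that concurrent callers never share generator state. Requires n > 0.
inline std::size_t random_index(std::size_t n)
{
    assert(n > 0);
    thread_local std::mt19937 engine{std::random_device{}()};
    std::uniform_int_distribution<std::size_t> pick(0, n - 1);
    return pick(engine);
}

// Hypothetical helper: move one randomly chosen element out of `data`
// and return it alone in a new list, mirroring the splice shown above.
template<typename T>
std::list<T> take_random_pivot(std::list<T>& data)
{
    std::list<T> result;
    auto pivot = std::next(data.begin(),
                           static_cast<std::ptrdiff_t>(random_index(data.size())));
    result.splice(result.begin(), data, pivot);
    return result;
}
```

With helpers like these, the pivot line would become `std::list<T> result = take_random_pivot(chunk_data);`, and the call to std::srand would no longer be needed.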

The final sorting program:

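#include <future>
#include <thread>
#include <atomic>
#include <chrono>
#include <stack>
#include <vector>
#include <queue>
#include <iostream>
#include <iterator>
#include <mutex>
#include <algorithm>
#include <functional>
#include <list>
#include <memory>
#include <condition_variable>
#include <cstdlib>
#include <ctime>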

// A stack guarded by one mutex; the condition variable wakes blocked poppers.
template<typename T>
class thread_safe_stack
{
private:
    std::unique_ptr<std::stack<T>> internal_stack;
    std::mutex mut;
    std::condition_variable cond_var;
public:

    // forward any arguments to the underlying std::stack constructor
    template<typename ... Args>
    thread_safe_stack(Args&& ... args)
    {
        internal_stack.reset(new std::stack<T>(std::forward<Args>(args)...));
    }

    std::size_t size()
    {
        std::lock_guard<std::mutex> lock(mut);
        return internal_stack->size();
    }

    bool wait_and_pop(T& data)
    {
        std::unique_lock<std::mutex> lock(mut);
        // test the stack directly: calling size() here would lock mut a second time
        cond_var.wait(lock, [this](){ return !internal_stack->empty(); });
        data = std::move(internal_stack->top());
        internal_stack->pop();
        return true;
    }

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lock(mut);
        cond_var.wait(lock, [this](){ return !internal_stack->empty(); });
        auto ptr = std::make_shared<T>(std::move(internal_stack->top()));
        internal_stack->pop();
        return ptr;
    }

    bool try_pop(T& data)
    {
        std::lock_guard<std::mutex> lock(mut);
        if (internal_stack->empty())
            return false;
        data = std::move(internal_stack->top());
        internal_stack->pop(); // remove the element we just moved from
        return true;
    }

    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lock(mut);
        if (internal_stack->empty())
            return nullptr;
        auto ptr = std::make_shared<T>(std::move(internal_stack->top()));
        internal_stack->pop();
        return ptr;
    }

    void push(const T& val)
    {
        std::lock_guard<std::mutex> lock(mut);
        internal_stack->push(val);
        cond_var.notify_one(); // wake one thread blocked in wait_and_pop
    }

    void push(T&& val)
    {
        std::lock_guard<std::mutex> lock(mut);
        internal_stack->push(std::move(val));
        cond_var.notify_one(); // wake one thread blocked in wait_and_pop
    }
};

template<typename T>
struct sorter
{

    // nice struct containing sorting details
    struct chunk_to_sort
    {
        std::list<T> data; // to sort
        std::promise<std::list<T>> promise;
    };

    thread_safe_stack<chunk_to_sort> chunks; // chunks waiting to be sorted
    std::vector<std::thread> threads;        // worker threads started so far
    unsigned const max_thread_count;
    std::atomic<bool> end_of_data;

    sorter() :
        // leave one core for the calling thread; hardware_concurrency() may return 0
        max_thread_count(std::thread::hardware_concurrency() > 1 ? std::thread::hardware_concurrency() - 1 : 0),
        end_of_data(false)
    {
        std::srand(static_cast<unsigned>(std::time(nullptr)));
    }

    ~sorter()
    {
        end_of_data = true; // tell the workers to stop, then wait for them
        for (unsigned i = 0; i < threads.size(); ++i)
        {
            threads[i].join();
        }
    }

    void try_sort_chunk()
    {
        std::shared_ptr<chunk_to_sort> chunk = chunks.try_pop(); // get some data to sort, try sorting it
        if (chunk)
        {
            sort_chunk(chunk);
        }
    }

    std::list<T> do_sort(std::list<T>& chunk_data)
    {
        if (chunk_data.empty()) // nothing to do? fine!
            return chunk_data;

        std::list<T> result;
        // remove a randomly chosen pivot element from chunk_data and insert it into result
        result.splice(result.begin(), chunk_data, std::next(chunk_data.begin(), std::rand() % chunk_data.size()));
        T const& partition_point = *result.begin();

        // partition chunk_data around the pivot
        auto divide_point = std::partition(chunk_data.begin(), chunk_data.end(), [&](T const& other){ return other < partition_point; });

        // move the lower half into its own chunk
        chunk_to_sort new_lower_chunk;
        new_lower_chunk.data.splice(new_lower_chunk.data.end(), chunk_data, chunk_data.begin(), divide_point);

        // get the future for the lower half. Whichever thread sorts the lower chunk will set its promise
        auto new_lower = new_lower_chunk.promise.get_future();

        // push it
        chunks.push(std::move(new_lower_chunk));

        // can we get a new thread to work without oversubscribing?
        if (threads.size() < max_thread_count)
        {
            threads.push_back(std::thread(&sorter<T>::sort_thread, this));
        }

        std::list<T> new_higher(do_sort(chunk_data)); // sort the higher data

        // result already contains the partition point, so the sorted upper half goes after it
        result.splice(result.end(), new_higher);

        // lower half not ready yet? don't stall
        while (new_lower.wait_for(std::chrono::seconds(0)) != std::future_status::ready)
        {
            try_sort_chunk(); // do work in the meantime
        }

        // lower half has been sorted
        result.splice(result.begin(), new_lower.get());

        return result;
    }

    void sort_chunk(std::shared_ptr<chunk_to_sort> const & chunk)
    {
        chunk->promise.set_value(do_sort(chunk->data)); // set the promise
    }

    // body of each worker thread: keep sorting chunks until the sorter is destroyed
    void sort_thread()
    {
        while (!end_of_data)
        {
            try_sort_chunk();
            std::this_thread::yield(); // let me back off and let another thread come in
        }
    }
};

template<typename T>
std::list<T> parallel_quicksort(std::list<T> input)
{
    if (input.empty())
    {
        return input;
    }
    sorter<T> s;
    return s.do_sort(input);
}

int main()
{
    // worst case for a first-element pivot: 3000, 2999, ..., 0
    std::list<int> my_list;
    for (int i = 3000; i >= 0; --i)
    {
        my_list.push_back(i);
    }

    // The below commented-out code is a primitive way for shuffling the contents of a std::list. Useful for testing
    // the functionality of the code above without picking a random pivot.

    // std::vector<int> vtor(my_list.size());
    // std::copy(my_list.begin(),my_list.end(),vtor.begin());
    // std::random_shuffle(vtor.begin(),vtor.end());
    // std::copy(vtor.begin(),vtor.end(),my_list.begin());

    auto sorted = parallel_quicksort(my_list);
    for (auto i = sorted.begin(); i != sorted.end(); ++i)
    {
        std::cout << *i << "\n";
    }
}


The thread_safe_stack is implemented with simple std::mutex-based locking for demonstration purposes. A lock-free design built on std::atomic is also viable, but for this example contention is low and the mutex version serves well. Although the change I wanted to make (choosing a random pivot) is small, it makes a difference in running time and recursion depth, both of which directly affect usability. Granted, this doesn't solve every problem regarding recursion depth and segfaults, but it mitigates some of the problems I've encountered when implementing this code. I hope this has been informative!
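To make the recursion-depth difference concrete, here is a small single-threaded sketch that sorts a reverse-ordered list twice, once with the first element as the pivot and once with a random one, and records the deepest recursion level reached. It is a simplified stand-in rather than the parallel sorter above: `depth_probe` and `measure_depth` are hypothetical names made up for this illustration, the elements are split by splicing instead of std::partition, and the generator uses a fixed seed so that runs are repeatable.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <list>
#include <random>
#include <utility>

// Hypothetical helper: sequential list quicksort that tracks its recursion depth.
struct depth_probe
{
    std::size_t max_depth = 0;
    std::mt19937 engine{12345}; // fixed seed (an assumption made for repeatability)

    std::list<int> sort(std::list<int> data, bool random_pivot, std::size_t depth = 1)
    {
        if (data.empty())
            return data;
        max_depth = std::max(max_depth, depth);

        // Pivot position: front of the list, or uniformly random.
        std::size_t offset = 0;
        if (random_pivot)
        {
            std::uniform_int_distribution<std::size_t> pick(0, data.size() - 1);
            offset = pick(engine);
        }
        std::list<int> result;
        result.splice(result.begin(), data,
                      std::next(data.begin(), static_cast<std::ptrdiff_t>(offset)));
        int const pivot = result.front();

        // Move everything smaller than the pivot into `lower`; the rest stays in `data`.
        std::list<int> lower;
        for (auto it = data.begin(); it != data.end(); )
        {
            auto current = it++;
            if (*current < pivot)
                lower.splice(lower.end(), data, current);
        }

        std::list<int> sorted_higher = sort(std::move(data), random_pivot, depth + 1);
        std::list<int> sorted_lower = sort(std::move(lower), random_pivot, depth + 1);
        result.splice(result.end(), sorted_higher);
        result.splice(result.begin(), sorted_lower);
        return result;
    }
};

// Sorts n, n-1, ..., 1 and returns the deepest recursion level that was reached.
inline std::size_t measure_depth(std::size_t n, bool random_pivot)
{
    std::list<int> input;
    for (std::size_t i = n; i > 0; --i)
        input.push_back(static_cast<int>(i));
    depth_probe probe;
    std::list<int> sorted = probe.sort(std::move(input), random_pivot);
    assert(std::is_sorted(sorted.begin(), sorted.end()));
    return probe.max_depth;
}
```

Calling `measure_depth(2000, false)` walks one level per element, because the first element of a reverse-ordered list is always the largest, while `measure_depth(2000, true)` should stay far shallower; the exact figure depends on the random draws.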