The only subtle change I'd like to make to this quicksort algorithm, so that it handles whatever list it is given more gracefully, is to this line:
result.splice(result.begin(),chunk_data,chunk_data.begin());
which feeds the pivot element to the next line,
T const& partition_val = *result.begin();
Although this code is correct, it does not achieve the expected-time cost of quicksort, which relies on the pivot being chosen at random. Worse, if the list is passed in already sorted in reverse order (e.g. 10, 9, 8, 7, 6, 5, 4, 3, 2, 1), picking the first element as the pivot yields a recursion depth of roughly \(N-1\) and \(O(N^{2})\) total work. That was enough to segfault my Linux virtual machine with a list of decent size. Implementing the pivot selection as
result.splice(result.begin(), chunk_data, std::next(chunk_data.begin(),std::rand() % chunk_data.size()));
(and obviously seeding rand()) reduces the expected recursion depth to \(O(\log N)\) and the expected running time to \(O(N \log N)\), letting the program handle large lists while greatly reducing the chances of a segfault. std::next is an O(N) operation for anything weaker than a random-access iterator (which std::list does not provide), but here robustness takes priority over raw performance. Further modifications to handle std::vector<T> instead of std::list<T> will be the topic of a future blog post.
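For illustration, here is a minimal sketch of that pivot selection in isolation, using the <random> facilities instead of rand(); the helper name splice_random_pivot and the thread_local std::mt19937 generator are my own additions and not part of the final program below.
#include <iterator>
#include <list>
#include <random>
template<typename T>
void splice_random_pivot(std::list<T>& result, std::list<T>& chunk_data)
{
    // assumes chunk_data is non-empty (do_sort below checks this before splicing)
    static thread_local std::mt19937 gen{std::random_device{}()}; // seeded once per thread
    std::uniform_int_distribution<std::size_t> dist(0, chunk_data.size() - 1);
    // std::next walks the list node by node, hence the O(N) cost mentioned above
    result.splice(result.begin(), chunk_data, std::next(chunk_data.begin(), dist(gen)));
}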
The final sorting program:
#include <future>
#include <stack>
#include <vector>
#include <queue>
#include <iostream>
#include <mutex>
#include <algorithm>
#include <functional>
#include <list>
#include <memory>
#include <condition_variable>
#include <thread>
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <ctime>
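// A simple thread-safe stack: every operation takes the mutex, and wait_and_pop blocks on the condition variable until data arrives.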
template<typename T>
class thread_safe_stack
{
private:
std::unique_ptr<std::stack<T>> internal_stack;
std::mutex mut;
std::condition_variable cond_var;
public:
template<typename ... Args>
thread_safe_stack(Args&& ... args)
{
internal_stack.reset(new std::stack<T>(std::forward<Args>(args)...));
}
int size()
{
std::lock_guard<std::mutex> lock(mut);
return internal_stack->size();
}
bool wait_and_pop(T& data)
{
std::unique_lock<std::mutex> lock(mut);
cond_var.wait(lock, [this](){ return !internal_stack->empty(); });
data = std::move(internal_stack->top());
internal_stack->pop();
return true;
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lock(mut);
cond_var.wait(lock, [this](){ return !internal_stack->empty(); });
auto ptr = std::make_shared<T>(std::move(internal_stack->top()));
internal_stack->pop();
return ptr;
}
bool try_pop(T& data)
{
std::lock_guard<std::mutex> lock(mut);
if(internal_stack->empty())
return false;
data = std::move(internal_stack->top());
internal_stack->pop();
return true;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lock(mut);
if(internal_stack->empty())
return nullptr;
auto ptr = std::make_shared<T>(std::move(internal_stack->top()));
internal_stack->pop();
return ptr;
}
void push(const T& val)
{
std::lock_guard<std::mutex> lock(mut);
internal_stack->push(val);
cond_var.notify_one(); // wake a thread blocked in wait_and_pop
}
void push(T&& val)
{
std::lock_guard<std::mutex> lock(mut);
internal_stack->push(std::move(val));
cond_var.notify_one(); // wake a thread blocked in wait_and_pop
}
};
template<typename T>
struct sorter
{
// nice struct containing sorting details
struct chunk_to_sort
{
std::list<T> data; // to sort
std::promise<std::list<T>> promise;
};
thread_safe_stack<chunk_to_sort> chunks;
std::vector<std::thread> threads;
unsigned const max_thread_count;
std::atomic<bool> end_of_data;
sorter() :
max_thread_count(std::thread::hardware_concurrency() - 1),
end_of_data(false)
{
std::srand(time(NULL));
}
~sorter()
{
end_of_data = true;
for (unsigned i = 0; i < threads.size(); ++i)
{
if (threads[i].joinable()) threads[i].join();
}
}
void try_sort_chunk()
{
std::shared_ptr<chunk_to_sort> chunk = chunks.try_pop(); // get some data to sort, try sorting it
if (chunk)
{
sort_chunk(chunk);
}
}
std::list<T> do_sort(std::list<T>& chunk_data)
{
if (chunk_data.empty()) // nothing to do? fine!
return chunk_data;
std::list<T> result;
result.splice(result.begin(), chunk_data, std::next(chunk_data.begin(),std::rand() % chunk_data.size())); // remove pivot element from chunk_data and insert into result
T const& partition_point = *result.begin();
// now the chunk_data is partitioned
auto divide_point = std::partition(chunk_data.begin(), chunk_data.end(), [&](T const& other){ return other < partition_point; });
// sort the lower half
chunk_to_sort new_lower_chunk;
new_lower_chunk.data.splice(new_lower_chunk.data.end(), chunk_data, chunk_data.begin(), divide_point);
// get the future for the lower half. As soon as a thread gets the lower chunk, it will set its promise
auto new_lower = new_lower_chunk.promise.get_future();
// push it
chunks.push(std::move(new_lower_chunk));
// can we get a new thread to work without oversubscribing?
if (threads.size() < max_thread_count)
{
threads.push_back(std::thread(&sorter<T>::sort_thread, this));
}
std::list<T> new_higher(do_sort(chunk_data)); // sort the higher data
// result already contains the partition point. So we have to insert the sorted upper half after it
result.splice(result.end(), new_higher); // get upper sorted and insert here
// the lower half may not be ready yet; do useful work instead of just blocking
while (new_lower.wait_for(std::chrono::milliseconds(0)) != std::future_status::ready)
{
try_sort_chunk(); // do work in the meantime
}
// lower half has been sorted
result.splice(result.begin(), new_lower.get());
return result;
}
void sort_chunk(std::shared_ptr<chunk_to_sort> const & chunk)
{
chunk->promise.set_value(do_sort(chunk->data)); // set the promise
}
void sort_thread()
{
while (!end_of_data)
{
try_sort_chunk();
std::this_thread::yield(); // let me back off and let another thread come in
}
}
};
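// Convenience entry point: constructs a sorter whose worker threads live for the duration of the sort.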
template<typename T>
std::list<T> parallel_quicksort(std::list<T> input)
{
if (input.empty())
{
return input;
}
sorter<T> s;
return s.do_sort(input);
}
int main()
{
std::list<int> my_list;
for (int i = 3000; i >= 0; --i)
{
my_list.push_back(i);
}
// The below commented-out code is a primitive way for shuffling the contents of a std::list. Useful for testing
// the functionality of the code above without picking a random pivot.
// std::vector<int> vtor(my_list.size());
// std::copy(my_list.begin(),my_list.end(),vtor.begin());
// std::random_shuffle(vtor.begin(),vtor.end());
// std::copy(vtor.begin(),vtor.end(),my_list.begin());
auto sorted = parallel_quicksort(my_list);
for (auto i = sorted.begin(); i != sorted.end(); ++i)
{
std::cout << *i << "\n";
}
}
The thread_safe_stack is implemented with plain std::mutex-based locking for demonstration purposes. A lock-free implementation built on std::atomic is also viable, but contention is low in this example, so the simpler locking version serves the demonstration well.
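For the curious, here is a rough sketch of what the std::atomic route could look like: a Treiber-style lock-free push built on compare_exchange_weak. This is my own illustration and not part of the program above; pop and safe memory reclamation (hazard pointers, reference counting, and so on) are deliberately omitted, which is exactly why the mutex-based version is the one used for the demonstration.
#include <atomic>
template<typename T>
class lock_free_stack
{
    struct node
    {
        T data;
        node* next;
        node(T const& d) : data(d), next(nullptr) {}
    };
    std::atomic<node*> head{nullptr};
public:
    void push(T const& data)
    {
        node* const new_node = new node(data);
        new_node->next = head.load();
        // keep retrying until head is swung from the value we observed to the new node
        while (!head.compare_exchange_weak(new_node->next, new_node))
            ;
    }
};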
Although the change I wanted to make (choosing a random pivot) is small, it makes a real difference in running time and recursion depth, both of which directly affect usability. Granted, this doesn't solve every problem regarding recursion depth and segfaults, but it mitigates some of the issues I've encountered while implementing this code. I hope this has been informative!