The only subtle change I'd like to make to this quicksort algorithm, to make it more flexible about the lists it may be asked to sort, is to this line:
result.splice(result.begin(), chunk_data, chunk_data.begin());
which feeds the pivot element into the line that follows:
T const& partition_point = *result.begin();
Although this code is correct, it does not exploit the expected-time behaviour that quicksort gains from picking a random pivot. Additionally, in the unfortunate case of the list being passed in sorted in reverse order (e.g. 10, 9, 8, 7, 6, 5, 4, 3, 2, 1), picking the first element as the pivot degrades the running time to roughly \(O(N^{2})\) and drives the recursion depth to roughly \(N - 1\) levels. That depth was enough to segfault the program on my Linux virtual machine with a list of decent size. Implementing the pivot selection as
result.splice(result.begin(), chunk_data, std::next(chunk_data.begin(), std::rand() % chunk_data.size()));
(and obviously seeding rand()) reduces the expected running time to \(O(N \log N)\) and the expected recursion depth to \(O(\log N)\), which lets the program handle large lists while greatly reducing the chance of a segfault. Although std::next is an \(O(N)\) operation for anything weaker than a random-access iterator (which std::list does not provide), that sacrifice is worth making: functionality comes before performance. Further modifications to handle std::vector<T> as opposed to std::list<T> will be the topic of a future blog post.
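As an aside, if you'd rather not rely on std::rand() at all, the same idea can be expressed with the <random> facilities. The sketch below pulls pivot selection into a small helper; the name pick_random_pivot and the choice of std::mt19937 are purely illustrative and not part of the program shown further down.

#include <iterator>
#include <list>
#include <random>

// Sketch only: returns an iterator to a uniformly random element of chunk_data.
// Precondition: chunk_data is not empty (do_sort already checks for that).
template<typename T>
typename std::list<T>::iterator pick_random_pivot(std::list<T>& chunk_data)
{
    static thread_local std::mt19937 rng{ std::random_device{}() };
    std::uniform_int_distribution<std::size_t> dist(0, chunk_data.size() - 1);
    // std::next still walks the list node by node, so this remains O(N) for std::list iterators
    return std::next(chunk_data.begin(), dist(rng));
}

With such a helper, the splice above would read result.splice(result.begin(), chunk_data, pick_random_pivot(chunk_data)); and no call to std::srand() is needed.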
The final sorting program:
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <ctime>
#include <functional>
#include <future>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <stack>
#include <thread>
#include <vector>

template<typename T>
class thread_safe_stack
{
private:
    std::unique_ptr<std::stack<T>> internal_stack;
    std::mutex mut;
    std::condition_variable cond_var;
public:
    template<typename... Args>
    thread_safe_stack(Args&&... args)
    {
        internal_stack.reset(new std::stack<T>(std::forward<Args>(args)...));
    }

    int size()
    {
        std::lock_guard<std::mutex> lock(mut);
        return internal_stack->size();
    }

    bool wait_and_pop(T& data)
    {
        std::unique_lock<std::mutex> lock(mut);
        cond_var.wait(lock, [this]{ return !internal_stack->empty(); });
        data = std::move(internal_stack->top());
        internal_stack->pop();
        return true;
    }

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lock(mut);
        cond_var.wait(lock, [this]{ return !internal_stack->empty(); });
        auto ptr = std::make_shared<T>(std::move(internal_stack->top()));
        internal_stack->pop();
        return ptr;
    }

    bool try_pop(T& data)
    {
        std::lock_guard<std::mutex> lock(mut);
        if (internal_stack->empty())
            return false;
        data = std::move(internal_stack->top());
        internal_stack->pop();
        return true;
    }

    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lock(mut);
        if (internal_stack->empty())
            return nullptr;
        auto ptr = std::make_shared<T>(std::move(internal_stack->top()));
        internal_stack->pop();
        return ptr;
    }

    void push(const T& val)
    {
        std::lock_guard<std::mutex> lock(mut);
        internal_stack->push(val);
        cond_var.notify_one(); // wake a thread blocked in wait_and_pop
    }

    void push(T&& val)
    {
        std::lock_guard<std::mutex> lock(mut);
        internal_stack->push(std::move(val));
        cond_var.notify_one(); // wake a thread blocked in wait_and_pop
    }
};

template<typename T>
struct sorter
{
    // nice struct containing sorting details
    struct chunk_to_sort
    {
        std::list<T> data; // to sort
        std::promise<std::list<T>> promise;
    };

    thread_safe_stack<chunk_to_sort> chunks;
    std::vector<std::thread> threads;
    unsigned const max_thread_count;
    std::atomic<bool> end_of_data;

    sorter()
        : max_thread_count(std::thread::hardware_concurrency() - 1),
          end_of_data(false)
    {
        std::srand(static_cast<unsigned>(std::time(nullptr)));
    }

    ~sorter()
    {
        end_of_data = true;
        for (unsigned i = 0; i < threads.size(); ++i)
        {
            if (threads[i].joinable())
                threads[i].join();
        }
    }

    void try_sort_chunk()
    {
        std::shared_ptr<chunk_to_sort> chunk = chunks.try_pop(); // get some data to sort, try sorting it
        if (chunk)
        {
            sort_chunk(chunk);
        }
    }

    std::list<T> do_sort(std::list<T>& chunk_data)
    {
        if (chunk_data.empty()) // nothing to do? fine!
            return chunk_data;

        std::list<T> result;
        // remove a random pivot element from chunk_data and insert it into result
        result.splice(result.begin(), chunk_data,
                      std::next(chunk_data.begin(), std::rand() % chunk_data.size()));
        T const& partition_point = *result.begin();

        // now chunk_data is partitioned around the pivot
        auto divide_point = std::partition(chunk_data.begin(), chunk_data.end(),
                                           [&](T const& other){ return other < partition_point; });

        // split off the lower half into its own chunk
        chunk_to_sort new_lower_chunk;
        new_lower_chunk.data.splice(new_lower_chunk.data.end(), chunk_data,
                                    chunk_data.begin(), divide_point);

        // get the future for the lower half. As soon as a thread gets the lower chunk, it will set its promise
        auto new_lower = new_lower_chunk.promise.get_future();
        // push it
        chunks.push(std::move(new_lower_chunk));

        // can we get a new thread to work without oversubscribing?
        if (threads.size() < max_thread_count)
        {
            threads.push_back(std::thread(&sorter<T>::sort_thread, this));
        }

        std::list<T> new_higher(do_sort(chunk_data)); // sort the higher data

        // result already contains the partition point, so the sorted upper half goes after it
        result.splice(result.end(), new_higher);

        // you stallin', pimpin'!
        while (new_lower.wait_for(std::chrono::milliseconds(0)) != std::future_status::ready)
        {
            try_sort_chunk(); // do work in the meantime
        }

        // lower half has been sorted
        result.splice(result.begin(), new_lower.get());
        return result;
    }

    void sort_chunk(std::shared_ptr<chunk_to_sort> const& chunk)
    {
        chunk->promise.set_value(do_sort(chunk->data)); // set the promise
    }

    void sort_thread()
    {
        while (!end_of_data)
        {
            try_sort_chunk();
            std::this_thread::yield(); // let me back off and let another thread come in
        }
    }
};

template<typename T>
std::list<T> parallel_quicksort(std::list<T> input)
{
    if (input.empty())
    {
        return input;
    }
    sorter<T> s;
    return s.do_sort(input);
}

int main()
{
    std::list<int> my_list;
    for (int i = 3000; i >= 0; --i)
    {
        my_list.push_back(i);
    }
    // The commented-out code below is a primitive way of shuffling the contents of a std::list. Useful for testing
    // the functionality of the code above without picking a random pivot.
    // std::vector<int> vtor(my_list.size());
    // std::copy(my_list.begin(), my_list.end(), vtor.begin());
    // std::random_shuffle(vtor.begin(), vtor.end());
    // std::copy(vtor.begin(), vtor.end(), my_list.begin());
    auto sorted = parallel_quicksort(my_list);
    for (auto i = sorted.begin(); i != sorted.end(); ++i)
    {
        std::cout << *i << "\n";
    }
}

The thread_safe_stack is implemented using simple std::mutex-based locks for demonstration purposes. A lock-free design built on std::atomic would also be viable, but contention is low in this example, so a mutex serves the purpose. Although the change I wanted to make (choosing a random pivot) is small, it makes a real difference in running time and recursion depth, both of which directly affect usability. Granted, this doesn't solve every problem regarding recursion depth and segfaults, but it mitigates some of the problems I've encountered when implementing this code. I hope this has been informative!
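P.S. For readers who want to poke at the thread_safe_stack in isolation, here is a small test sketch. It is a separate program from the sorter, it assumes the thread_safe_stack definition from the listing above is in scope, and the names producer_count and items_per_producer are only illustrative.

#include <iostream>
#include <thread>
#include <vector>

int main()
{
    thread_safe_stack<int> stack;
    unsigned const producer_count = 4;
    unsigned const items_per_producer = 1000;

    // several threads push concurrently; the mutex inside the stack serialises access
    std::vector<std::thread> producers;
    for (unsigned p = 0; p < producer_count; ++p)
    {
        producers.push_back(std::thread([&stack, items_per_producer]{
            for (unsigned i = 0; i < items_per_producer; ++i)
                stack.push(static_cast<int>(i));
        }));
    }
    for (auto& t : producers)
        t.join();

    // drain from the main thread; try_pop() returns nullptr once the stack is empty
    unsigned popped = 0;
    while (stack.try_pop())
        ++popped;

    std::cout << "popped " << popped << " items\n"; // expect producer_count * items_per_producer
}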