From d8dbfdcc460754bd5f45182495ff14b39b94b24d Mon Sep 17 00:00:00 2001 From: graehl Date: Sat, 21 Aug 2010 03:07:28 +0000 Subject: heap speedups (TODO: align so children on same cache line) git-svn-id: https://ws10smt.googlecode.com/svn/trunk@611 ec762483-ff6d-05da-a07a-a48fb63a330f --- utils/d_ary_heap.h | 391 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 293 insertions(+), 98 deletions(-) diff --git a/utils/d_ary_heap.h b/utils/d_ary_heap.h index 2ec716d3..15bde583 100644 --- a/utils/d_ary_heap.h +++ b/utils/d_ary_heap.h @@ -1,3 +1,61 @@ +#ifndef D_ARY_HEAP_H +#define D_ARY_HEAP_H + + +#define D_ARY_PUSH_GRAEHL 0 // untested +#define D_ARY_POP_GRAEHL 0 // untested +#define D_ARY_DOWN_GRAEHL 0 // untested +#define D_ARY_UP_GRAEHL 0 // untested +#define D_ARY_APPEND_ALWAYS_PUSH 1 // heapify (0) is untested. otherwise switch between push and heapify depending on size (cache effects, existing items vs. # appended ones) + +#define D_ARY_TRACK_OUT_OF_HEAP 0 // shouldn't need to track, because in contains() false positives looking up stale or random loc map values are impossible - we just check key +#define D_ARY_VERIFY_HEAP 0 +// This is a very expensive test so it should be disabled even when NDEBUG is not defined + +/* adapted from boost/graph/detail/d_ary_heap.hpp + + local modifications: + + clear, heapify, append range/container, Size type template arg, reserve constructor arg + + hole+move rather than swap. note: swap would be more efficient for heavyweight keys, until move ctors exist + + don't set locmap to -1 when removing from heap (waste of time) + + // unlike arity=2 case, you don't gain anything by having indices start at 1, with 0-based child indices + // root @1, A=2, children indices m={0,1}: parent(i)=i/2, child(i,m)=2*i+m + // root @0: parent(i)=(i-1)/A child(i,n)=i*A+n+1 - can't improve on this except child(i,m)=i*A+m + (integer division, a/b=floor(a/b), so (i-1)/A = ceil(i/A)-1, or greatest int less than (i/A)) + + actually, no need to adjust child index, since child is called only once and inline + + e.g. for A=3 gorn address in tree -> index + + () = root -> 0 + (1) -> 1 + (2) -> 2 + (3) (A) -> 3 + (1,1) -> (1*A+1) = 4 + (1,2) -> (1*A+2) = 5 + (1,3) -> (1*A+3) = 6 + (2,1) -> (2*A+1) = 7 + etc. + +//TODO: block-align siblings! assume data[0] is 16 or 32-byte aligned ... then we want root @ index (blocksize-1). see http://www.lamarca.org/anthony/pubs/heaps.pdf pg8. for pow2(e.g. 4)-ary heap, it may be reasonable to use root @index A-1. however, suppose the key size is not padded to a power of 2 (e.g. 12 bytes), then we would need internal gaps at times. would want to use compile const template based inlineable alignment math for this? possibly use a container like vector that lets you specify padding relative to some address multiple for v[0]. + + optimal D: see http://www.lamarca.org/anthony/pubs/heaps.pdf pg 9. depedns on relative cost of swap,compare, but in all cases except swap=free, 2 is worse than 3-4. for expensive swap (3x compare), 4 still as good as 5. so just use 4. boost benchmarking djikstra agrees; 4 is best. + + cache-aligned 4-heap speedup over regular 2-heap is 10-80% (for huge heaps, the speedup is more) + + splay/skew heaps are worse than 2heap or aligned 4heap in practice. + + //TODO: switch from heapify (Floyd's method) to repeated push past some size limit (in bytes) due to cache effect - + #define D_ARY_BYTES_OUT_OF_CACHE 0x1000000 + + //TODO: assuming locmap is an lvalue pmap, we can be more efficient. on the other hand, if it's an intrusive property map to an interned mutable object, there's no difference in performance, and that's what i'm going to do in my first uses. plus, if keys are indices and the map is a vector, it's barely any overhead. + + */ + // //======================================================================= // Copyright 2009 Trustees of Indiana University @@ -8,8 +66,6 @@ // http://www.boost.org/LICENSE_1_0.txt) //======================================================================= // -#ifndef BOOST_D_ARY_HEAP_HPP -#define BOOST_D_ARY_HEAP_HPP #include #include @@ -20,46 +76,12 @@ #include #include -namespace boost { - - // Swap two elements in a property map without assuming they model - // LvaluePropertyMap -- currently not used - template - inline void property_map_swap( - PropMap prop_map, - const typename boost::property_traits::key_type& ka, - const typename boost::property_traits::key_type& kb) { - typename boost::property_traits::value_type va = get(prop_map, ka); - put(prop_map, ka, get(prop_map, kb)); - put(prop_map, kb, va); - } - - namespace detail { - template - class fixed_max_size_vector { - boost::shared_array m_data; - std::size_t m_size; - - public: - typedef std::size_t size_type; - fixed_max_size_vector(std::size_t max_size) - : m_data(new Value[max_size]), m_size(0) {} - std::size_t size() const {return m_size;} - bool empty() const {return m_size == 0;} - Value& operator[](std::size_t i) {return m_data[i];} - const Value& operator[](std::size_t i) const {return m_data[i];} - void push_back(Value v) {m_data[m_size++] = v;} - void pop_back() {--m_size;} - Value& back() {return m_data[m_size - 1];} - const Value& back() const {return m_data[m_size - 1];} - }; - } // D-ary heap using an indirect compare operator (use identity_property_map // as DistanceMap to get a direct compare operator). This heap appears to be // commonly used for Dijkstra's algorithm for its good practical performance - // on some platforms; asymptotically, it has an O(lg N) decrease-key - // operation while that can be done in constant time on a relaxed heap. The + // on some platforms; asymptotically, it's not optimal; it has an O(lg N) decrease-key + // operation, which is (amortized) constant time on a relaxed heap or fibonacci heap. The // implementation is mostly based on the binary heap page on Wikipedia and // online sources that state that the operations are the same for d-ary // heaps. This code is not based on the old Boost d-ary heap code. @@ -85,23 +107,91 @@ namespace boost { typename IndexInHeapPropertyMap, typename DistanceMap, typename Compare = std::less, - typename Container = std::vector > + typename Container = std::vector, + typename Size = typename Container::size_type, + typename Equal = std::equal_to > class d_ary_heap_indirect { BOOST_STATIC_ASSERT (Arity >= 2); - +# undef D_ARY_HEAP_NULL_INDEX +# define D_ARY_HEAP_NULL_INDEX ((size_type)(-1)) public: - typedef typename Container::size_type size_type; + typedef Container container_type; + typedef Size size_type; typedef Value value_type; - + typedef typename Container::const_iterator const_iterator; + typedef const_iterator iterator; + // The distances being compared using compare and that are stored in the + // distance map + typedef typename boost::property_traits::value_type distance_type; d_ary_heap_indirect(DistanceMap distance, IndexInHeapPropertyMap index_in_heap, const Compare& compare = Compare(), - const Container& data = Container()) - : compare(compare), data(data), distance(distance), - index_in_heap(index_in_heap) {} + size_type container_reserve = 100000, + Equal const& equal = Equal() + ) + : compare(compare), data(), distance(distance), + index_in_heap(index_in_heap) { + data.reserve(container_reserve); + } /* Implicit copy constructor */ /* Implicit assignment operator */ + template + void append_heapify(C const& c) { + data.reserve(data.size()+c.size()); + append_heapify(c.begin(),c.end()); + } + + template + void append_heapify(I begin,I end) { + data.insert(data.end(),begin,end); + heapify(); + } + + template + void append_push(C const& c) { + data.reserve(data.size()+c.size()); + append_push(c.begin(),c.end()); + } + + // past some threshold, this should be faster than append_heapify. also, if there are many existing elements it will be faster. + template + void append_push(I begin,I end) { + for (;begin!=end;++begin) + push(*begin); + } + + template + void append(C const& c) { + if (D_ARY_APPEND_ALWAYS_PUSH || data.size()>=c.size()/2) + append_push(c); + else + append_heapify(c); + } + + // past some threshold, this should be faster than append_heapify. also, if there are many existing elements it will be faster. + template + void append(I begin,I end) { + if (D_ARY_APPEND_ALWAYS_PUSH || data.size()>=0x10000) + append_push(begin,end); + else + append_heapify(begin,end); + } + + // could allow mutation of data directly, e.g. push_back 1 at a time - but then they could forget to heapify() + + //from bottom of heap tree up, turn that subtree into a heap by adjusting the root down + // for n=size, array elements indexed by floor(n/2) + 1, floor(n/2) + 2, ... , n are all leaves for the tree, thus each is an one-element heap already + // warning: this is many fewer instructions but, at some point (when heap doesn't fit in Lx cache) it will become slower than repeated push(). + void heapify() { + for (size_type i=parent(data.size()-1);i>0;--i) // starting from parent of last node, ending at first child of root (i==1) + preserve_heap_property_down(i); + } + + void reserve(size_type s) { + data.reserve(s); + } + size_type size() const { return data.size(); } @@ -110,11 +200,33 @@ namespace boost { return data.empty(); } + const_iterator begin() const { + return data.begin(); + } + + const_iterator end() const { + return data.end(); + } + + void clear() { +#if D_ARY_TRACK_OUT_OF_HEAP + for (typename Container::iterator i=data.begin(),e=data.end();i!=e;++i) + put(index_in_heap,*i,D_ARY_HEAP_NULL_INDEX); +#endif + data.clear(); + } + void push(const Value& v) { - size_type index = data.size(); - data.push_back(v); - put(index_in_heap, v, index); - preserve_heap_property_up(index); + if (D_ARY_PUSH_GRAEHL) { + size_type i = data.size(); + data.push_back(Value()); // (hoping default construct is cheap, construct-copy inline) + preserve_heap_property_up(v,i); // we don't have to recopy v, or init index_in_heap + } else { + size_type index = data.size(); + data.push_back(v); + put(index_in_heap, v, index); + preserve_heap_property_up(index); + } verify_heap(); } @@ -127,65 +239,80 @@ namespace boost { } void pop() { - put(index_in_heap, data[0], (size_type)(-1)); + if(D_ARY_TRACK_OUT_OF_HEAP) + put(index_in_heap, data[0], D_ARY_HEAP_NULL_INDEX); if (data.size() != 1) { - data[0] = data.back(); - put(index_in_heap, data[0], 0); - data.pop_back(); - preserve_heap_property_down(); + if (D_ARY_POP_GRAEHL) { + preserve_heap_property_down(data.back(),0,data.size()-1); + data.pop_back(); + } else { + data[0] = data.back(); + put(index_in_heap, data[0], 0); + data.pop_back(); + preserve_heap_property_down(); + } verify_heap(); } else { data.pop_back(); } } - // This function assumes the key has been updated (using an external write - // to the distance map or such) - // See http://coding.derkeiler.com/Archive/General/comp.theory/2007-05/msg00043.html - void update(const Value& v) { /* decrease-key */ + // This function assumes the key has been improved + // (distance has become smaller, so it may need to rise toward top(). + // i.e. decrease-key in a min-heap + void update(const Value& v) { size_type index = get(index_in_heap, v); preserve_heap_property_up(index); verify_heap(); } - bool contains(const Value& v) const { - size_type index = get(index_in_heap, v); - return (index != (size_type)(-1)); + inline bool contains(const Value &v,size_type i) const { + return D_ARY_TRACK_OUT_OF_HEAP ? + (i != D_ARY_HEAP_NULL_INDEX) : + i>=0 && i=0 check to catch uninit. data + } + + inline bool contains(const Value& v) const { + return contains(v,get(index_in_heap, v)); } void push_or_update(const Value& v) { /* insert if not present, else update */ size_type index = get(index_in_heap, v); - if (index == (size_type)(-1)) { - index = data.size(); - data.push_back(v); - put(index_in_heap, v, index); + if (D_ARY_PUSH_GRAEHL) { + if (contains(v,index)) + preserve_heap_property_up(v,index); + else + push(v); + } else { + if (!contains(v,index)) { + index = data.size(); + data.push_back(v); + put(index_in_heap, v, index); + } + preserve_heap_property_up(index); } - preserve_heap_property_up(index); verify_heap(); } private: + Equal equal; Compare compare; Container data; DistanceMap distance; IndexInHeapPropertyMap index_in_heap; - // The distances being compared using compare and that are stored in the - // distance map - typedef typename boost::property_traits::value_type distance_type; - // Get the parent of a given node in the heap - static size_type parent(size_type index) { + static inline size_type parent(size_type index) { return (index - 1) / Arity; } // Get the child_idx'th child of a given node; 0 <= child_idx < Arity - static size_type child(size_type index, std::size_t child_idx) { + static inline size_type child(size_type index, std::size_t child_idx) { return index * Arity + child_idx + 1; } // Swap two elements in the heap by index, updating index_in_heap - void swap_heap_elements(size_type index_a, size_type index_b) { + inline void swap_heap_elements(size_type index_a, size_type index_b) { using std::swap; Value value_a = data[index_a]; Value value_b = data[index_b]; @@ -195,33 +322,58 @@ namespace boost { put(index_in_heap, value_b, index_a); } - // Emulate the indirect_cmp that is now folded into this heap class - bool compare_indirect(const Value& a, const Value& b) const { - return compare(get(distance, a), get(distance, b)); + inline void move_heap_element(Value const& v,size_type ito) { + put(index_in_heap,v,ito); + data[ito]=v; //todo: move assign? } // Verify that the array forms a heap; commented out by default void verify_heap() const { // This is a very expensive test so it should be disabled even when // NDEBUG is not defined -#if 0 +#if D_ARY_VERIFY_HEAP for (size_t i = 1; i < data.size(); ++i) { - if (compare_indirect(data[i], data[parent(i)])) { + if (compare(get(distance,data[i]), get(distance,data[parent(i)]))) { assert (!"Element is smaller than its parent"); } } #endif } + // we have a copy of the key, so we don't need to do that stupid find # of levels to move then move. we act as though data[index]=currently_being_moved, but in fact it's an uninitialized "hole", which we fill at the very end + void preserve_heap_property_up(Value const& currently_being_moved,size_type index) { + size_type orig_index = index; + distance_type currently_being_moved_dist = + get(distance, currently_being_moved); + for (;;) { + if (index == 0) break; // Stop at root + size_type parent_index = parent(index); + Value const& parent_value = data[parent_index]; + if (compare(currently_being_moved_dist, get(distance, parent_value))) { + move_heap_element(parent_value,index); + index = parent_index; + } else { + break; // Heap property satisfied + } + } + //finish "swap chain" by filling hole w/ currently_being_moved + move_heap_element(currently_being_moved,index); // note: it's ok not to return early on index==0 at start, even if self-assignment isn't supported by Value - because currently_being_moved is a copy. + } + // Starting at a node, move up the tree swapping elements to preserve the - // heap property + // heap property. doesn't actually use swap; uses hole void preserve_heap_property_up(size_type index) { + if (index == 0) return; // Do nothing on root + if (D_ARY_UP_GRAEHL) { + Value copyi=data[index]; + preserve_heap_property_up(copyi,index); + return; + } size_type orig_index = index; size_type num_levels_moved = 0; // The first loop just saves swaps that need to be done in order to avoid // aliasing issues in its search; there is a second loop that does the // necessary swap operations - if (index == 0) return; // Do nothing on root Value currently_being_moved = data[index]; distance_type currently_being_moved_dist = get(distance, currently_being_moved); @@ -252,10 +404,64 @@ namespace boost { verify_heap(); } + // From the root, swap elements (each one with its smallest child) if there - // are any parent-child pairs that violate the heap property + // are any parent-child pairs that violate the heap property. v is placed at data[i], but then pushed down (note: data[i] won't be read explicitly; it will instead be overwritten by percolation). this also means that v must be a copy of data[i] if it was already at i. + // e.g. v=data.back(), i=0, sz=data.size()-1 for pop(), implicitly swapping data[i], data.back(), and doing data.pop_back(), then adjusting from 0 down w/ swaps. updates index_in_heap for v. + inline void preserve_heap_property_down(Value const& currently_being_moved,size_type i,size_type heap_size) { + distance_type currently_being_moved_dist=get(distance,currently_being_moved); + Value* data_ptr = &data[0]; + size_type index = 0; // hole at index - currently_being_moved to be put here when we find the final hole spot + for (;;) { + size_type first_child_index = child(index, 0); + if (first_child_index >= heap_size) break; /* No children */ + Value* child_base_ptr = data_ptr + first_child_index; // using index of first_child_index+smallest_child_index because we hope optimizer will be smart enough to const-unroll a loop below if we do this. i think the optimizer would have gotten it even without our help (i.e. store root-relative index) + + // begin find best child index/distance + size_type smallest_child_index = 0; // don't add to base first_child_index every time we update which is smallest. + distance_type smallest_child_dist = get(distance, child_base_ptr[smallest_child_index]); +#undef D_ARY_MAYBE_IMPROVE_CHILD_I +#define D_ARY_MAYBE_IMPROVE_CHILD_I \ + distance_type i_dist = get(distance, child_base_ptr[i]); \ + if (compare(i_dist, smallest_child_dist)) { \ + smallest_child_index = i; \ + smallest_child_dist = i_dist; \ + } + if (first_child_index + Arity <= heap_size) { + // avoid repeated heap_size boundcheck (should test if this is really a speedup - instruction cache tradeoff - could use upperbound = min(Arity,heap_size-first_child_index) instead. but this optimizes to a fixed number of iterations (compile time known) so probably worth it + for (size_t i = 1; i < Arity; ++i) { + D_ARY_MAYBE_IMPROVE_CHILD_I + } + } else { + for (size_t i = 1,e=heap_size - first_child_index; i < e; ++i) { + D_ARY_MAYBE_IMPROVE_CHILD_I + } + } + //end: know best child + + if (compare(smallest_child_dist, currently_being_moved_dist)) { + // instead of swapping, move. + move_heap_element(child_base_ptr[smallest_child_index],index); // move up + index=first_child_index+smallest_child_index; // descend - hole is now here + } else { + move_heap_element(currently_being_moved,index); // finish "swap chain" by filling hole + break; + } + } + verify_heap(); + } + + inline void preserve_heap_property_down(size_type i) { + preserve_heap_property_down(data[i],i,data.size()); + } + void preserve_heap_property_down() { if (data.empty()) return; + if (D_ARY_DOWN_GRAEHL) { // this *should* be more efficient because i avoid swaps. + Value copy0=data[0]; + preserve_heap_property_down(copy0,0,data.size()); + return; + } size_type index = 0; Value currently_being_moved = data[0]; distance_type currently_being_moved_dist = @@ -269,22 +475,13 @@ namespace boost { size_type smallest_child_index = 0; distance_type smallest_child_dist = get(distance, child_base_ptr[smallest_child_index]); if (first_child_index + Arity <= heap_size) { - // Special case for a statically known loop count (common case) - for (size_t i = 1; i < Arity; ++i) { - Value i_value = child_base_ptr[i]; - distance_type i_dist = get(distance, i_value); - if (compare(i_dist, smallest_child_dist)) { - smallest_child_index = i; - smallest_child_dist = i_dist; - } + for (size_t i = 1; i < Arity; ++i) { // can be unrolled completely. + + D_ARY_MAYBE_IMPROVE_CHILD_I } } else { - for (size_t i = 1; i < heap_size - first_child_index; ++i) { - distance_type i_dist = get(distance, child_base_ptr[i]); - if (compare(i_dist, smallest_child_dist)) { - smallest_child_index = i; - smallest_child_dist = i_dist; - } + for (size_t i = 1,e=heap_size - first_child_index; i < e; ++i) { + D_ARY_MAYBE_IMPROVE_CHILD_I } } if (compare(smallest_child_dist, currently_being_moved_dist)) { @@ -300,6 +497,4 @@ namespace boost { }; -} // namespace boost - -#endif // BOOST_D_ARY_HEAP_HPP +#endif -- cgit v1.2.3