diff options
author | philblunsom <philblunsom@ec762483-ff6d-05da-a07a-a48fb63a330f> | 2010-07-19 18:33:29 +0000 |
---|---|---|
committer | philblunsom <philblunsom@ec762483-ff6d-05da-a07a-a48fb63a330f> | 2010-07-19 18:33:29 +0000 |
commit | 0e4d1fc42b5f3707d81549d11fad8fcd4f9af89f (patch) | |
tree | 3e29ca6353b9eb8c48a7d47fc1f0aff70932260b /gi/pyp-topics/src/mpi-pyp.hh | |
parent | 1eb8cf39863b166e87a3115c682e873dbe091582 (diff) |
Vaguely working distributed implementation. Hierarchical topics doesn't yet work correctly.
git-svn-id: https://ws10smt.googlecode.com/svn/trunk@317 ec762483-ff6d-05da-a07a-a48fb63a330f
Diffstat (limited to 'gi/pyp-topics/src/mpi-pyp.hh')
-rw-r--r-- | gi/pyp-topics/src/mpi-pyp.hh | 277 |
1 files changed, 204 insertions, 73 deletions
diff --git a/gi/pyp-topics/src/mpi-pyp.hh b/gi/pyp-topics/src/mpi-pyp.hh index 65358d20..0328b387 100644 --- a/gi/pyp-topics/src/mpi-pyp.hh +++ b/gi/pyp-topics/src/mpi-pyp.hh @@ -26,18 +26,21 @@ template <typename Dish, typename Hash=std::tr1::hash<Dish> > class MPIPYP : public PYP<Dish, Hash> { public: + typedef std::map<Dish, int> dish_delta_type; + MPIPYP(double a, double b, Hash hash=Hash()); - virtual int increment(Dish d, double p0); - virtual int decrement(Dish d); + template < typename Uniform01 > + int increment(Dish d, double p0, Uniform01& rnd); + template < typename Uniform01 > + int decrement(Dish d, Uniform01& rnd); void clear(); void reset_deltas(); - void synchronise(); + void synchronise(dish_delta_type* result); private: - typedef std::map<Dish, int> dish_delta_type; typedef std::map<Dish, typename PYP<Dish,Hash>::TableCounter> table_delta_type; dish_delta_type m_count_delta; @@ -49,8 +52,10 @@ MPIPYP<Dish,Hash>::MPIPYP(double a, double b, Hash h) : PYP<Dish,Hash>(a, b, 0, h) {} template <typename Dish, typename Hash> + template <typename Uniform01> int -MPIPYP<Dish,Hash>::increment(Dish dish, double p0) { +MPIPYP<Dish,Hash>::increment(Dish dish, double p0, Uniform01& rnd) { + //std::cerr << "-----INCREMENT DISH " << dish << std::endl; int delta = 0; int table_joined=-1; typename PYP<Dish,Hash>::TableCounter &tc = PYP<Dish,Hash>::_dish_tables[dish]; @@ -63,19 +68,23 @@ MPIPYP<Dish,Hash>::increment(Dish dish, double p0) { double& b = PYP<Dish,Hash>::_b; double pshare = (c > 0) ? (c - a*t) : 0.0; double pnew = (b + a*T) * p0; - assert (pshare >= 0.0); + if (pshare < 0.0) { + std::cerr << pshare << " " << c << " " << a << " " << t << std::endl; + assert(false); + } - if (mt_genrand_res53() < pnew / (pshare + pnew)) { + if (rnd() < pnew / (pshare + pnew)) { // assign to a new table tc.tables += 1; tc.table_histogram[1] += 1; PYP<Dish,Hash>::_total_tables += 1; delta = 1; + table_joined = 1; } else { // randomly assign to an existing table // remove constant denominator from inner loop - double r = mt_genrand_res53() * (c - a*t); + double r = rnd() * (c - a*t); for (std::map<int,int>::iterator hit = tc.table_histogram.begin(); hit != tc.table_histogram.end(); ++hit) { @@ -83,9 +92,9 @@ MPIPYP<Dish,Hash>::increment(Dish dish, double p0) { if (r <= 0) { tc.table_histogram[hit->first+1] += 1; hit->second -= 1; + table_joined = hit->first+1; if (hit->second == 0) tc.table_histogram.erase(hit); - table_joined = hit->first+1; break; } } @@ -112,32 +121,66 @@ MPIPYP<Dish,Hash>::increment(Dish dish, double p0) { m_count_delta.erase(customer_it); // increment the histogram bar for the table joined - if (!delta) { - assert (table_joined >= 0); - std::map<int,int> &histogram = m_table_delta[dish].table_histogram; - typename std::map<int,int>::iterator table_it; bool table_insert_result; - boost::tie(table_it, table_insert_result) = histogram.insert(std::make_pair(table_joined,0)); - table_it->second += 1; - if (table_it->second == 0) histogram.erase(table_it); + /* + typename PYP<Dish,Hash>::TableCounter &delta_tc = m_table_delta[dish]; + + std::map<int,int> &histogram = delta_tc.table_histogram; + assert (table_joined > 0); + typename std::map<int,int>::iterator table_it; bool table_insert_result; + boost::tie(table_it, table_insert_result) = histogram.insert(std::make_pair(table_joined,0)); + table_it->second += 1; + if (delta == 0) { // decrement the histogram bar for the table left - boost::tie(table_it, table_insert_result) = histogram.insert(std::make_pair(table_joined-1,0)); - table_it->second -= 1; - if (table_it->second == 0) histogram.erase(table_it); + typename std::map<int,int>::iterator left_table_it; + boost::tie(left_table_it, table_insert_result) + = histogram.insert(std::make_pair(table_joined-1,0)); + left_table_it->second -= 1; + if (left_table_it->second == 0) histogram.erase(left_table_it); } - else { - typename PYP<Dish,Hash>::TableCounter &delta_tc = m_table_delta[dish]; - delta_tc.tables += 1; - delta_tc.table_histogram[1] += 1; + else delta_tc.tables += 1; + + if (table_it->second == 0) histogram.erase(table_it); + + //std::cerr << "Added (" << delta << ") " << dish << " to table " << table_joined << "\n"; + //std::cerr << "Dish " << dish << " has " << count(dish) << " customers, and is sitting at " << PYP<Dish,Hash>::num_tables(dish) << " tables.\n"; + //for (std::map<int,int>::const_iterator + // hit = delta_tc.table_histogram.begin(); + // hit != delta_tc.table_histogram.end(); ++hit) { + // std::cerr << " " << hit->second << " tables with " << hit->first << " customers." << std::endl; + //} + //std::cerr << "Added (" << delta << ") " << dish << " to table " << table_joined << "\n"; + //std::cerr << "Dish " << dish << " has " << count(dish) << " customers, and is sitting at " << PYP<Dish,Hash>::num_tables(dish) << " tables.\n"; + int x_num_customers=0, x_num_table=0; + for (std::map<int,int>::const_iterator + hit = delta_tc.table_histogram.begin(); + hit != delta_tc.table_histogram.end(); ++hit) { + x_num_table += hit->second; + x_num_customers += (hit->second*hit->first); + } + int tmp_c = PYP<Dish,Hash>::count(dish); + int tmp_t = PYP<Dish,Hash>::num_tables(dish); + assert (x_num_customers <= tmp_c); + assert (x_num_table <= tmp_t); + + if (delta_tc.table_histogram.empty()) { + assert (delta_tc.tables == 0); + m_table_delta.erase(dish); } + */ + + //PYP<Dish,Hash>::debug_info(std::cerr); + //std::cerr << " Dish " << dish << " has count " << PYP<Dish,Hash>::count(dish) << " tables " << PYP<Dish,Hash>::num_tables(dish) << std::endl; return delta; } template <typename Dish, typename Hash> + template <typename Uniform01> int -MPIPYP<Dish,Hash>::decrement(Dish dish) +MPIPYP<Dish,Hash>::decrement(Dish dish, Uniform01& rnd) { + //std::cerr << "-----DECREMENT DISH " << dish << std::endl; typename std::tr1::unordered_map<Dish, int>::iterator dcit = find(dish); //typename google::sparse_hash_map<Dish, int>::iterator dcit = find(dish); if (dcit == PYP<Dish,Hash>::end()) { @@ -156,7 +199,7 @@ MPIPYP<Dish,Hash>::decrement(Dish dish) } typename PYP<Dish,Hash>::TableCounter &tc = dtit->second; - double r = mt_genrand_res53() * PYP<Dish,Hash>::count(dish); + double r = rnd() * PYP<Dish,Hash>::count(dish); for (std::map<int,int>::iterator hit = tc.table_histogram.begin(); hit != tc.table_histogram.end(); ++hit) { r -= (hit->first * hit->second); @@ -191,26 +234,54 @@ MPIPYP<Dish,Hash>::decrement(Dish dish) PYP<Dish,Hash>::_dish_tables.erase(dtit); } + // MPI Delta processing typename dish_delta_type::iterator it; bool insert_result; boost::tie(it, insert_result) = m_count_delta.insert(std::make_pair(dish,0)); - it->second -= 1; + if (it->second == 0) m_count_delta.erase(it); - if (it->second == 0) - m_count_delta.erase(it); - - assert (table_left >= 0); + assert (table_left > 0); typename PYP<Dish,Hash>::TableCounter& delta_tc = m_table_delta[dish]; - if (table_left > 1) - delta_tc.table_histogram[table_left-1] += 1; + if (table_left > 1) { + std::map<int,int>::iterator tit; + boost::tie(tit, insert_result) = delta_tc.table_histogram.insert(std::make_pair(table_left-1,0)); + tit->second += 1; + if (tit->second == 0) delta_tc.table_histogram.erase(tit); + } else delta_tc.tables -= 1; - std::map<int,int>::iterator tit = delta_tc.table_histogram.find(table_left); - //assert (tit != delta_tc.table_histogram.end()); + std::map<int,int>::iterator tit; + boost::tie(tit, insert_result) = delta_tc.table_histogram.insert(std::make_pair(table_left,0)); tit->second -= 1; if (tit->second == 0) delta_tc.table_histogram.erase(tit); + // std::cerr << "Dish " << dish << " has " << count(dish) << " customers, and is sitting at " << PYP<Dish,Hash>::num_tables(dish) << " tables.\n"; + // for (std::map<int,int>::const_iterator + // hit = delta_tc.table_histogram.begin(); + // hit != delta_tc.table_histogram.end(); ++hit) { + // std::cerr << " " << hit->second << " tables with " << hit->first << " customers." << std::endl; + // } + int x_num_customers=0, x_num_table=0; + for (std::map<int,int>::const_iterator + hit = delta_tc.table_histogram.begin(); + hit != delta_tc.table_histogram.end(); ++hit) { + x_num_table += hit->second; + x_num_customers += (hit->second*hit->first); + } + int tmp_c = PYP<Dish,Hash>::count(dish); + int tmp_t = PYP<Dish,Hash>::num_tables(dish); + assert (x_num_customers <= tmp_c); + assert (x_num_table <= tmp_t); + + if (delta_tc.table_histogram.empty()) { + // std::cerr << " DELETING " << dish << std::endl; + assert (delta_tc.tables == 0); + m_table_delta.erase(dish); + } + + //PYP<Dish,Hash>::debug_info(std::cerr); + //std::cerr << " Dish " << dish << " has count " << PYP<Dish,Hash>::count(dish) << " tables " << PYP<Dish,Hash>::num_tables(dish) << std::endl; return delta; } @@ -238,6 +309,16 @@ struct sum_maps { } }; +template <typename Dish> +struct subtract_maps { + typedef std::map<Dish,int> map_type; + map_type& operator() (map_type& l, map_type const & r) const { + for (typename map_type::const_iterator it=r.begin(); it != r.end(); it++) + l[it->first] -= it->second; + return l; + } +}; + // Needed Boost definitions namespace boost { namespace mpi { @@ -255,60 +336,110 @@ namespace boost { } // namespace serialization } // namespace boost +template <typename A, typename B, typename C> +struct triple { + triple() {} + triple(const A& a, const B& b, const C& c) : first(a), second(b), third(c) {} + A first; + B second; + C third; + + template<class Archive> + void serialize(Archive &ar, const unsigned int version){ + ar & first; + ar & second; + ar & third; + } +}; + template <typename Dish, typename Hash> void -MPIPYP<Dish,Hash>::synchronise() { +MPIPYP<Dish,Hash>::synchronise(dish_delta_type* result) { boost::mpi::communicator world; - int rank = world.rank(), size = world.size(); + //int rank = world.rank(), size = world.size(); + boost::mpi::all_reduce(world, m_count_delta, *result, sum_maps<Dish>()); + subtract_maps<Dish>()(*result, m_count_delta); + +/* // communicate the customer count deltas - dish_delta_type global_dish_delta; // the “merged” map + dish_delta_type global_dish_delta; boost::mpi::all_reduce(world, m_count_delta, global_dish_delta, sum_maps<Dish>()); // update this restaurant for (typename dish_delta_type::const_iterator it=global_dish_delta.begin(); it != global_dish_delta.end(); ++it) { - std::tr1::unordered_map<Dish,int,Hash>::operator[](it->first) += (it->second - m_count_delta[it->first]); + int global_delta = it->second - m_count_delta[it->first]; + if (global_delta == 0) continue; + typename std::tr1::unordered_map<Dish,int,Hash>::iterator dit; bool inserted; + boost::tie(dit, inserted) + = std::tr1::unordered_map<Dish,int,Hash>::insert(std::make_pair(it->first, 0)); + dit->second += global_delta; + assert(dit->second >= 0); + if (dit->second == 0) { + std::tr1::unordered_map<Dish,int,Hash>::erase(dit); + } + PYP<Dish,Hash>::_total_customers += (it->second - m_count_delta[it->first]); - //std::cerr << "Process " << rank << " adding " << (it->second - m_count_delta[it->first]) << " customers." << std::endl; + int tmp = PYP<Dish,Hash>::_total_customers; + assert(tmp >= 0); + //std::cerr << "Process " << rank << " adding " << (it->second - m_count_delta[it->first]) << " of customer " << it->first << std::endl; } - +*/ +/* // communicate the table count deltas -// for (int process = 0; process < size; ++process) { -// if (rank == process) { -// // broadcast deltas -// std::cerr << " -- Rank " << rank << " broadcasting -- " << std::endl; -// -// boost::mpi::broadcast(world, m_table_delta, process); -// -// std::cerr << " -- Rank " << rank << " done broadcasting -- " << std::endl; -// } -// else { -// std::cerr << " -- Rank " << rank << " receiving -- " << std::endl; -// // receive deltas -// table_delta_type recv_table_delta; -// -// boost::mpi::broadcast(world, recv_table_delta, process); -// -// std::cerr << " -- Rank " << rank << " done receiving -- " << std::endl; -// -// for (typename table_delta_type::const_iterator dish_it=recv_table_delta.begin(); -// dish_it != recv_table_delta.end(); ++dish_it) { -// typename PYP<Dish,Hash>::TableCounter &tc = PYP<Dish,Hash>::_dish_tables[dish_it->first]; -// -// for (std::map<int,int>::const_iterator it=dish_it->second.table_histogram.begin(); -// it != dish_it->second.table_histogram.end(); ++it) { -// tc.table_histogram[it->first] += it->second; -// } -// tc.tables += dish_it->second.tables; -// PYP<Dish,Hash>::_total_tables += dish_it->second.tables; -// } -// } -// } -// std::cerr << " -- Done Reducing -- " << std::endl; + for (int process = 0; process < size; ++process) { + typename std::vector< triple<Dish, int, int> > message; + if (rank == process) { + // broadcast deltas + for (typename table_delta_type::const_iterator dish_it=m_table_delta.begin(); + dish_it != m_table_delta.end(); ++dish_it) { + //assert (dish_it->second.tables > 0); + for (std::map<int,int>::const_iterator it=dish_it->second.table_histogram.begin(); + it != dish_it->second.table_histogram.end(); ++it) { + triple<Dish, int, int> m(dish_it->first, it->first, it->second); + message.push_back(m); + } + // append a special message with the total table delta for this dish + triple<Dish, int, int> m(dish_it->first, -1, dish_it->second.tables); + message.push_back(m); + } + boost::mpi::broadcast(world, message, process); + } + else { + // receive deltas + boost::mpi::broadcast(world, message, process); + for (typename std::vector< triple<Dish, int, int> >::const_iterator it=message.begin(); it != message.end(); ++it) { + typename PYP<Dish,Hash>::TableCounter& tc = PYP<Dish,Hash>::_dish_tables[it->first]; + if (it->second >= 0) { + std::map<int,int>::iterator tit; bool inserted; + boost::tie(tit, inserted) = tc.table_histogram.insert(std::make_pair(it->second, 0)); + tit->second += it->third; + if (tit->second < 0) { + std::cerr << tit->first << " " << tit->second << " " << it->first << " " << it->second << " " << it->third << std::endl; + assert(tit->second >= 0); + } + if (tit->second == 0) { + tc.table_histogram.erase(tit); + } + } + else { + tc.tables += it->third; + PYP<Dish,Hash>::_total_tables += it->third; + assert(tc.tables >= 0); + if (tc.tables == 0) assert(tc.table_histogram.empty()); + if (tc.table_histogram.empty()) { + assert (tc.tables == 0); + PYP<Dish,Hash>::_dish_tables.erase(it->first); + } + } + } + } + } +*/ - reset_deltas(); +// reset_deltas(); } #endif |