summaryrefslogtreecommitdiff
path: root/gi/pyp-topics/src/mpi-pyp.hh
diff options
context:
space:
mode:
authorphilblunsom <philblunsom@ec762483-ff6d-05da-a07a-a48fb63a330f>2010-07-19 18:33:29 +0000
committerphilblunsom <philblunsom@ec762483-ff6d-05da-a07a-a48fb63a330f>2010-07-19 18:33:29 +0000
commit0e4d1fc42b5f3707d81549d11fad8fcd4f9af89f (patch)
tree3e29ca6353b9eb8c48a7d47fc1f0aff70932260b /gi/pyp-topics/src/mpi-pyp.hh
parent1eb8cf39863b166e87a3115c682e873dbe091582 (diff)
Vaguely working distributed implementation. Hierarchical topics doesn't yet work correctly.
git-svn-id: https://ws10smt.googlecode.com/svn/trunk@317 ec762483-ff6d-05da-a07a-a48fb63a330f
Diffstat (limited to 'gi/pyp-topics/src/mpi-pyp.hh')
-rw-r--r--gi/pyp-topics/src/mpi-pyp.hh277
1 files changed, 204 insertions, 73 deletions
diff --git a/gi/pyp-topics/src/mpi-pyp.hh b/gi/pyp-topics/src/mpi-pyp.hh
index 65358d20..0328b387 100644
--- a/gi/pyp-topics/src/mpi-pyp.hh
+++ b/gi/pyp-topics/src/mpi-pyp.hh
@@ -26,18 +26,21 @@
template <typename Dish, typename Hash=std::tr1::hash<Dish> >
class MPIPYP : public PYP<Dish, Hash> {
public:
+ typedef std::map<Dish, int> dish_delta_type;
+
MPIPYP(double a, double b, Hash hash=Hash());
- virtual int increment(Dish d, double p0);
- virtual int decrement(Dish d);
+ template < typename Uniform01 >
+ int increment(Dish d, double p0, Uniform01& rnd);
+ template < typename Uniform01 >
+ int decrement(Dish d, Uniform01& rnd);
void clear();
void reset_deltas();
- void synchronise();
+ void synchronise(dish_delta_type* result);
private:
- typedef std::map<Dish, int> dish_delta_type;
typedef std::map<Dish, typename PYP<Dish,Hash>::TableCounter> table_delta_type;
dish_delta_type m_count_delta;
@@ -49,8 +52,10 @@ MPIPYP<Dish,Hash>::MPIPYP(double a, double b, Hash h)
: PYP<Dish,Hash>(a, b, 0, h) {}
template <typename Dish, typename Hash>
+ template <typename Uniform01>
int
-MPIPYP<Dish,Hash>::increment(Dish dish, double p0) {
+MPIPYP<Dish,Hash>::increment(Dish dish, double p0, Uniform01& rnd) {
+ //std::cerr << "-----INCREMENT DISH " << dish << std::endl;
int delta = 0;
int table_joined=-1;
typename PYP<Dish,Hash>::TableCounter &tc = PYP<Dish,Hash>::_dish_tables[dish];
@@ -63,19 +68,23 @@ MPIPYP<Dish,Hash>::increment(Dish dish, double p0) {
double& b = PYP<Dish,Hash>::_b;
double pshare = (c > 0) ? (c - a*t) : 0.0;
double pnew = (b + a*T) * p0;
- assert (pshare >= 0.0);
+ if (pshare < 0.0) {
+ std::cerr << pshare << " " << c << " " << a << " " << t << std::endl;
+ assert(false);
+ }
- if (mt_genrand_res53() < pnew / (pshare + pnew)) {
+ if (rnd() < pnew / (pshare + pnew)) {
// assign to a new table
tc.tables += 1;
tc.table_histogram[1] += 1;
PYP<Dish,Hash>::_total_tables += 1;
delta = 1;
+ table_joined = 1;
}
else {
// randomly assign to an existing table
// remove constant denominator from inner loop
- double r = mt_genrand_res53() * (c - a*t);
+ double r = rnd() * (c - a*t);
for (std::map<int,int>::iterator
hit = tc.table_histogram.begin();
hit != tc.table_histogram.end(); ++hit) {
@@ -83,9 +92,9 @@ MPIPYP<Dish,Hash>::increment(Dish dish, double p0) {
if (r <= 0) {
tc.table_histogram[hit->first+1] += 1;
hit->second -= 1;
+ table_joined = hit->first+1;
if (hit->second == 0)
tc.table_histogram.erase(hit);
- table_joined = hit->first+1;
break;
}
}
@@ -112,32 +121,66 @@ MPIPYP<Dish,Hash>::increment(Dish dish, double p0) {
m_count_delta.erase(customer_it);
// increment the histogram bar for the table joined
- if (!delta) {
- assert (table_joined >= 0);
- std::map<int,int> &histogram = m_table_delta[dish].table_histogram;
- typename std::map<int,int>::iterator table_it; bool table_insert_result;
- boost::tie(table_it, table_insert_result) = histogram.insert(std::make_pair(table_joined,0));
- table_it->second += 1;
- if (table_it->second == 0) histogram.erase(table_it);
+ /*
+ typename PYP<Dish,Hash>::TableCounter &delta_tc = m_table_delta[dish];
+
+ std::map<int,int> &histogram = delta_tc.table_histogram;
+ assert (table_joined > 0);
+ typename std::map<int,int>::iterator table_it; bool table_insert_result;
+ boost::tie(table_it, table_insert_result) = histogram.insert(std::make_pair(table_joined,0));
+ table_it->second += 1;
+ if (delta == 0) {
// decrement the histogram bar for the table left
- boost::tie(table_it, table_insert_result) = histogram.insert(std::make_pair(table_joined-1,0));
- table_it->second -= 1;
- if (table_it->second == 0) histogram.erase(table_it);
+ typename std::map<int,int>::iterator left_table_it;
+ boost::tie(left_table_it, table_insert_result)
+ = histogram.insert(std::make_pair(table_joined-1,0));
+ left_table_it->second -= 1;
+ if (left_table_it->second == 0) histogram.erase(left_table_it);
}
- else {
- typename PYP<Dish,Hash>::TableCounter &delta_tc = m_table_delta[dish];
- delta_tc.tables += 1;
- delta_tc.table_histogram[1] += 1;
+ else delta_tc.tables += 1;
+
+ if (table_it->second == 0) histogram.erase(table_it);
+
+ //std::cerr << "Added (" << delta << ") " << dish << " to table " << table_joined << "\n";
+ //std::cerr << "Dish " << dish << " has " << count(dish) << " customers, and is sitting at " << PYP<Dish,Hash>::num_tables(dish) << " tables.\n";
+ //for (std::map<int,int>::const_iterator
+ // hit = delta_tc.table_histogram.begin();
+ // hit != delta_tc.table_histogram.end(); ++hit) {
+ // std::cerr << " " << hit->second << " tables with " << hit->first << " customers." << std::endl;
+ //}
+ //std::cerr << "Added (" << delta << ") " << dish << " to table " << table_joined << "\n";
+ //std::cerr << "Dish " << dish << " has " << count(dish) << " customers, and is sitting at " << PYP<Dish,Hash>::num_tables(dish) << " tables.\n";
+ int x_num_customers=0, x_num_table=0;
+ for (std::map<int,int>::const_iterator
+ hit = delta_tc.table_histogram.begin();
+ hit != delta_tc.table_histogram.end(); ++hit) {
+ x_num_table += hit->second;
+ x_num_customers += (hit->second*hit->first);
+ }
+ int tmp_c = PYP<Dish,Hash>::count(dish);
+ int tmp_t = PYP<Dish,Hash>::num_tables(dish);
+ assert (x_num_customers <= tmp_c);
+ assert (x_num_table <= tmp_t);
+
+ if (delta_tc.table_histogram.empty()) {
+ assert (delta_tc.tables == 0);
+ m_table_delta.erase(dish);
}
+ */
+
+ //PYP<Dish,Hash>::debug_info(std::cerr);
+ //std::cerr << " Dish " << dish << " has count " << PYP<Dish,Hash>::count(dish) << " tables " << PYP<Dish,Hash>::num_tables(dish) << std::endl;
return delta;
}
template <typename Dish, typename Hash>
+ template <typename Uniform01>
int
-MPIPYP<Dish,Hash>::decrement(Dish dish)
+MPIPYP<Dish,Hash>::decrement(Dish dish, Uniform01& rnd)
{
+ //std::cerr << "-----DECREMENT DISH " << dish << std::endl;
typename std::tr1::unordered_map<Dish, int>::iterator dcit = find(dish);
//typename google::sparse_hash_map<Dish, int>::iterator dcit = find(dish);
if (dcit == PYP<Dish,Hash>::end()) {
@@ -156,7 +199,7 @@ MPIPYP<Dish,Hash>::decrement(Dish dish)
}
typename PYP<Dish,Hash>::TableCounter &tc = dtit->second;
- double r = mt_genrand_res53() * PYP<Dish,Hash>::count(dish);
+ double r = rnd() * PYP<Dish,Hash>::count(dish);
for (std::map<int,int>::iterator hit = tc.table_histogram.begin();
hit != tc.table_histogram.end(); ++hit) {
r -= (hit->first * hit->second);
@@ -191,26 +234,54 @@ MPIPYP<Dish,Hash>::decrement(Dish dish)
PYP<Dish,Hash>::_dish_tables.erase(dtit);
}
+ // MPI Delta processing
typename dish_delta_type::iterator it;
bool insert_result;
boost::tie(it, insert_result) = m_count_delta.insert(std::make_pair(dish,0));
-
it->second -= 1;
+ if (it->second == 0) m_count_delta.erase(it);
- if (it->second == 0)
- m_count_delta.erase(it);
-
- assert (table_left >= 0);
+ assert (table_left > 0);
typename PYP<Dish,Hash>::TableCounter& delta_tc = m_table_delta[dish];
- if (table_left > 1)
- delta_tc.table_histogram[table_left-1] += 1;
+ if (table_left > 1) {
+ std::map<int,int>::iterator tit;
+ boost::tie(tit, insert_result) = delta_tc.table_histogram.insert(std::make_pair(table_left-1,0));
+ tit->second += 1;
+ if (tit->second == 0) delta_tc.table_histogram.erase(tit);
+ }
else delta_tc.tables -= 1;
- std::map<int,int>::iterator tit = delta_tc.table_histogram.find(table_left);
- //assert (tit != delta_tc.table_histogram.end());
+ std::map<int,int>::iterator tit;
+ boost::tie(tit, insert_result) = delta_tc.table_histogram.insert(std::make_pair(table_left,0));
tit->second -= 1;
if (tit->second == 0) delta_tc.table_histogram.erase(tit);
+ // std::cerr << "Dish " << dish << " has " << count(dish) << " customers, and is sitting at " << PYP<Dish,Hash>::num_tables(dish) << " tables.\n";
+ // for (std::map<int,int>::const_iterator
+ // hit = delta_tc.table_histogram.begin();
+ // hit != delta_tc.table_histogram.end(); ++hit) {
+ // std::cerr << " " << hit->second << " tables with " << hit->first << " customers." << std::endl;
+ // }
+ int x_num_customers=0, x_num_table=0;
+ for (std::map<int,int>::const_iterator
+ hit = delta_tc.table_histogram.begin();
+ hit != delta_tc.table_histogram.end(); ++hit) {
+ x_num_table += hit->second;
+ x_num_customers += (hit->second*hit->first);
+ }
+ int tmp_c = PYP<Dish,Hash>::count(dish);
+ int tmp_t = PYP<Dish,Hash>::num_tables(dish);
+ assert (x_num_customers <= tmp_c);
+ assert (x_num_table <= tmp_t);
+
+ if (delta_tc.table_histogram.empty()) {
+ // std::cerr << " DELETING " << dish << std::endl;
+ assert (delta_tc.tables == 0);
+ m_table_delta.erase(dish);
+ }
+
+ //PYP<Dish,Hash>::debug_info(std::cerr);
+ //std::cerr << " Dish " << dish << " has count " << PYP<Dish,Hash>::count(dish) << " tables " << PYP<Dish,Hash>::num_tables(dish) << std::endl;
return delta;
}
@@ -238,6 +309,16 @@ struct sum_maps {
}
};
+template <typename Dish>
+struct subtract_maps {
+ typedef std::map<Dish,int> map_type;
+ map_type& operator() (map_type& l, map_type const & r) const {
+ for (typename map_type::const_iterator it=r.begin(); it != r.end(); it++)
+ l[it->first] -= it->second;
+ return l;
+ }
+};
+
// Needed Boost definitions
namespace boost {
namespace mpi {
@@ -255,60 +336,110 @@ namespace boost {
} // namespace serialization
} // namespace boost
+template <typename A, typename B, typename C>
+struct triple {
+ triple() {}
+ triple(const A& a, const B& b, const C& c) : first(a), second(b), third(c) {}
+ A first;
+ B second;
+ C third;
+
+ template<class Archive>
+ void serialize(Archive &ar, const unsigned int version){
+ ar & first;
+ ar & second;
+ ar & third;
+ }
+};
+
template <typename Dish, typename Hash>
void
-MPIPYP<Dish,Hash>::synchronise() {
+MPIPYP<Dish,Hash>::synchronise(dish_delta_type* result) {
boost::mpi::communicator world;
- int rank = world.rank(), size = world.size();
+ //int rank = world.rank(), size = world.size();
+ boost::mpi::all_reduce(world, m_count_delta, *result, sum_maps<Dish>());
+ subtract_maps<Dish>()(*result, m_count_delta);
+
+/*
// communicate the customer count deltas
- dish_delta_type global_dish_delta; // the “merged” map
+ dish_delta_type global_dish_delta;
boost::mpi::all_reduce(world, m_count_delta, global_dish_delta, sum_maps<Dish>());
// update this restaurant
for (typename dish_delta_type::const_iterator it=global_dish_delta.begin();
it != global_dish_delta.end(); ++it) {
- std::tr1::unordered_map<Dish,int,Hash>::operator[](it->first) += (it->second - m_count_delta[it->first]);
+ int global_delta = it->second - m_count_delta[it->first];
+ if (global_delta == 0) continue;
+ typename std::tr1::unordered_map<Dish,int,Hash>::iterator dit; bool inserted;
+ boost::tie(dit, inserted)
+ = std::tr1::unordered_map<Dish,int,Hash>::insert(std::make_pair(it->first, 0));
+ dit->second += global_delta;
+ assert(dit->second >= 0);
+ if (dit->second == 0) {
+ std::tr1::unordered_map<Dish,int,Hash>::erase(dit);
+ }
+
PYP<Dish,Hash>::_total_customers += (it->second - m_count_delta[it->first]);
- //std::cerr << "Process " << rank << " adding " << (it->second - m_count_delta[it->first]) << " customers." << std::endl;
+ int tmp = PYP<Dish,Hash>::_total_customers;
+ assert(tmp >= 0);
+ //std::cerr << "Process " << rank << " adding " << (it->second - m_count_delta[it->first]) << " of customer " << it->first << std::endl;
}
-
+*/
+/*
// communicate the table count deltas
-// for (int process = 0; process < size; ++process) {
-// if (rank == process) {
-// // broadcast deltas
-// std::cerr << " -- Rank " << rank << " broadcasting -- " << std::endl;
-//
-// boost::mpi::broadcast(world, m_table_delta, process);
-//
-// std::cerr << " -- Rank " << rank << " done broadcasting -- " << std::endl;
-// }
-// else {
-// std::cerr << " -- Rank " << rank << " receiving -- " << std::endl;
-// // receive deltas
-// table_delta_type recv_table_delta;
-//
-// boost::mpi::broadcast(world, recv_table_delta, process);
-//
-// std::cerr << " -- Rank " << rank << " done receiving -- " << std::endl;
-//
-// for (typename table_delta_type::const_iterator dish_it=recv_table_delta.begin();
-// dish_it != recv_table_delta.end(); ++dish_it) {
-// typename PYP<Dish,Hash>::TableCounter &tc = PYP<Dish,Hash>::_dish_tables[dish_it->first];
-//
-// for (std::map<int,int>::const_iterator it=dish_it->second.table_histogram.begin();
-// it != dish_it->second.table_histogram.end(); ++it) {
-// tc.table_histogram[it->first] += it->second;
-// }
-// tc.tables += dish_it->second.tables;
-// PYP<Dish,Hash>::_total_tables += dish_it->second.tables;
-// }
-// }
-// }
-// std::cerr << " -- Done Reducing -- " << std::endl;
+ for (int process = 0; process < size; ++process) {
+ typename std::vector< triple<Dish, int, int> > message;
+ if (rank == process) {
+ // broadcast deltas
+ for (typename table_delta_type::const_iterator dish_it=m_table_delta.begin();
+ dish_it != m_table_delta.end(); ++dish_it) {
+ //assert (dish_it->second.tables > 0);
+ for (std::map<int,int>::const_iterator it=dish_it->second.table_histogram.begin();
+ it != dish_it->second.table_histogram.end(); ++it) {
+ triple<Dish, int, int> m(dish_it->first, it->first, it->second);
+ message.push_back(m);
+ }
+ // append a special message with the total table delta for this dish
+ triple<Dish, int, int> m(dish_it->first, -1, dish_it->second.tables);
+ message.push_back(m);
+ }
+ boost::mpi::broadcast(world, message, process);
+ }
+ else {
+ // receive deltas
+ boost::mpi::broadcast(world, message, process);
+ for (typename std::vector< triple<Dish, int, int> >::const_iterator it=message.begin(); it != message.end(); ++it) {
+ typename PYP<Dish,Hash>::TableCounter& tc = PYP<Dish,Hash>::_dish_tables[it->first];
+ if (it->second >= 0) {
+ std::map<int,int>::iterator tit; bool inserted;
+ boost::tie(tit, inserted) = tc.table_histogram.insert(std::make_pair(it->second, 0));
+ tit->second += it->third;
+ if (tit->second < 0) {
+ std::cerr << tit->first << " " << tit->second << " " << it->first << " " << it->second << " " << it->third << std::endl;
+ assert(tit->second >= 0);
+ }
+ if (tit->second == 0) {
+ tc.table_histogram.erase(tit);
+ }
+ }
+ else {
+ tc.tables += it->third;
+ PYP<Dish,Hash>::_total_tables += it->third;
+ assert(tc.tables >= 0);
+ if (tc.tables == 0) assert(tc.table_histogram.empty());
+ if (tc.table_histogram.empty()) {
+ assert (tc.tables == 0);
+ PYP<Dish,Hash>::_dish_tables.erase(it->first);
+ }
+ }
+ }
+ }
+ }
+*/
- reset_deltas();
+// reset_deltas();
}
#endif