summaryrefslogtreecommitdiff
path: root/gi/pyp-topics/src/mpi-pyp.hh
diff options
context:
space:
mode:
authorChris Dyer <cdyer@cs.cmu.edu>2012-10-11 14:06:32 -0400
committerChris Dyer <cdyer@cs.cmu.edu>2012-10-11 14:06:32 -0400
commit9339c80d465545aec5a6dccfef7c83ca715bf11f (patch)
tree64c56d558331edad1db3832018c80e799551c39a /gi/pyp-topics/src/mpi-pyp.hh
parent438dac41810b7c69fa10203ac5130d20efa2da9f (diff)
parentafd7da3b2338661657ad0c4e9eec681e014d37bf (diff)
Merge branch 'master' of https://github.com/redpony/cdec
Diffstat (limited to 'gi/pyp-topics/src/mpi-pyp.hh')
-rw-r--r--gi/pyp-topics/src/mpi-pyp.hh447
1 files changed, 0 insertions, 447 deletions
diff --git a/gi/pyp-topics/src/mpi-pyp.hh b/gi/pyp-topics/src/mpi-pyp.hh
deleted file mode 100644
index c2341b9e..00000000
--- a/gi/pyp-topics/src/mpi-pyp.hh
+++ /dev/null
@@ -1,447 +0,0 @@
-#ifndef _mpipyp_hh
-#define _mpipyp_hh
-
-#include <math.h>
-#include <map>
-#include <tr1/unordered_map>
-//#include <google/sparse_hash_map>
-
-#include <boost/random/uniform_real.hpp>
-#include <boost/random/variate_generator.hpp>
-#include <boost/random/mersenne_twister.hpp>
-#include <boost/tuple/tuple.hpp>
-#include <boost/serialization/map.hpp>
-#include <boost/mpi.hpp>
-#include <boost/mpi/environment.hpp>
-#include <boost/mpi/communicator.hpp>
-#include <boost/mpi/operations.hpp>
-
-
-#include "pyp.hh"
-
-//
-// Pitman-Yor process with customer and table tracking
-//
-
-template <typename Dish, typename Hash=std::tr1::hash<Dish> >
-class MPIPYP : public PYP<Dish, Hash> {
-public:
- typedef std::map<Dish, int> dish_delta_type;
-
- MPIPYP(double a, double b, Hash hash=Hash());
-
- template < typename Uniform01 >
- int increment(Dish d, double p0, Uniform01& rnd);
- template < typename Uniform01 >
- int decrement(Dish d, Uniform01& rnd);
-
- void clear();
- void reset_deltas();
-
- void synchronise(dish_delta_type* result);
-
-private:
- typedef std::map<Dish, typename PYP<Dish,Hash>::TableCounter> table_delta_type;
-
- dish_delta_type m_count_delta;
- table_delta_type m_table_delta;
-};
-
-template <typename Dish, typename Hash>
-MPIPYP<Dish,Hash>::MPIPYP(double a, double b, Hash h)
-: PYP<Dish,Hash>(a, b, 0, h) {}
-
-template <typename Dish, typename Hash>
- template <typename Uniform01>
-int
-MPIPYP<Dish,Hash>::increment(Dish dish, double p0, Uniform01& rnd) {
- //std::cerr << "-----INCREMENT DISH " << dish << std::endl;
- int delta = 0;
- int table_joined=-1;
- typename PYP<Dish,Hash>::TableCounter &tc = PYP<Dish,Hash>::_dish_tables[dish];
-
- // seated on a new or existing table?
- int c = PYP<Dish,Hash>::count(dish);
- int t = PYP<Dish,Hash>::num_tables(dish);
- int T = PYP<Dish,Hash>::num_tables();
- double& a = PYP<Dish,Hash>::_a;
- double& b = PYP<Dish,Hash>::_b;
- double pshare = (c > 0) ? (c - a*t) : 0.0;
- double pnew = (b + a*T) * p0;
- if (pshare < 0.0) {
- std::cerr << pshare << " " << c << " " << a << " " << t << std::endl;
- assert(false);
- }
-
- if (rnd() < pnew / (pshare + pnew)) {
- // assign to a new table
- tc.tables += 1;
- tc.table_histogram[1] += 1;
- PYP<Dish,Hash>::_total_tables += 1;
- delta = 1;
- table_joined = 1;
- }
- else {
- // randomly assign to an existing table
- // remove constant denominator from inner loop
- double r = rnd() * (c - a*t);
- for (std::map<int,int>::iterator
- hit = tc.table_histogram.begin();
- hit != tc.table_histogram.end(); ++hit) {
- r -= ((hit->first - a) * hit->second);
- if (r <= 0) {
- tc.table_histogram[hit->first+1] += 1;
- hit->second -= 1;
- table_joined = hit->first+1;
- if (hit->second == 0)
- tc.table_histogram.erase(hit);
- break;
- }
- }
- if (r > 0) {
- std::cerr << r << " " << c << " " << a << " " << t << std::endl;
- assert(false);
- }
- delta = 0;
- }
-
- std::tr1::unordered_map<Dish,int,Hash>::operator[](dish) += 1;
- //google::sparse_hash_map<Dish,int,Hash>::operator[](dish) += 1;
- PYP<Dish,Hash>::_total_customers += 1;
-
- // MPI Delta handling
- // track the customer entering
- typename dish_delta_type::iterator customer_it;
- bool customer_insert_result;
- boost::tie(customer_it, customer_insert_result)
- = m_count_delta.insert(std::make_pair(dish,0));
-
- customer_it->second += 1;
- if (customer_it->second == 0)
- m_count_delta.erase(customer_it);
-
- // increment the histogram bar for the table joined
- /*
- typename PYP<Dish,Hash>::TableCounter &delta_tc = m_table_delta[dish];
-
- std::map<int,int> &histogram = delta_tc.table_histogram;
- assert (table_joined > 0);
-
- typename std::map<int,int>::iterator table_it; bool table_insert_result;
- boost::tie(table_it, table_insert_result) = histogram.insert(std::make_pair(table_joined,0));
- table_it->second += 1;
- if (delta == 0) {
- // decrement the histogram bar for the table left
- typename std::map<int,int>::iterator left_table_it;
- boost::tie(left_table_it, table_insert_result)
- = histogram.insert(std::make_pair(table_joined-1,0));
- left_table_it->second -= 1;
- if (left_table_it->second == 0) histogram.erase(left_table_it);
- }
- else delta_tc.tables += 1;
-
- if (table_it->second == 0) histogram.erase(table_it);
-
- //std::cerr << "Added (" << delta << ") " << dish << " to table " << table_joined << "\n";
- //std::cerr << "Dish " << dish << " has " << count(dish) << " customers, and is sitting at " << PYP<Dish,Hash>::num_tables(dish) << " tables.\n";
- //for (std::map<int,int>::const_iterator
- // hit = delta_tc.table_histogram.begin();
- // hit != delta_tc.table_histogram.end(); ++hit) {
- // std::cerr << " " << hit->second << " tables with " << hit->first << " customers." << std::endl;
- //}
- //std::cerr << "Added (" << delta << ") " << dish << " to table " << table_joined << "\n";
- //std::cerr << "Dish " << dish << " has " << count(dish) << " customers, and is sitting at " << PYP<Dish,Hash>::num_tables(dish) << " tables.\n";
- int x_num_customers=0, x_num_table=0;
- for (std::map<int,int>::const_iterator
- hit = delta_tc.table_histogram.begin();
- hit != delta_tc.table_histogram.end(); ++hit) {
- x_num_table += hit->second;
- x_num_customers += (hit->second*hit->first);
- }
- int tmp_c = PYP<Dish,Hash>::count(dish);
- int tmp_t = PYP<Dish,Hash>::num_tables(dish);
- assert (x_num_customers <= tmp_c);
- assert (x_num_table <= tmp_t);
-
- if (delta_tc.table_histogram.empty()) {
- assert (delta_tc.tables == 0);
- m_table_delta.erase(dish);
- }
- */
-
- //PYP<Dish,Hash>::debug_info(std::cerr);
- //std::cerr << " Dish " << dish << " has count " << PYP<Dish,Hash>::count(dish) << " tables " << PYP<Dish,Hash>::num_tables(dish) << std::endl;
-
- return delta;
-}
-
-template <typename Dish, typename Hash>
- template <typename Uniform01>
-int
-MPIPYP<Dish,Hash>::decrement(Dish dish, Uniform01& rnd)
-{
- //std::cerr << "-----DECREMENT DISH " << dish << std::endl;
- typename std::tr1::unordered_map<Dish, int>::iterator dcit = find(dish);
- //typename google::sparse_hash_map<Dish, int>::iterator dcit = find(dish);
- if (dcit == PYP<Dish,Hash>::end()) {
- std::cerr << dish << std::endl;
- assert(false);
- }
-
- int delta = 0, table_left=-1;
-
- typename std::tr1::unordered_map<Dish, typename PYP<Dish,Hash>::TableCounter>::iterator dtit
- = PYP<Dish,Hash>::_dish_tables.find(dish);
- //typename google::sparse_hash_map<Dish, TableCounter>::iterator dtit = _dish_tables.find(dish);
- if (dtit == PYP<Dish,Hash>::_dish_tables.end()) {
- std::cerr << dish << std::endl;
- assert(false);
- }
- typename PYP<Dish,Hash>::TableCounter &tc = dtit->second;
-
- double r = rnd() * PYP<Dish,Hash>::count(dish);
- for (std::map<int,int>::iterator hit = tc.table_histogram.begin();
- hit != tc.table_histogram.end(); ++hit) {
- r -= (hit->first * hit->second);
- if (r <= 0) {
- table_left = hit->first;
- if (hit->first > 1) {
- tc.table_histogram[hit->first-1] += 1;
- }
- else {
- delta = -1;
- tc.tables -= 1;
- PYP<Dish,Hash>::_total_tables -= 1;
- }
-
- hit->second -= 1;
- if (hit->second == 0) tc.table_histogram.erase(hit);
- break;
- }
- }
- if (r > 0) {
- std::cerr << r << " " << PYP<Dish,Hash>::count(dish) << " " << PYP<Dish,Hash>::_a << " "
- << PYP<Dish,Hash>::num_tables(dish) << std::endl;
- assert(false);
- }
-
- // remove the customer
- dcit->second -= 1;
- PYP<Dish,Hash>::_total_customers -= 1;
- assert(dcit->second >= 0);
- if (dcit->second == 0) {
- PYP<Dish,Hash>::erase(dcit);
- PYP<Dish,Hash>::_dish_tables.erase(dtit);
- }
-
- // MPI Delta processing
- typename dish_delta_type::iterator it;
- bool insert_result;
- boost::tie(it, insert_result) = m_count_delta.insert(std::make_pair(dish,0));
- it->second -= 1;
- if (it->second == 0) m_count_delta.erase(it);
-
- assert (table_left > 0);
- typename PYP<Dish,Hash>::TableCounter& delta_tc = m_table_delta[dish];
- if (table_left > 1) {
- std::map<int,int>::iterator tit;
- boost::tie(tit, insert_result) = delta_tc.table_histogram.insert(std::make_pair(table_left-1,0));
- tit->second += 1;
- if (tit->second == 0) delta_tc.table_histogram.erase(tit);
- }
- else delta_tc.tables -= 1;
-
- std::map<int,int>::iterator tit;
- boost::tie(tit, insert_result) = delta_tc.table_histogram.insert(std::make_pair(table_left,0));
- tit->second -= 1;
- if (tit->second == 0) delta_tc.table_histogram.erase(tit);
-
- // std::cerr << "Dish " << dish << " has " << count(dish) << " customers, and is sitting at " << PYP<Dish,Hash>::num_tables(dish) << " tables.\n";
- // for (std::map<int,int>::const_iterator
- // hit = delta_tc.table_histogram.begin();
- // hit != delta_tc.table_histogram.end(); ++hit) {
- // std::cerr << " " << hit->second << " tables with " << hit->first << " customers." << std::endl;
- // }
- int x_num_customers=0, x_num_table=0;
- for (std::map<int,int>::const_iterator
- hit = delta_tc.table_histogram.begin();
- hit != delta_tc.table_histogram.end(); ++hit) {
- x_num_table += hit->second;
- x_num_customers += (hit->second*hit->first);
- }
- int tmp_c = PYP<Dish,Hash>::count(dish);
- int tmp_t = PYP<Dish,Hash>::num_tables(dish);
- assert (x_num_customers <= tmp_c);
- assert (x_num_table <= tmp_t);
-
- if (delta_tc.table_histogram.empty()) {
- // std::cerr << " DELETING " << dish << std::endl;
- assert (delta_tc.tables == 0);
- m_table_delta.erase(dish);
- }
-
- //PYP<Dish,Hash>::debug_info(std::cerr);
- //std::cerr << " Dish " << dish << " has count " << PYP<Dish,Hash>::count(dish) << " tables " << PYP<Dish,Hash>::num_tables(dish) << std::endl;
- return delta;
-}
-
-template <typename Dish, typename Hash>
-void
-MPIPYP<Dish,Hash>::clear() {
- PYP<Dish,Hash>::clear();
- reset_deltas();
-}
-
-template <typename Dish, typename Hash>
-void
-MPIPYP<Dish,Hash>::reset_deltas() {
- m_count_delta.clear();
- m_table_delta.clear();
-}
-
-template <typename Dish>
-struct sum_maps {
- typedef std::map<Dish,int> map_type;
- map_type& operator() (map_type& l, map_type const & r) const {
- for (typename map_type::const_iterator it=r.begin(); it != r.end(); it++)
- l[it->first] += it->second;
- return l;
- }
-};
-
-template <typename Dish>
-struct subtract_maps {
- typedef std::map<Dish,int> map_type;
- map_type& operator() (map_type& l, map_type const & r) const {
- for (typename map_type::const_iterator it=r.begin(); it != r.end(); it++)
- l[it->first] -= it->second;
- return l;
- }
-};
-
-// Needed Boost definitions
-namespace boost {
- namespace mpi {
- template <>
- struct is_commutative< sum_maps<int>, std::map<int,int> > : mpl::true_ {};
- }
-
- namespace serialization {
- template<class Archive>
- void serialize(Archive & ar, PYP<int>::TableCounter& t, const unsigned int version) {
- ar & t.table_histogram;
- ar & t.tables;
- }
-
- } // namespace serialization
-} // namespace boost
-
-template <typename A, typename B, typename C>
-struct triple {
- triple() {}
- triple(const A& a, const B& b, const C& c) : first(a), second(b), third(c) {}
- A first;
- B second;
- C third;
-
- template<class Archive>
- void serialize(Archive &ar, const unsigned int version){
- ar & first;
- ar & second;
- ar & third;
- }
-};
-
-BOOST_IS_BITWISE_SERIALIZABLE(MPIPYP<int>::dish_delta_type)
-BOOST_CLASS_TRACKING(MPIPYP<int>::dish_delta_type,track_never)
-
-template <typename Dish, typename Hash>
-void
-MPIPYP<Dish,Hash>::synchronise(dish_delta_type* result) {
- boost::mpi::communicator world;
- //int rank = world.rank(), size = world.size();
-
- boost::mpi::all_reduce(world, m_count_delta, *result, sum_maps<Dish>());
- subtract_maps<Dish>()(*result, m_count_delta);
-
-/*
- // communicate the customer count deltas
- dish_delta_type global_dish_delta;
- boost::mpi::all_reduce(world, m_count_delta, global_dish_delta, sum_maps<Dish>());
-
- // update this restaurant
- for (typename dish_delta_type::const_iterator it=global_dish_delta.begin();
- it != global_dish_delta.end(); ++it) {
- int global_delta = it->second - m_count_delta[it->first];
- if (global_delta == 0) continue;
- typename std::tr1::unordered_map<Dish,int,Hash>::iterator dit; bool inserted;
- boost::tie(dit, inserted)
- = std::tr1::unordered_map<Dish,int,Hash>::insert(std::make_pair(it->first, 0));
- dit->second += global_delta;
- assert(dit->second >= 0);
- if (dit->second == 0) {
- std::tr1::unordered_map<Dish,int,Hash>::erase(dit);
- }
-
- PYP<Dish,Hash>::_total_customers += (it->second - m_count_delta[it->first]);
- int tmp = PYP<Dish,Hash>::_total_customers;
- assert(tmp >= 0);
- //std::cerr << "Process " << rank << " adding " << (it->second - m_count_delta[it->first]) << " of customer " << it->first << std::endl;
- }
-*/
-/*
- // communicate the table count deltas
- for (int process = 0; process < size; ++process) {
- typename std::vector< triple<Dish, int, int> > message;
- if (rank == process) {
- // broadcast deltas
- for (typename table_delta_type::const_iterator dish_it=m_table_delta.begin();
- dish_it != m_table_delta.end(); ++dish_it) {
- //assert (dish_it->second.tables > 0);
- for (std::map<int,int>::const_iterator it=dish_it->second.table_histogram.begin();
- it != dish_it->second.table_histogram.end(); ++it) {
- triple<Dish, int, int> m(dish_it->first, it->first, it->second);
- message.push_back(m);
- }
- // append a special message with the total table delta for this dish
- triple<Dish, int, int> m(dish_it->first, -1, dish_it->second.tables);
- message.push_back(m);
- }
- boost::mpi::broadcast(world, message, process);
- }
- else {
- // receive deltas
- boost::mpi::broadcast(world, message, process);
- for (typename std::vector< triple<Dish, int, int> >::const_iterator it=message.begin(); it != message.end(); ++it) {
- typename PYP<Dish,Hash>::TableCounter& tc = PYP<Dish,Hash>::_dish_tables[it->first];
- if (it->second >= 0) {
- std::map<int,int>::iterator tit; bool inserted;
- boost::tie(tit, inserted) = tc.table_histogram.insert(std::make_pair(it->second, 0));
- tit->second += it->third;
- if (tit->second < 0) {
- std::cerr << tit->first << " " << tit->second << " " << it->first << " " << it->second << " " << it->third << std::endl;
- assert(tit->second >= 0);
- }
- if (tit->second == 0) {
- tc.table_histogram.erase(tit);
- }
- }
- else {
- tc.tables += it->third;
- PYP<Dish,Hash>::_total_tables += it->third;
- assert(tc.tables >= 0);
- if (tc.tables == 0) assert(tc.table_histogram.empty());
- if (tc.table_histogram.empty()) {
- assert (tc.tables == 0);
- PYP<Dish,Hash>::_dish_tables.erase(it->first);
- }
- }
- }
- }
- }
-*/
-
-// reset_deltas();
-}
-
-#endif