summaryrefslogtreecommitdiff
path: root/dtrain/parallelize/parallelize.rb
blob: e88d9eef7f14dd363cb768351cccde524bb2cf7b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#!/usr/bin/env ruby


if ARGV.size != 5
  STDERR.write "Usage: "
  STDERR.write "ruby parallelize.rb <#shards> <input> <refs> <epochs> <dtrain.ini>\n"
  exit
end

dtrain_bin = '/home/pks/mt/cdec-dtrain/dtrain/dtrain_local_new_f'
ruby       = '/usr/bin/ruby'
lplp_rb    = '/home/pks/mt/cdec-dtrain/dtrain/hstreaming/lplp.rb'
lplp_args  = 'l2 select_k 100000'
gzip       = '/bin/gzip'

num_shards = ARGV[0].to_i
input      = ARGV[1]
refs       = ARGV[2]
epochs     = ARGV[3].to_i
ini        = ARGV[4]


`mkdir work`

def make_shards(input, refs, num_shards)
  lc = `wc -l #{input}`.split.first.to_i
  shard_sz = lc / num_shards
  leftover = lc % num_shards
  in_f = File.new input, 'r'
  refs_f = File.new refs, 'r'
  shard_in_files = []
  shard_refs_files = []
  0.upto(num_shards-1) { |shard|
    shard_in = File.new "work/shard.#{shard}.in", 'w+'
    shard_refs = File.new "work/shard.#{shard}.refs", 'w+'
    0.upto(shard_sz-1) { |i|
      shard_in.write in_f.gets
      shard_refs.write refs_f.gets
    }
    shard_in_files << shard_in
    shard_refs_files << shard_refs
  }
  while leftover > 0
    shard_in_files[-1].write in_f.gets
    shard_refs_files[-1].write refs_f.gets
    leftover -= 1
  end
  (shard_in_files + shard_refs_files).each do |f| f.close end
  in_f.close
  refs_f.close
end

make_shards input, refs, num_shards

0.upto(epochs-1) { |epoch|
  pids = []
  input_weights = ''
  if epoch > 0 then input_weights = "--input_weights work/weights.#{epoch-1}" end
  weights_files = []
  0.upto(num_shards-1) { |shard|
    pids << Kernel.fork {
      `#{dtrain_bin} -c #{ini}\
        --input work/shard.#{shard}.in\
        --refs work/shard.#{shard}.refs #{input_weights}\
        --output work/weights.#{shard}.#{epoch}\
        &> work/out.#{shard}.#{epoch}`
    }
    weights_files << "work/weights.#{shard}.#{epoch}"
  }
  pids.each { |pid| Process.wait(pid) }
  cat = File.new('work/weights_cat', 'w+')
  weights_files.each { |f| cat.write File.new(f, 'r').read }
  cat.close
  `#{ruby} #{lplp_rb} #{lplp_args} #{num_shards} < work/weights_cat &> work/weights.#{epoch}`
}

`rm work/weights_cat`
`#{gzip} work/*`