From c7021174a3592d6bb160a001dbced72d3f31d148 Mon Sep 17 00:00:00 2001 From: mjdenkowski Date: Tue, 14 Apr 2015 16:26:30 -0400 Subject: Parallel tokenization --- corpus/tokenize-parallel.py | 73 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100755 corpus/tokenize-parallel.py (limited to 'corpus') diff --git a/corpus/tokenize-parallel.py b/corpus/tokenize-parallel.py new file mode 100755 index 00000000..6e4d0bd8 --- /dev/null +++ b/corpus/tokenize-parallel.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python + +import gzip +import math +import os +import shutil +import subprocess +import sys +import tempfile + +DEFAULT_JOBS = 8 +DEFAULT_TMP = '/tmp' + +TOKENIZER = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tokenize-anything.sh') + +def gzopen(f): + return gzip.open(f) if f.endswith('.gz') else open(f) + +def wc(f): + return sum(1 for line in gzopen(f)) + +def main(argv): + + if len(argv[1:]) < 1: + sys.stderr.write('Parallelize text normalization with multiple instances of tokenize-anything.sh\n\n') + sys.stderr.write('Usage: {} in-file [jobs [temp-dir]] >out-file\n'.format(argv[0])) + sys.exit(2) + + in_file = argv[1] + jobs = int(argv[2]) if len(argv[1:]) > 1 else DEFAULT_JOBS + tmp = argv[3] if len(argv[1:]) > 2 else DEFAULT_TMP + + work = tempfile.mkdtemp(prefix='tok.', dir=tmp) + in_wc = wc(in_file) + # Don't start more jobs than we have lines + jobs = min(jobs, in_wc) + lines_per = int(math.ceil(float(in_wc)/jobs)) + + inp = gzopen(in_file) + procs = [] + files = [] + outs = [] + for i in range(jobs): + raw = os.path.join(work, 'in.{}'.format(i)) + tok = os.path.join(work, 'out.{}'.format(i)) + files.append(tok) + # Write raw batch + raw_out = open(raw, 'w') + for _ in range(lines_per): + line = inp.readline() + if not line: + break + raw_out.write(line) + raw_out.close() + # Start tokenizer + raw_in = open(raw) + tok_out = open(tok, 'w') + outs.append(tok_out) + p = subprocess.Popen(TOKENIZER, stdin=raw_in, stdout=tok_out) + procs.append(p) + + # Cat output of each tokenizer as it finishes + for (p, f, o) in zip(procs, files, outs): + p.wait() + o.close() + for line in open(f): + sys.stdout.write(line) + + # Cleanup + shutil.rmtree(work) + +if __name__ == '__main__': + main(sys.argv) -- cgit v1.2.3