From 38c38f707e58960f80a8dc216673ae0bb0796ade Mon Sep 17 00:00:00 2001 From: Victor Chahuneau Date: Tue, 4 Sep 2012 10:21:25 +0100 Subject: Multi-processing grammar extraction + various surface fixes --- python/pkg/cdec/sa/extract.py | 45 ++++++++++++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 11 deletions(-) (limited to 'python/pkg/cdec/sa/extract.py') diff --git a/python/pkg/cdec/sa/extract.py b/python/pkg/cdec/sa/extract.py index 875bf42e..39eac824 100644 --- a/python/pkg/cdec/sa/extract.py +++ b/python/pkg/cdec/sa/extract.py @@ -3,29 +3,52 @@ import sys import os import argparse import logging +import multiprocessing as mp +import signal import cdec.sa +extractor, prefix = None, None +def make_extractor(config, grammars): + global extractor, prefix + signal.signal(signal.SIGINT, signal.SIG_IGN) # Let parent process catch Ctrl+C + extractor = cdec.sa.GrammarExtractor(config) + prefix = grammars + +def extract(inp): + global extractor, prefix + i, sentence = inp + sentence = sentence[:-1] + grammar_file = os.path.join(prefix, 'grammar.{0}'.format(i)) + with open(grammar_file, 'w') as output: + for rule in extractor.grammar(sentence): + output.write(str(rule)+'\n') + grammar_file = os.path.abspath(grammar_file) + return '{2}'.format(grammar_file, i, sentence) + + def main(): logging.basicConfig(level=logging.INFO) parser = argparse.ArgumentParser(description='Extract grammars from a compiled corpus.') parser.add_argument('-c', '--config', required=True, - help='Extractor configuration') + help='extractor configuration') parser.add_argument('-g', '--grammars', required=True, - help='Grammar output path') + help='grammar output path') + parser.add_argument('-j', '--jobs', type=int, default=1, + help='number of parallel extractors') + parser.add_argument('-s', '--chunksize', type=int, default=10, + help='number of sentences / chunk') args = parser.parse_args() if not os.path.exists(args.grammars): os.mkdir(args.grammars) - extractor = cdec.sa.GrammarExtractor(args.config) - for i, sentence in enumerate(sys.stdin): - sentence = sentence[:-1] - grammar_file = os.path.join(args.grammars, 'grammar.{0}'.format(i)) - with open(grammar_file, 'w') as output: - for rule in extractor.grammar(sentence): - output.write(str(rule)+'\n') - grammar_file = os.path.abspath(grammar_file) - print('{2}'.format(grammar_file, i, sentence)) + logging.info('Starting %d workers; chunk size: %d', args.jobs, args.chunksize) + pool = mp.Pool(args.jobs, make_extractor, (args.config, args.grammars)) + try: + for output in pool.imap(extract, enumerate(sys.stdin), args.chunksize): + print(output) + except KeyboardInterrupt: + pool.terminate() if __name__ == '__main__': main() -- cgit v1.2.3