From 4236729897ab454f6b28613364b06e94ebbb080e Mon Sep 17 00:00:00 2001 From: mjdenkowski Date: Fri, 18 Apr 2014 15:15:40 -0400 Subject: Stream mode for grammar extractor --- python/cdec/sa/extract.py | 52 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 48 insertions(+), 4 deletions(-) (limited to 'python/cdec/sa') diff --git a/python/cdec/sa/extract.py b/python/cdec/sa/extract.py index b6502c52..92d38af9 100644 --- a/python/cdec/sa/extract.py +++ b/python/cdec/sa/extract.py @@ -62,13 +62,44 @@ def extract(inp): grammar_file = os.path.abspath(grammar_file) return '{}{}'.format(grammar_file, i, sentence, suffix) +def stream_extract(): + global extractor, online, compress + while True: + line = sys.stdin.readline() + if not line: + break + fields = re.split('\s*\|\|\|\s*', line.strip()) + # context ||| cmd + if len(fields) == 2: + (context, cmd) = fields + if cmd.lower() == 'drop': + if online: + extractor.drop_ctx(context) + else: + sys.stderr.write('Error: online mode not set. Skipping line: {}\n'.format(line.strip())) + # context ||| sentence ||| grammar_file + elif len(fields) == 3: + (context, sentence, grammar_file) = fields + with (gzip.open if compress else open)(grammar_file, 'w') as output: + for rule in extractor.grammar(sentence, context): + output.write(str(rule)+'\n') + # context ||| sentence ||| reference ||| alignment + elif len(fields) == 4: + (context, sentence, reference, alignment) = fields + if online: + extractor.add_instance(sentence, reference, alignment, context) + else: + sys.stderr.write('Error: online mode not set. Skipping line: {}\n'.format(line.strip())) + else: + sys.stderr.write('Error: see README.md for stream mode usage. Skipping line: {}\n'.format(line.strip())) + def main(): global online logging.basicConfig(level=logging.INFO) parser = argparse.ArgumentParser(description='Extract grammars from a compiled corpus.') parser.add_argument('-c', '--config', required=True, help='extractor configuration') - parser.add_argument('-g', '--grammars', required=True, + parser.add_argument('-g', '--grammars', help='grammar output path') parser.add_argument('-j', '--jobs', type=int, default=1, help='number of parallel extractors') @@ -80,9 +111,15 @@ def main(): help='online grammar extraction') parser.add_argument('-z', '--compress', action='store_true', help='compress grammars with gzip') + parser.add_argument('-t', '--stream', action='store_true', + help='stream mode (see README.md)') args = parser.parse_args() - if not os.path.exists(args.grammars): + if not (args.grammars or args.stream): + sys.stderr.write('Error: either -g/--grammars or -t/--stream required\n') + sys.exit(1) + + if args.grammars and not os.path.exists(args.grammars): os.mkdir(args.grammars) for featdef in args.features: if not featdef.endswith('.py'): @@ -91,9 +128,13 @@ def main(): sys.exit(1) online = args.online + stream = args.stream start_time = monitor_cpu() if args.jobs > 1: + if stream: + sys.stderr.write('Error: stream mode incompatible with multiple jobs\n') + sys.exit(1) logging.info('Starting %d workers; chunk size: %d', args.jobs, args.chunksize) pool = mp.Pool(args.jobs, make_extractor, (args,)) try: @@ -103,8 +144,11 @@ def main(): pool.terminate() else: make_extractor(args) - for output in map(extract, enumerate(sys.stdin)): - print(output) + if stream: + stream_extract() + else: + for output in map(extract, enumerate(sys.stdin)): + print(output) stop_time = monitor_cpu() logging.info("Overall extraction step took %f seconds", stop_time - start_time) -- cgit v1.2.3