From 4236729897ab454f6b28613364b06e94ebbb080e Mon Sep 17 00:00:00 2001 From: mjdenkowski Date: Fri, 18 Apr 2014 15:15:40 -0400 Subject: Stream mode for grammar extractor --- python/README.md | 20 +++++++++++++++++- python/cdec/sa/extract.py | 52 +++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 67 insertions(+), 5 deletions(-) diff --git a/python/README.md b/python/README.md index 953971d3..37c7b78e 100644 --- a/python/README.md +++ b/python/README.md @@ -23,7 +23,25 @@ Extract grammar rules from the compiled corpus: cat input.txt | python -m cdec.sa.extract -c extract.ini -g grammars/ -z This will create per-sentence grammar files in the `grammars` directory and output annotated input suitable for translation with cdec. - + +Extract rules in stream mode: + + python -m cdec.sa.extract -c extract.ini -t -z + +This will enable stdio interaction with the following types of lines: + +Extract grammar: + + context ||| sentence ||| grammar_file + +Learn (online mode, specify context name): + + context ||| sentence ||| reference ||| alignment + +Drop (online mode, specify context name): + + context ||| drop + ## Library usage A basic demo of pycdec's features is available in `examples/test.py`. diff --git a/python/cdec/sa/extract.py b/python/cdec/sa/extract.py index b6502c52..92d38af9 100644 --- a/python/cdec/sa/extract.py +++ b/python/cdec/sa/extract.py @@ -62,13 +62,44 @@ def extract(inp): grammar_file = os.path.abspath(grammar_file) return '{}{}'.format(grammar_file, i, sentence, suffix) +def stream_extract(): + global extractor, online, compress + while True: + line = sys.stdin.readline() + if not line: + break + fields = re.split('\s*\|\|\|\s*', line.strip()) + # context ||| cmd + if len(fields) == 2: + (context, cmd) = fields + if cmd.lower() == 'drop': + if online: + extractor.drop_ctx(context) + else: + sys.stderr.write('Error: online mode not set. Skipping line: {}\n'.format(line.strip())) + # context ||| sentence ||| grammar_file + elif len(fields) == 3: + (context, sentence, grammar_file) = fields + with (gzip.open if compress else open)(grammar_file, 'w') as output: + for rule in extractor.grammar(sentence, context): + output.write(str(rule)+'\n') + # context ||| sentence ||| reference ||| alignment + elif len(fields) == 4: + (context, sentence, reference, alignment) = fields + if online: + extractor.add_instance(sentence, reference, alignment, context) + else: + sys.stderr.write('Error: online mode not set. Skipping line: {}\n'.format(line.strip())) + else: + sys.stderr.write('Error: see README.md for stream mode usage. Skipping line: {}\n'.format(line.strip())) + def main(): global online logging.basicConfig(level=logging.INFO) parser = argparse.ArgumentParser(description='Extract grammars from a compiled corpus.') parser.add_argument('-c', '--config', required=True, help='extractor configuration') - parser.add_argument('-g', '--grammars', required=True, + parser.add_argument('-g', '--grammars', help='grammar output path') parser.add_argument('-j', '--jobs', type=int, default=1, help='number of parallel extractors') @@ -80,9 +111,15 @@ def main(): help='online grammar extraction') parser.add_argument('-z', '--compress', action='store_true', help='compress grammars with gzip') + parser.add_argument('-t', '--stream', action='store_true', + help='stream mode (see README.md)') args = parser.parse_args() - if not os.path.exists(args.grammars): + if not (args.grammars or args.stream): + sys.stderr.write('Error: either -g/--grammars or -t/--stream required\n') + sys.exit(1) + + if args.grammars and not os.path.exists(args.grammars): os.mkdir(args.grammars) for featdef in args.features: if not featdef.endswith('.py'): @@ -91,9 +128,13 @@ def main(): sys.exit(1) online = args.online + stream = args.stream start_time = monitor_cpu() if args.jobs > 1: + if stream: + sys.stderr.write('Error: stream mode incompatible with multiple jobs\n') + sys.exit(1) logging.info('Starting %d workers; chunk size: %d', args.jobs, args.chunksize) pool = mp.Pool(args.jobs, make_extractor, (args,)) try: @@ -103,8 +144,11 @@ def main(): pool.terminate() else: make_extractor(args) - for output in map(extract, enumerate(sys.stdin)): - print(output) + if stream: + stream_extract() + else: + for output in map(extract, enumerate(sys.stdin)): + print(output) stop_time = monitor_cpu() logging.info("Overall extraction step took %f seconds", stop_time - start_time) -- cgit v1.2.3 From 33dacc7534879b9d5b25f406194c7ae702b1e83b Mon Sep 17 00:00:00 2001 From: mjdenkowski Date: Fri, 18 Apr 2014 15:23:17 -0400 Subject: stdout feedback --- python/cdec/sa/extract.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/python/cdec/sa/extract.py b/python/cdec/sa/extract.py index 92d38af9..b6c11f05 100644 --- a/python/cdec/sa/extract.py +++ b/python/cdec/sa/extract.py @@ -75,23 +75,27 @@ def stream_extract(): if cmd.lower() == 'drop': if online: extractor.drop_ctx(context) + sys.stdout.write('drop {}\n'.format(context)) else: - sys.stderr.write('Error: online mode not set. Skipping line: {}\n'.format(line.strip())) + sys.stdout.write('Error: online mode not set. Skipping line: {}\n'.format(line.strip())) # context ||| sentence ||| grammar_file elif len(fields) == 3: (context, sentence, grammar_file) = fields with (gzip.open if compress else open)(grammar_file, 'w') as output: for rule in extractor.grammar(sentence, context): output.write(str(rule)+'\n') + sys.stdout.write('{}\n'.format(grammar_file)) # context ||| sentence ||| reference ||| alignment elif len(fields) == 4: (context, sentence, reference, alignment) = fields if online: extractor.add_instance(sentence, reference, alignment, context) + sys.stdout.write('learn {}\n'.format(context)) else: - sys.stderr.write('Error: online mode not set. Skipping line: {}\n'.format(line.strip())) + sys.stdout.write('Error: online mode not set. Skipping line: {}\n'.format(line.strip())) else: - sys.stderr.write('Error: see README.md for stream mode usage. Skipping line: {}\n'.format(line.strip())) + sys.stdout.write('Error: see README.md for stream mode usage. Skipping line: {}\n'.format(line.strip())) + sys.stdout.flush() def main(): global online -- cgit v1.2.3 From 9f24de23f4e1bc19819024e95744667134c766eb Mon Sep 17 00:00:00 2001 From: mjdenkowski Date: Fri, 18 Apr 2014 17:36:58 -0400 Subject: Each grammar extractor gets its own process to avoid Cython segfaults. --- realtime/rt/rt.py | 64 +++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 57 insertions(+), 7 deletions(-) diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index 27eeb3ca..70ed0c3c 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -5,6 +5,7 @@ import collections import logging import os import shutil +import signal import StringIO import subprocess import sys @@ -25,6 +26,54 @@ TRUE = ('true', 'True', 'TRUE') logger = logging.getLogger('rt') +class ExtractorWrapper: + '''Wrap cdec.sa.GrammarExtractor. Used to keep multiple instances of the extractor from causing Python to segfault. + Do not use directly unless you know what you're doing.''' + + def __init__(self, config): + # Make sure pycdec is on PYTHONPATH + cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + pycdec = os.path.join(cdec_root, 'python') + env = os.environ.copy() + python_path = env.get('PYTHONPATH', '') + if 'cdec/python' not in python_path: + python_path = '{}:{}'.format(python_path, pycdec) if len(python_path) > 0 else pycdec + env['PYTHONPATH'] = python_path + # Start grammar extractor as separate process using stdio + cmd = ['python', '-m', 'cdec.sa.extract', '-o', '-z', '-c', config, '-t'] + logger.info('Executing: {}'.format(' '.join(cmd))) + self.p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) + util.consume_stream(self.p.stderr) + self.lock = util.FIFOLock() + + def close(self, force=False): + if not force: + self.lock.acquire() + self.p.stdin.close() + self.p.wait() + self.lock.release() + else: + os.kill(self.p.pid, signal.SIGTERM) + + + def drop_ctx(self, ctx_name): + self.lock.acquire() + self.p.stdin.write('{} ||| drop\n'.format(ctx_name)) + self.p.stdout.readline() + self.lock.release() + + def grammar(self, sentence, grammar_file, ctx_name): + self.lock.acquire() + self.p.stdin.write('{} ||| {} ||| {}\n'.format(ctx_name, sentence, grammar_file)) + self.p.stdout.readline() + self.lock.release() + + def add_instance(self, source, target, alignment, ctx_name): + self.lock.acquire() + self.p.stdin.write('{} ||| {} ||| {} ||| {}\n'.format(ctx_name, source, target, alignment)) + self.p.stdout.readline() + self.lock.release() + class RealtimeDecoder: '''Do not use directly unless you know what you're doing. Use RealtimeTranslator.''' @@ -111,7 +160,7 @@ class RealtimeTranslator: sa_config.filename = os.path.join(self.tmp, 'sa.ini') util.sa_ini_for_realtime(sa_config, os.path.abspath(configdir)) sa_config.write() - self.extractor = cdec.sa.GrammarExtractor(sa_config.filename, online=True) + self.extractor = ExtractorWrapper(sa_config.filename) self.cache_size = cache_size ### One instance per context @@ -142,10 +191,13 @@ class RealtimeTranslator: '''Cleanup''' if force: logger.info('Forced shutdown: stopping immediately') - for ctx_name in list(self.ctx_names): - self.drop_ctx(ctx_name, force) + # Drop contexts before closing processes unless forced + if not force: + for ctx_name in list(self.ctx_names): + self.drop_ctx(ctx_name, force) logger.info('Closing processes') self.aligner.close(force) + self.extractor.close(force) if self.norm: if not force: self.tokenizer_lock.acquire() @@ -213,11 +265,9 @@ class RealtimeTranslator: self.extractor_lock.release() return grammar_file # Extract and cache - (fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.') + (fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.', suffix='.gz') os.close(fid) - with open(grammar_file, 'w') as output: - for rule in self.extractor.grammar(sentence, ctx_name): - output.write('{}\n'.format(str(rule))) + self.extractor.grammar(sentence, grammar_file, ctx_name) grammar_files = self.grammar_files[ctx_name] if len(grammar_files) == self.cache_size: rm_sent = grammar_files.popleft() -- cgit v1.2.3