diff options
| author | Chris Dyer <redpony@gmail.com> | 2014-04-20 22:25:20 -0400 | 
|---|---|---|
| committer | Chris Dyer <redpony@gmail.com> | 2014-04-20 22:25:20 -0400 | 
| commit | 97ee2a6f856c211a37a1c99b031319bff7dbee1c (patch) | |
| tree | 234c910977a0c2ccdf21c4954d6548cfbc80b768 /realtime | |
| parent | bb709970ea2142358f03ff85c29b307b34e9001a (diff) | |
| parent | 9f24de23f4e1bc19819024e95744667134c766eb (diff) | |
Merge branch 'master' of https://github.com/redpony/cdec
Diffstat (limited to 'realtime')
| -rw-r--r-- | realtime/rt/rt.py | 64 | 
1 files changed, 57 insertions, 7 deletions
| diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index 27eeb3ca..70ed0c3c 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -5,6 +5,7 @@ import collections  import logging  import os  import shutil +import signal  import StringIO  import subprocess  import sys @@ -25,6 +26,54 @@ TRUE = ('true', 'True', 'TRUE')  logger = logging.getLogger('rt') +class ExtractorWrapper: +    '''Wrap cdec.sa.GrammarExtractor.  Used to keep multiple instances of the extractor from causing Python to segfault. +       Do not use directly unless you know what you're doing.''' + +    def __init__(self, config): +        # Make sure pycdec is on PYTHONPATH +        cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +        pycdec = os.path.join(cdec_root, 'python') +        env = os.environ.copy() +        python_path = env.get('PYTHONPATH', '') +        if 'cdec/python' not in python_path: +            python_path = '{}:{}'.format(python_path, pycdec) if len(python_path) > 0 else pycdec +            env['PYTHONPATH'] = python_path +        # Start grammar extractor as separate process using stdio +        cmd = ['python', '-m', 'cdec.sa.extract', '-o', '-z', '-c', config, '-t'] +        logger.info('Executing: {}'.format(' '.join(cmd))) +        self.p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) +        util.consume_stream(self.p.stderr) +        self.lock = util.FIFOLock() + +    def close(self, force=False): +        if not force: +            self.lock.acquire() +            self.p.stdin.close() +            self.p.wait() +            self.lock.release() +        else: +            os.kill(self.p.pid, signal.SIGTERM) +             + +    def drop_ctx(self, ctx_name): +        self.lock.acquire() +        self.p.stdin.write('{} ||| drop\n'.format(ctx_name)) +        self.p.stdout.readline() +        self.lock.release() + +    def grammar(self, sentence, grammar_file, ctx_name): +        self.lock.acquire() +        self.p.stdin.write('{} ||| {} ||| {}\n'.format(ctx_name, sentence, grammar_file)) +        self.p.stdout.readline() +        self.lock.release() + +    def add_instance(self, source, target, alignment, ctx_name): +        self.lock.acquire() +        self.p.stdin.write('{} ||| {} ||| {} ||| {}\n'.format(ctx_name, source, target, alignment)) +        self.p.stdout.readline() +        self.lock.release() +  class RealtimeDecoder:      '''Do not use directly unless you know what you're doing.  Use RealtimeTranslator.''' @@ -111,7 +160,7 @@ class RealtimeTranslator:          sa_config.filename = os.path.join(self.tmp, 'sa.ini')          util.sa_ini_for_realtime(sa_config, os.path.abspath(configdir))          sa_config.write() -        self.extractor = cdec.sa.GrammarExtractor(sa_config.filename, online=True) +        self.extractor = ExtractorWrapper(sa_config.filename)          self.cache_size = cache_size          ### One instance per context @@ -142,10 +191,13 @@ class RealtimeTranslator:          '''Cleanup'''          if force:              logger.info('Forced shutdown: stopping immediately') -        for ctx_name in list(self.ctx_names): -            self.drop_ctx(ctx_name, force) +        # Drop contexts before closing processes unless forced +        if not force: +            for ctx_name in list(self.ctx_names): +                self.drop_ctx(ctx_name, force)          logger.info('Closing processes')          self.aligner.close(force) +        self.extractor.close(force)          if self.norm:              if not force:                  self.tokenizer_lock.acquire() @@ -213,11 +265,9 @@ class RealtimeTranslator:              self.extractor_lock.release()              return grammar_file          # Extract and cache -        (fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.') +        (fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.', suffix='.gz')          os.close(fid) -        with open(grammar_file, 'w') as output: -            for rule in self.extractor.grammar(sentence, ctx_name): -                output.write('{}\n'.format(str(rule))) +        self.extractor.grammar(sentence, grammar_file, ctx_name)          grammar_files = self.grammar_files[ctx_name]          if len(grammar_files) == self.cache_size:              rm_sent = grammar_files.popleft() | 
