author     Chris Dyer <redpony@gmail.com>   2014-04-20 22:25:20 -0400
committer  Chris Dyer <redpony@gmail.com>   2014-04-20 22:25:20 -0400
commit     97ee2a6f856c211a37a1c99b031319bff7dbee1c (patch)
tree       234c910977a0c2ccdf21c4954d6548cfbc80b768
parent     bb709970ea2142358f03ff85c29b307b34e9001a (diff)
parent     9f24de23f4e1bc19819024e95744667134c766eb (diff)
Merge branch 'master' of https://github.com/redpony/cdec
-rw-r--r--  python/README.md           | 20
-rw-r--r--  python/cdec/sa/extract.py  | 56
-rw-r--r--  realtime/rt/rt.py          | 64
3 files changed, 128 insertions(+), 12 deletions(-)
diff --git a/python/README.md b/python/README.md
index 953971d3..37c7b78e 100644
--- a/python/README.md
+++ b/python/README.md
@@ -23,7 +23,25 @@ Extract grammar rules from the compiled corpus:
cat input.txt | python -m cdec.sa.extract -c extract.ini -g grammars/ -z
This will create per-sentence grammar files in the `grammars` directory and output annotated input suitable for translation with cdec.
-
+
+Extract rules in stream mode:
+
+ python -m cdec.sa.extract -c extract.ini -t -z
+
+This enables stdio interaction with the following line types (a sample session follows the list):
+
+Extract grammar:
+
+ context ||| sentence ||| grammar_file
+
+Learn (online mode, specify context name):
+
+ context ||| sentence ||| reference ||| alignment
+
+Drop (online mode, specify context name):
+
+ context ||| drop
+
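+A hypothetical session with online stream mode enabled (`-o -t -z`; the
+context name, sentences, alignment, and paths here are illustrative). Each
+input line is answered by a single reply line:
+
+    ctx1 ||| the cat ||| grammars/ctx1.gz
+    grammars/ctx1.gz
+    ctx1 ||| the cat ||| le chat ||| 0-0 1-1
+    learn ctx1
+    ctx1 ||| drop
+    drop ctx1
+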
## Library usage
A basic demo of pycdec's features is available in `examples/test.py`.
diff --git a/python/cdec/sa/extract.py b/python/cdec/sa/extract.py
index b6502c52..b6c11f05 100644
--- a/python/cdec/sa/extract.py
+++ b/python/cdec/sa/extract.py
@@ -62,13 +62,48 @@ def extract(inp):
grammar_file = os.path.abspath(grammar_file)
return '<seg grammar="{}" id="{}">{}</seg>{}'.format(grammar_file, i, sentence, suffix)
+def stream_extract():
+ global extractor, online, compress
+ while True:
+ line = sys.stdin.readline()
+ if not line:
+ break
+        fields = re.split(r'\s*\|\|\|\s*', line.strip())
+ # context ||| cmd
+ if len(fields) == 2:
+ (context, cmd) = fields
+ if cmd.lower() == 'drop':
+ if online:
+ extractor.drop_ctx(context)
+ sys.stdout.write('drop {}\n'.format(context))
+ else:
+ sys.stdout.write('Error: online mode not set. Skipping line: {}\n'.format(line.strip()))
+ # context ||| sentence ||| grammar_file
+ elif len(fields) == 3:
+ (context, sentence, grammar_file) = fields
+ with (gzip.open if compress else open)(grammar_file, 'w') as output:
+ for rule in extractor.grammar(sentence, context):
+ output.write(str(rule)+'\n')
+ sys.stdout.write('{}\n'.format(grammar_file))
+ # context ||| sentence ||| reference ||| alignment
+ elif len(fields) == 4:
+ (context, sentence, reference, alignment) = fields
+ if online:
+ extractor.add_instance(sentence, reference, alignment, context)
+ sys.stdout.write('learn {}\n'.format(context))
+ else:
+ sys.stdout.write('Error: online mode not set. Skipping line: {}\n'.format(line.strip()))
+ else:
+ sys.stdout.write('Error: see README.md for stream mode usage. Skipping line: {}\n'.format(line.strip()))
+ sys.stdout.flush()
+
def main():
global online
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser(description='Extract grammars from a compiled corpus.')
parser.add_argument('-c', '--config', required=True,
help='extractor configuration')
- parser.add_argument('-g', '--grammars', required=True,
+ parser.add_argument('-g', '--grammars',
help='grammar output path')
parser.add_argument('-j', '--jobs', type=int, default=1,
help='number of parallel extractors')
@@ -80,9 +115,15 @@ def main():
help='online grammar extraction')
parser.add_argument('-z', '--compress', action='store_true',
help='compress grammars with gzip')
+ parser.add_argument('-t', '--stream', action='store_true',
+ help='stream mode (see README.md)')
args = parser.parse_args()
- if not os.path.exists(args.grammars):
+ if not (args.grammars or args.stream):
+ sys.stderr.write('Error: either -g/--grammars or -t/--stream required\n')
+ sys.exit(1)
+
+ if args.grammars and not os.path.exists(args.grammars):
os.mkdir(args.grammars)
for featdef in args.features:
if not featdef.endswith('.py'):
@@ -91,9 +132,13 @@ def main():
sys.exit(1)
online = args.online
+ stream = args.stream
start_time = monitor_cpu()
if args.jobs > 1:
+ if stream:
+ sys.stderr.write('Error: stream mode incompatible with multiple jobs\n')
+ sys.exit(1)
logging.info('Starting %d workers; chunk size: %d', args.jobs, args.chunksize)
pool = mp.Pool(args.jobs, make_extractor, (args,))
try:
@@ -103,8 +148,11 @@ def main():
pool.terminate()
else:
make_extractor(args)
- for output in map(extract, enumerate(sys.stdin)):
- print(output)
+ if stream:
+ stream_extract()
+ else:
+ for output in map(extract, enumerate(sys.stdin)):
+ print(output)
stop_time = monitor_cpu()
logging.info("Overall extraction step took %f seconds", stop_time - start_time)
diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py
index 27eeb3ca..70ed0c3c 100644
--- a/realtime/rt/rt.py
+++ b/realtime/rt/rt.py
@@ -5,6 +5,7 @@ import collections
import logging
import os
import shutil
+import signal
import StringIO
import subprocess
import sys
@@ -25,6 +26,54 @@ TRUE = ('true', 'True', 'TRUE')
logger = logging.getLogger('rt')
+class ExtractorWrapper:
+    '''Wrap cdec.sa.GrammarExtractor in a subprocess so that multiple extractor
+    instances cannot segfault the host Python process.
+    Do not use directly unless you know what you're doing.'''
+
+ def __init__(self, config):
+ # Make sure pycdec is on PYTHONPATH
+ cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+ pycdec = os.path.join(cdec_root, 'python')
+ env = os.environ.copy()
+ python_path = env.get('PYTHONPATH', '')
+ if 'cdec/python' not in python_path:
+ python_path = '{}:{}'.format(python_path, pycdec) if len(python_path) > 0 else pycdec
+ env['PYTHONPATH'] = python_path
+ # Start grammar extractor as separate process using stdio
+ cmd = ['python', '-m', 'cdec.sa.extract', '-o', '-z', '-c', config, '-t']
+ logger.info('Executing: {}'.format(' '.join(cmd)))
+ self.p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
+ util.consume_stream(self.p.stderr)
+ self.lock = util.FIFOLock()
+
+ def close(self, force=False):
+ if not force:
+ self.lock.acquire()
+ self.p.stdin.close()
+ self.p.wait()
+ self.lock.release()
+ else:
+ os.kill(self.p.pid, signal.SIGTERM)
+
+ def drop_ctx(self, ctx_name):
+ self.lock.acquire()
+ self.p.stdin.write('{} ||| drop\n'.format(ctx_name))
+ self.p.stdout.readline()
+ self.lock.release()
+
+ def grammar(self, sentence, grammar_file, ctx_name):
+ self.lock.acquire()
+ self.p.stdin.write('{} ||| {} ||| {}\n'.format(ctx_name, sentence, grammar_file))
+ self.p.stdout.readline()
+ self.lock.release()
+
+ def add_instance(self, source, target, alignment, ctx_name):
+ self.lock.acquire()
+ self.p.stdin.write('{} ||| {} ||| {} ||| {}\n'.format(ctx_name, source, target, alignment))
+ self.p.stdout.readline()
+ self.lock.release()
+
class RealtimeDecoder:
'''Do not use directly unless you know what you're doing. Use RealtimeTranslator.'''
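For orientation, a minimal usage sketch of the wrapper (the config path, sentence, alignment, and output path are all hypothetical; not part of the patch):

    # All values hypothetical; assumes a compiled extractor config at sa.ini.
    wrapper = ExtractorWrapper('sa.ini')
    wrapper.grammar('the cat', '/tmp/grammar.ctx1.gz', 'ctx1')     # write gzipped grammar
    wrapper.add_instance('the cat', 'le chat', '0-0 1-1', 'ctx1')  # online learning update
    wrapper.drop_ctx('ctx1')                                       # discard context state
    wrapper.close()                                                # graceful shutdown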
@@ -111,7 +160,7 @@ class RealtimeTranslator:
sa_config.filename = os.path.join(self.tmp, 'sa.ini')
util.sa_ini_for_realtime(sa_config, os.path.abspath(configdir))
sa_config.write()
- self.extractor = cdec.sa.GrammarExtractor(sa_config.filename, online=True)
+ self.extractor = ExtractorWrapper(sa_config.filename)
self.cache_size = cache_size
### One instance per context
@@ -142,10 +191,13 @@ class RealtimeTranslator:
'''Cleanup'''
if force:
logger.info('Forced shutdown: stopping immediately')
- for ctx_name in list(self.ctx_names):
- self.drop_ctx(ctx_name, force)
+ # Drop contexts before closing processes unless forced
+ if not force:
+ for ctx_name in list(self.ctx_names):
+ self.drop_ctx(ctx_name, force)
logger.info('Closing processes')
self.aligner.close(force)
+ self.extractor.close(force)
if self.norm:
if not force:
self.tokenizer_lock.acquire()
@@ -213,11 +265,9 @@ class RealtimeTranslator:
self.extractor_lock.release()
return grammar_file
# Extract and cache
- (fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.')
+ (fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.', suffix='.gz')
os.close(fid)
- with open(grammar_file, 'w') as output:
- for rule in self.extractor.grammar(sentence, ctx_name):
- output.write('{}\n'.format(str(rule)))
+ self.extractor.grammar(sentence, grammar_file, ctx_name)
grammar_files = self.grammar_files[ctx_name]
if len(grammar_files) == self.cache_size:
rm_sent = grammar_files.popleft()