summaryrefslogtreecommitdiff
path: root/realtime
diff options
context:
space:
mode:
Diffstat (limited to 'realtime')
-rw-r--r--realtime/README.md18
-rwxr-xr-xrealtime/mkconfig.py4
-rwxr-xr-xrealtime/mkinput.py18
-rwxr-xr-xrealtime/realtime.py95
-rw-r--r--realtime/rt/__init__.py15
-rw-r--r--realtime/rt/aligner.py34
-rw-r--r--realtime/rt/decoder.py63
-rw-r--r--realtime/rt/rt.py375
-rw-r--r--realtime/rt/util.py66
9 files changed, 560 insertions, 128 deletions
diff --git a/realtime/README.md b/realtime/README.md
index b37dddc8..e5290fc5 100644
--- a/realtime/README.md
+++ b/realtime/README.md
@@ -1 +1,17 @@
-More to come.
+cdec Realtime
+-------------
+
+Code by Michael Denkowski (http://www.cs.cmu.edu/~mdenkows/, mdenkows@cs.cmu.edu)
+
+```
+@misc{denkowski-proposal2013,
+ author = {Michael Denkowski},
+ title = {Machine Translation for Human Translators},
+ year = {2013},
+ month = {May},
+ day = {30},
+ howpublished = {{Ph.D.} Thesis Proposal, Carnegie Mellon University}
+}
+```
+
+For a full tutorial, see http://www.cs.cmu.edu/~mdenkows/cdec-realtime.html
diff --git a/realtime/mkconfig.py b/realtime/mkconfig.py
index 32388978..f193c57a 100755
--- a/realtime/mkconfig.py
+++ b/realtime/mkconfig.py
@@ -4,9 +4,11 @@ import os
import shutil
import sys
+# Import first to make sure pycdec is on path
+import rt
+
from cdec.configobj import ConfigObj
-import rt
def main():
diff --git a/realtime/mkinput.py b/realtime/mkinput.py
new file mode 100755
index 00000000..a1b1256d
--- /dev/null
+++ b/realtime/mkinput.py
@@ -0,0 +1,18 @@
+#!/usr/bin/env python
+
+import itertools
+import sys
+
+def main():
+
+ if len(sys.argv[1:]) < 2:
+ sys.stderr.write('usage: {} test.src test.ref [ctx_name] >test.input\n'.format(sys.argv[0]))
+ sys.exit(2)
+
+ ctx_name = ' {}'.format(sys.argv[3]) if len(sys.argv[1:]) > 2 else ''
+ for (src, ref) in itertools.izip(open(sys.argv[1]), open(sys.argv[2])):
+ sys.stdout.write('TR{} ||| {}'.format(ctx_name, src))
+ sys.stdout.write('LEARN{} ||| {} ||| {}'.format(ctx_name, src.strip(), ref))
+
+if __name__ == '__main__':
+ main()
diff --git a/realtime/realtime.py b/realtime/realtime.py
index dff7e90c..ec15b59d 100755
--- a/realtime/realtime.py
+++ b/realtime/realtime.py
@@ -3,50 +3,109 @@
import argparse
import logging
import sys
+import threading
+import time
import rt
+ABOUT = '''Realtime adaptive translation with cdec (See README.md)
+
+Code by Michael Denkowski
+
+Citation:
+@misc{denkowski-proposal2013,
+ author = {Michael Denkowski},
+ title = {Machine Translation for Human Translators},
+ year = {2013},
+ month = {May},
+ day = {30},
+ howpublished = {{Ph.D.} Thesis Proposal, Carnegie Mellon University}
+}
+
+'''
+
class Parser(argparse.ArgumentParser):
def error(self, message):
+ sys.stderr.write(ABOUT)
self.print_help()
sys.stderr.write('\n{}\n'.format(message))
sys.exit(2)
+def handle_line(translator, line, output, ctx_name):
+ res = translator.command_line(line, ctx_name)
+ if res:
+ output.write('{}\n'.format(res))
+ output.flush()
+
+def test1(translator, input, output, ctx_name):
+ inp = open(input)
+ out = open(output, 'w')
+ for line in inp:
+ handle_line(translator, line.strip(), out, ctx_name)
+ out.close()
+
+def debug(translator, input):
+ # Test 1: multiple contexts
+ threads = []
+ for i in range(4):
+ t = threading.Thread(target=test1, args=(translator, input, '{}.out.{}'.format(input, i), str(i)))
+ threads.append(t)
+ t.start()
+ time.sleep(30)
+ # Test 2: flood
+ out = open('{}.out.flood'.format(input), 'w')
+ inp = open(input)
+ while True:
+ line = inp.readline()
+ if not line:
+ break
+ line = line.strip()
+ t = threading.Thread(target=handle_line, args=(translator, line.strip(), out, None))
+ threads.append(t)
+ t.start()
+ time.sleep(1)
+ translator.drop_ctx(None)
+ # Join test threads
+ for t in threads:
+ t.join()
+
def main():
- parser = Parser(description='Real-time adaptive translation with cdec.')
- parser.add_argument('-c', '--config', required=True, help='Config directory (see README.md)')
+ parser = Parser()
+ parser.add_argument('-c', '--config', required=True, help='Config directory')
+ parser.add_argument('-s', '--state', help='Load state file to default context (saved incremental data)')
parser.add_argument('-n', '--normalize', help='Normalize text (tokenize, translate, detokenize)', action='store_true')
parser.add_argument('-T', '--temp', help='Temp directory (default /tmp)', default='/tmp')
parser.add_argument('-a', '--cache', help='Grammar cache size (default 5)', default='5')
parser.add_argument('-v', '--verbose', help='Info to stderr', action='store_true')
+ parser.add_argument('-D', '--debug-test', help='Run debug tests on input file')
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.INFO)
- rtd = rt.RealtimeDecoder(args.config, tmpdir=args.temp, cache_size=int(args.cache), norm=args.normalize)
+ with rt.RealtimeTranslator(args.config, tmpdir=args.temp, cache_size=int(args.cache), norm=args.normalize) as translator:
+
+ # Debugging
+ if args.debug_test:
+ debug(translator, args.debug_test)
+ return
+
+ # Load state if given
+ if args.state:
+ rtd.load_state(state)
- try:
+ # Read lines and commands
while True:
line = sys.stdin.readline()
if not line:
break
- input = [f.strip() for f in line.split('|||')]
- if len(input) == 1:
- hyp = rtd.decode(input[0])
- sys.stdout.write('{}\n'.format(hyp))
+ line = line.strip()
+ res = translator.command_line(line)
+ if res:
+ sys.stdout.write('{}\n'.format(res))
sys.stdout.flush()
- elif len(input) == 2:
- rtd.learn(*input)
-
- # Clean exit on ctrl+c
- except KeyboardInterrupt:
- logging.info('Caught KeyboardInterrupt, exiting')
-
- # Cleanup
- rtd.close()
-
+
if __name__ == '__main__':
main()
diff --git a/realtime/rt/__init__.py b/realtime/rt/__init__.py
index 738777f3..c76acc4d 100644
--- a/realtime/rt/__init__.py
+++ b/realtime/rt/__init__.py
@@ -1,3 +1,18 @@
+# Add pycdec to the Python path if user hasn't
+import os
+import sys
+try:
+ import cdec
+except ImportError as ie:
+ try:
+ pycdec = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), 'python')
+ sys.path.append(pycdec)
+ import cdec
+ except:
+ sys.stderr.write('Error: cannot import pycdec. Please check that cdec/python is built.\n')
+ raise ie
+
+# Regular init imports
from rt import *
import aligner
import decoder
diff --git a/realtime/rt/aligner.py b/realtime/rt/aligner.py
index 3c6ea144..e1782496 100644
--- a/realtime/rt/aligner.py
+++ b/realtime/rt/aligner.py
@@ -2,14 +2,17 @@ import logging
import os
import sys
import subprocess
+import threading
import util
+logger = logging.getLogger('rt.aligner')
+
class ForceAligner:
- def __init__(self, fwd_params, fwd_err, rev_params, rev_err):
+ def __init__(self, fwd_params, fwd_err, rev_params, rev_err, heuristic='grow-diag-final-and'):
- cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
+ cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
fast_align = os.path.join(cdec_root, 'word-aligner', 'fast_align')
atools = os.path.join(cdec_root, 'utils', 'atools')
@@ -18,21 +21,27 @@ class ForceAligner:
fwd_cmd = [fast_align, '-i', '-', '-d', '-T', fwd_T, '-m', fwd_m, '-f', fwd_params]
rev_cmd = [fast_align, '-i', '-', '-d', '-T', rev_T, '-m', rev_m, '-f', rev_params, '-r']
- tools_cmd = [atools, '-i', '-', '-j', '-', '-c', 'grow-diag-final-and']
+ tools_cmd = [atools, '-i', '-', '-j', '-', '-c', heuristic]
- logging.info('Executing: {}'.format(' '.join(fwd_cmd)))
+ logger.info('Executing: {}'.format(' '.join(fwd_cmd)))
self.fwd_align = util.popen_io(fwd_cmd)
- logging.info('Executing: {}'.format(' '.join(rev_cmd)))
+ logger.info('Executing: {}'.format(' '.join(rev_cmd)))
self.rev_align = util.popen_io(rev_cmd)
- logging.info('Executing: {}'.format(' '.join(tools_cmd)))
+ logger.info('Executing: {}'.format(' '.join(tools_cmd)))
self.tools = util.popen_io(tools_cmd)
+ # Used to guarantee thread safety
+ self.lock = util.FIFOLock()
+
def align(self, source, target):
+ '''Threadsafe, FIFO'''
return self.align_formatted('{} ||| {}'.format(source, target))
def align_formatted(self, line):
+ '''Threadsafe, FIFO'''
+ self.lock.acquire()
self.fwd_align.stdin.write('{}\n'.format(line))
self.rev_align.stdin.write('{}\n'.format(line))
# f words ||| e words ||| links ||| score
@@ -40,12 +49,21 @@ class ForceAligner:
rev_line = self.rev_align.stdout.readline().split('|||')[2].strip()
self.tools.stdin.write('{}\n'.format(fwd_line))
self.tools.stdin.write('{}\n'.format(rev_line))
- return self.tools.stdout.readline().strip()
+ al_line = self.tools.stdout.readline().strip()
+ self.lock.release()
+ return al_line
- def close(self):
+ def close(self, force=False):
+ if not force:
+ self.lock.acquire()
self.fwd_align.stdin.close()
+ self.fwd_align.wait()
self.rev_align.stdin.close()
+ self.rev_align.wait()
self.tools.stdin.close()
+ self.tools.wait()
+ if not force:
+ self.lock.release()
def read_err(self, err):
(T, m) = ('', '')
diff --git a/realtime/rt/decoder.py b/realtime/rt/decoder.py
index 0a202fae..ed45c248 100644
--- a/realtime/rt/decoder.py
+++ b/realtime/rt/decoder.py
@@ -1,39 +1,80 @@
import logging
import os
import subprocess
+import threading
import util
+logger = logging.getLogger('rt.decoder')
+
class Decoder:
- def close(self):
+ def close(self, force=False):
+ if not force:
+ self.lock.acquire()
self.decoder.stdin.close()
+ self.decoder.wait()
+ if not force:
+ self.lock.release()
- def decode(self, sentence, grammar):
- input = '<seg grammar="{g}">{s}</seg>\n'.format(s=sentence, g=grammar)
+ def decode(self, sentence, grammar=None):
+ '''Threadsafe, FIFO'''
+ self.lock.acquire()
+ input = '<seg grammar="{g}">{s}</seg>\n'.format(s=sentence, g=grammar) if grammar else '{}\n'.format(sentence)
self.decoder.stdin.write(input)
- return self.decoder.stdout.readline().strip()
+ hyp = self.decoder.stdout.readline().strip()
+ self.lock.release()
+ return hyp
class CdecDecoder(Decoder):
-
+
def __init__(self, config, weights):
- cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
+ cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
decoder = os.path.join(cdec_root, 'decoder', 'cdec')
decoder_cmd = [decoder, '-c', config, '-w', weights]
- logging.info('Executing: {}'.format(' '.join(decoder_cmd)))
+ logger.info('Executing: {}'.format(' '.join(decoder_cmd)))
self.decoder = util.popen_io(decoder_cmd)
+ self.lock = util.FIFOLock()
class MIRADecoder(Decoder):
def __init__(self, config, weights):
- cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
+ cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
mira = os.path.join(cdec_root, 'training', 'mira', 'kbest_cut_mira')
# optimizer=2 step=0.001 best=500, k=500, uniq, stream
mira_cmd = [mira, '-c', config, '-w', weights, '-o', '2', '-C', '0.001', '-b', '500', '-k', '500', '-u', '-t']
- logging.info('Executing: {}'.format(' '.join(mira_cmd)))
+ logger.info('Executing: {}'.format(' '.join(mira_cmd)))
self.decoder = util.popen_io(mira_cmd)
+ self.lock = util.FIFOLock()
+
+ def get_weights(self):
+ '''Threadsafe, FIFO'''
+ self.lock.acquire()
+ self.decoder.stdin.write('WEIGHTS ||| WRITE\n')
+ weights = self.decoder.stdout.readline().strip()
+ self.lock.release()
+ return weights
+
+ def set_weights(self, w_line):
+ '''Threadsafe, FIFO'''
+ self.lock.acquire()
+ try:
+ # Check validity
+ for w_str in w_line.split():
+ (k, v) = w_str.split('=')
+ float(v)
+ self.decoder.stdin.write('WEIGHTS ||| {}\n'.format(w_line))
+ self.lock.release()
+ except:
+ self.lock.release()
+ raise Exception('Invalid weights line: {}'.format(w_line))
+
def update(self, sentence, grammar, reference):
- input = '<seg grammar="{g}">{s}</seg> ||| {r}\n'.format(s=sentence, g=grammar, r=reference)
+ '''Threadsafe, FIFO'''
+ self.lock.acquire()
+ input = 'LEARN ||| <seg grammar="{g}">{s}</seg> ||| {r}\n'.format(s=sentence, g=grammar, r=reference)
self.decoder.stdin.write(input)
- return self.decoder.stdout.readline().strip()
+ log = self.decoder.stdout.readline().strip()
+ self.lock.release()
+ return log
diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py
index 892cc217..c0aec410 100644
--- a/realtime/rt/rt.py
+++ b/realtime/rt/rt.py
@@ -5,33 +5,88 @@ import collections
import logging
import os
import shutil
-import sys
+import StringIO
import subprocess
+import sys
import tempfile
+import threading
import time
-from cdec.configobj import ConfigObj
-import cdec.sa
-
+import cdec
import aligner
import decoder
import util
+# Dummy input token that is unlikely to appear in normalized data (but no fatal errors if it does)
+LIKELY_OOV = '(OOV)'
+
+logger = logging.getLogger('rt')
+
class RealtimeDecoder:
+ '''Do not use directly unless you know what you're doing. Use RealtimeTranslator.'''
+
+ def __init__(self, configdir, tmpdir):
+
+ cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+ self.tmp = tmpdir
+ os.mkdir(self.tmp)
+
+ # HPYPLM reference stream
+ ref_fifo_file = os.path.join(self.tmp, 'ref.fifo')
+ os.mkfifo(ref_fifo_file)
+ self.ref_fifo = open(ref_fifo_file, 'w+')
+ # Start with empty line (do not learn prior to first input)
+ self.ref_fifo.write('\n')
+ self.ref_fifo.flush()
+
+ # Decoder
+ decoder_config = [[f.strip() for f in line.split('=')] for line in open(os.path.join(configdir, 'cdec.ini'))]
+ util.cdec_ini_for_realtime(decoder_config, os.path.abspath(configdir), ref_fifo_file)
+ decoder_config_file = os.path.join(self.tmp, 'cdec.ini')
+ with open(decoder_config_file, 'w') as output:
+ for (k, v) in decoder_config:
+ output.write('{}={}\n'.format(k, v))
+ decoder_weights = os.path.join(configdir, 'weights.final')
+ self.decoder = decoder.MIRADecoder(decoder_config_file, decoder_weights)
+
+ def close(self, force=False):
+ logger.info('Closing decoder and removing {}'.format(self.tmp))
+ self.decoder.close(force)
+ self.ref_fifo.close()
+ shutil.rmtree(self.tmp)
+
+class RealtimeTranslator:
+ '''Main entry point into API: serves translations to any number of concurrent users'''
def __init__(self, configdir, tmpdir='/tmp', cache_size=5, norm=False):
- cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
+ # name -> (method, set of possible nargs)
+ self.COMMANDS = {
+ 'TR': (self.translate, set((1,))),
+ 'LEARN': (self.learn, set((2,))),
+ 'SAVE': (self.save_state, set((0, 1))),
+ 'LOAD': (self.load_state, set((0, 1))),
+ 'DROP': (self.drop_ctx, set((0,))),
+ 'LIST': (self.list_ctx, set((0,))),
+ }
+
+ cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+ ### Single instance for all contexts
+ self.config = configdir
# Temporary work dir
self.tmp = tempfile.mkdtemp(dir=tmpdir, prefix='realtime.')
- logging.info('Using temp dir {}'.format(self.tmp))
+ logger.info('Using temp dir {}'.format(self.tmp))
# Normalization
self.norm = norm
if self.norm:
self.tokenizer = util.popen_io([os.path.join(cdec_root, 'corpus', 'tokenize-anything.sh'), '-u'])
+ self.tokenizer_lock = util.FIFOLock()
self.detokenizer = util.popen_io([os.path.join(cdec_root, 'corpus', 'untok.pl')])
+ self.detokenizer_lock = util.FIFOLock()
# Word aligner
fwd_params = os.path.join(configdir, 'a.fwd_params')
@@ -41,112 +96,302 @@ class RealtimeDecoder:
self.aligner = aligner.ForceAligner(fwd_params, fwd_err, rev_params, rev_err)
# Grammar extractor
- sa_config = ConfigObj(os.path.join(configdir, 'sa.ini'), unrepr=True)
+ sa_config = cdec.configobj.ConfigObj(os.path.join(configdir, 'sa.ini'), unrepr=True)
sa_config.filename = os.path.join(self.tmp, 'sa.ini')
util.sa_ini_for_realtime(sa_config, os.path.abspath(configdir))
sa_config.write()
self.extractor = cdec.sa.GrammarExtractor(sa_config.filename, online=True)
- self.grammar_files = collections.deque()
- self.grammar_dict = {}
self.cache_size = cache_size
- # HPYPLM reference stream
- ref_fifo_file = os.path.join(self.tmp, 'ref.fifo')
- os.mkfifo(ref_fifo_file)
- self.ref_fifo = open(ref_fifo_file, 'w+')
- # Start with empty line (do not learn prior to first input)
- self.ref_fifo.write('\n')
- self.ref_fifo.flush()
+ ### One instance per context
- # Decoder
- decoder_config = [[f.strip() for f in line.split('=')] for line in open(os.path.join(configdir, 'cdec.ini'))]
- util.cdec_ini_for_realtime(decoder_config, os.path.abspath(configdir), ref_fifo_file)
- decoder_config_file = os.path.join(self.tmp, 'cdec.ini')
- with open(decoder_config_file, 'w') as output:
- for (k, v) in decoder_config:
- output.write('{}={}\n'.format(k, v))
- decoder_weights = os.path.join(configdir, 'weights.final')
- self.decoder = decoder.MIRADecoder(decoder_config_file, decoder_weights)
+ self.ctx_names = set()
+ # All context-dependent operations are atomic
+ self.ctx_locks = collections.defaultdict(util.FIFOLock)
+ # ctx -> list of (source, target, alignment)
+ self.ctx_data = {}
- def close(self):
- logging.info('Closing processes')
- self.aligner.close()
- self.decoder.close()
- self.ref_fifo.close()
+ # Grammar extractor is not threadsafe
+ self.extractor_lock = util.FIFOLock()
+ # ctx -> deque of file
+ self.grammar_files = {}
+ # ctx -> dict of {sentence: file}
+ self.grammar_dict = {}
+
+ self.decoders = {}
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, ex_type, ex_value, ex_traceback):
+ # Force shutdown on exception
+ self.close(ex_type is not None)
+
+ def close(self, force=False):
+ '''Cleanup'''
+ if force:
+ logger.info('Forced shutdown: stopping immediately')
+ for ctx_name in list(self.ctx_names):
+ self.drop_ctx(ctx_name, force)
+ logger.info('Closing processes')
+ self.aligner.close(force)
if self.norm:
+ if not force:
+ self.tokenizer_lock.acquire()
+ self.detokenizer_lock.acquire()
self.tokenizer.stdin.close()
+ self.tokenizer.wait()
self.detokenizer.stdin.close()
- logging.info('Deleting {}'.format(self.tmp))
+ self.detokenizer.wait()
+ if not force:
+ self.tokenizer_lock.release()
+ self.detokenizer_lock.release()
+ logger.info('Deleting {}'.format(self.tmp))
shutil.rmtree(self.tmp)
- def grammar(self, sentence):
- grammar_file = self.grammar_dict.get(sentence, None)
+ def lazy_ctx(self, ctx_name):
+ '''Initialize a context (inc starting a new decoder) if needed.
+ NOT threadsafe, acquire ctx_name lock before calling.'''
+ if ctx_name in self.ctx_names:
+ return
+ logger.info('({}) New context'.format(ctx_name))
+ self.ctx_names.add(ctx_name)
+ self.ctx_data[ctx_name] = []
+ self.grammar_files[ctx_name] = collections.deque()
+ self.grammar_dict[ctx_name] = {}
+ tmpdir = os.path.join(self.tmp, 'decoder.{}'.format(ctx_name))
+ self.decoders[ctx_name] = RealtimeDecoder(self.config, tmpdir)
+
+ def drop_ctx(self, ctx_name=None, force=False):
+ '''Delete a context (inc stopping the decoder)
+ Threadsafe and FIFO unless forced.'''
+ lock = self.ctx_locks[ctx_name]
+ if not force:
+ lock.acquire()
+ if ctx_name not in self.ctx_names:
+ logger.info('({}) No context found, no action taken'.format(ctx_name))
+ if not force:
+ lock.release()
+ return
+ logger.info('({}) Dropping context'.format(ctx_name))
+ self.ctx_names.remove(ctx_name)
+ self.ctx_data.pop(ctx_name)
+ self.extractor.drop_ctx(ctx_name)
+ self.grammar_files.pop(ctx_name)
+ self.grammar_dict.pop(ctx_name)
+ self.decoders.pop(ctx_name).close(force)
+ self.ctx_locks.pop(ctx_name)
+ if not force:
+ lock.release()
+
+ def list_ctx(self, ctx_name=None):
+ '''Return a string of active contexts'''
+ return 'ctx_name ||| {}'.format(' '.join(sorted(str(ctx_name) for ctx_name in self.ctx_names)))
+
+ def grammar(self, sentence, ctx_name=None):
+ '''Extract a sentence-level grammar on demand (or return cached)
+ Threadsafe wrt extractor but NOT decoder. Acquire ctx_name lock
+ before calling.'''
+ self.extractor_lock.acquire()
+ self.lazy_ctx(ctx_name)
+ grammar_dict = self.grammar_dict[ctx_name]
+ grammar_file = grammar_dict.get(sentence, None)
# Cache hit
if grammar_file:
- logging.info('Grammar cache hit')
+ logger.info('({}) Grammar cache hit: {}'.format(ctx_name, grammar_file))
+ self.extractor_lock.release()
return grammar_file
# Extract and cache
- grammar_file = tempfile.mkstemp(dir=self.tmp, prefix='grammar.')[1]
+ (fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.')
+ os.close(fid)
with open(grammar_file, 'w') as output:
- for rule in self.extractor.grammar(sentence):
+ for rule in self.extractor.grammar(sentence, ctx_name):
output.write('{}\n'.format(str(rule)))
- if len(self.grammar_files) == self.cache_size:
- rm_sent = self.grammar_files.popleft()
+ grammar_files = self.grammar_files[ctx_name]
+ if len(grammar_files) == self.cache_size:
+ rm_sent = grammar_files.popleft()
# If not already removed by learn method
- if rm_sent in self.grammar_dict:
- rm_grammar = self.grammar_dict.pop(rm_sent)
+ if rm_sent in grammar_dict:
+ rm_grammar = grammar_dict.pop(rm_sent)
os.remove(rm_grammar)
- self.grammar_files.append(sentence)
- self.grammar_dict[sentence] = grammar_file
+ grammar_files.append(sentence)
+ grammar_dict[sentence] = grammar_file
+ self.extractor_lock.release()
return grammar_file
- def decode(self, sentence):
+ def translate(self, sentence, ctx_name=None):
+ '''Decode a sentence (inc extracting a grammar if needed)
+ Threadsafe, FIFO'''
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
+ self.lazy_ctx(ctx_name)
# Empty in, empty out
if sentence.strip() == '':
+ lock.release()
return ''
if self.norm:
sentence = self.tokenize(sentence)
- logging.info('Normalized input: {}'.format(sentence))
- grammar_file = self.grammar(sentence)
+ logger.info('({}) Normalized input: {}'.format(ctx_name, sentence))
+ grammar_file = self.grammar(sentence, ctx_name)
+ decoder = self.decoders[ctx_name]
start_time = time.time()
- hyp = self.decoder.decode(sentence, grammar_file)
+ hyp = decoder.decoder.decode(sentence, grammar_file)
stop_time = time.time()
- logging.info('Translation time: {} seconds'.format(stop_time - start_time))
+ logger.info('({}) Translation time: {} seconds'.format(ctx_name, stop_time - start_time))
# Empty reference: HPYPLM does not learn prior to next translation
- self.ref_fifo.write('\n')
- self.ref_fifo.flush()
+ decoder.ref_fifo.write('\n')
+ decoder.ref_fifo.flush()
if self.norm:
- logging.info('Normalized translation: {}'.format(hyp))
+ logger.info('({}) Normalized translation: {}'.format(ctx_name, hyp))
hyp = self.detokenize(hyp)
+ lock.release()
return hyp
def tokenize(self, line):
+ self.tokenizer_lock.acquire()
self.tokenizer.stdin.write('{}\n'.format(line))
- return self.tokenizer.stdout.readline().strip()
+ tok_line = self.tokenizer.stdout.readline().strip()
+ self.tokenizer_lock.release()
+ return tok_line
def detokenize(self, line):
+ self.detokenizer_lock.acquire()
self.detokenizer.stdin.write('{}\n'.format(line))
- return self.detokenizer.stdout.readline().strip()
+ detok_line = self.detokenizer.stdout.readline().strip()
+ self.detokenizer_lock.release()
+ return detok_line
- def learn(self, source, target):
+ def command_line(self, line, ctx_name=None):
+ # COMMAND [ctx_name] ||| arg1 [||| arg2 ...]
+ args = [f.strip() for f in line.split('|||')]
+ if args[-1] == '':
+ args = args[:-1]
+ if len(args) > 0:
+ cmd_name = args[0].split()
+ # ctx_name provided
+ if len(cmd_name) == 2:
+ (cmd_name, ctx_name) = cmd_name
+ # ctx_name default/passed
+ else:
+ cmd_name = cmd_name[0]
+ (command, nargs) = self.COMMANDS.get(cmd_name, (None, None))
+ if command and len(args[1:]) in nargs:
+ logger.info('({}) {} ||| {}'.format(ctx_name, cmd_name, ' ||| '.join(args[1:])))
+ return command(*args[1:], ctx_name=ctx_name)
+ logger.info('ERROR: command: {}'.format(' ||| '.join(args)))
+
+ def learn(self, source, target, ctx_name=None):
+ '''Learn from training instance (inc extracting grammar if needed)
+ Threadsafe, FIFO'''
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
+ self.lazy_ctx(ctx_name)
if '' in (source.strip(), target.strip()):
- logging.info('Error empty source or target: {} ||| {}'.format(source, target))
+ logger.info('({}) ERROR: empty source or target: {} ||| {}'.format(ctx_name, source, target))
+ lock.release()
return
if self.norm:
source = self.tokenize(source)
target = self.tokenize(target)
+ # Align instance
+ alignment = self.aligner.align(source, target)
+ grammar_file = self.grammar(source, ctx_name)
# MIRA update before adding data to grammar extractor
- grammar_file = self.grammar(source)
- mira_log = self.decoder.update(source, grammar_file, target)
- logging.info('MIRA: {}'.format(mira_log))
+ decoder = self.decoders[ctx_name]
+ mira_log = decoder.decoder.update(source, grammar_file, target)
+ logger.info('({}) MIRA HBF: {}'.format(ctx_name, mira_log))
+ # Add to HPYPLM by writing to fifo (read on next translation)
+ logger.info('({}) Adding to HPYPLM: {}'.format(ctx_name, target))
+ decoder.ref_fifo.write('{}\n'.format(target))
+ decoder.ref_fifo.flush()
+ # Store incremental data for save/load
+ self.ctx_data[ctx_name].append((source, target, alignment))
# Add aligned sentence pair to grammar extractor
- alignment = self.aligner.align(source, target)
- logging.info('Adding instance: {} ||| {} ||| {}'.format(source, target, alignment))
- self.extractor.add_instance(source, target, alignment)
+ logger.info('({}) Adding to bitext: {} ||| {} ||| {}'.format(ctx_name, source, target, alignment))
+ self.extractor.add_instance(source, target, alignment, ctx_name)
# Clear (old) cached grammar
- rm_grammar = self.grammar_dict.pop(source)
+ rm_grammar = self.grammar_dict[ctx_name].pop(source)
os.remove(rm_grammar)
- # Add to HPYPLM by writing to fifo (read on next translation)
- logging.info('Adding to HPYPLM: {}'.format(target))
- self.ref_fifo.write('{}\n'.format(target))
- self.ref_fifo.flush()
+ lock.release()
+
+ def save_state(self, file_or_stringio=None, ctx_name=None):
+ '''Write state (several lines terminated by EOF line) to file, buffer, or stdout'''
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
+ self.lazy_ctx(ctx_name)
+ ctx_data = self.ctx_data[ctx_name]
+ # Filename, StringIO or None (stdout)
+ if file_or_stringio:
+ if isinstance(file_or_stringio, StringIO.StringIO):
+ out = file_or_stringio
+ else:
+ out = open(file_or_stringio, 'w')
+ else:
+ out = sys.stdout
+ logger.info('({}) Saving state with {} sentences'.format(ctx_name, len(ctx_data)))
+ out.write('{}\n'.format(self.decoders[ctx_name].decoder.get_weights()))
+ for (source, target, alignment) in ctx_data:
+ out.write('{} ||| {} ||| {}\n'.format(source, target, alignment))
+ out.write('EOF\n')
+ # Close if file
+ if file_or_stringio and not isinstance(file_or_stringio, StringIO.StringIO):
+ out.close()
+ lock.release()
+
+ def load_state(self, file_or_stringio=None, ctx_name=None):
+ '''Load state (several lines terminated by EOF line) from file, buffer, or stdin.
+ Restarts context on any error.'''
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
+ self.lazy_ctx(ctx_name)
+ ctx_data = self.ctx_data[ctx_name]
+ decoder = self.decoders[ctx_name]
+ # Filename, StringIO, or None (stdin)
+ if file_or_stringio:
+ if isinstance(file_or_stringio, StringIO.StringIO):
+ input = file_or_stringio
+ else:
+ input = open(file_or_stringio)
+ else:
+ input = sys.stdin
+ # Non-initial load error
+ if ctx_data:
+ logger.info('({}) ERROR: Incremental data has already been added to context'.format(ctx_name))
+ logger.info(' State can only be loaded to a new context.')
+ lock.release()
+ return
+ # Many things can go wrong if bad state data is given
+ try:
+ # MIRA weights
+ line = input.readline().strip()
+ # Throws exception if bad line
+ decoder.decoder.set_weights(line)
+ logger.info('({}) Loading state...'.format(ctx_name))
+ start_time = time.time()
+ # Lines source ||| target ||| alignment
+ while True:
+ line = input.readline()
+ if not line:
+ raise Exception('End of file before EOF line')
+ line = line.strip()
+ if line == 'EOF':
+ break
+ (source, target, alignment) = line.split(' ||| ')
+ ctx_data.append((source, target, alignment))
+ # Extractor
+ self.extractor.add_instance(source, target, alignment, ctx_name)
+ # HPYPLM
+ hyp = decoder.decoder.decode(LIKELY_OOV)
+ decoder.ref_fifo.write('{}\n'.format(target))
+ decoder.ref_fifo.flush()
+ stop_time = time.time()
+ logger.info('({}) Loaded state with {} sentences in {} seconds'.format(ctx_name, len(ctx_data), stop_time - start_time))
+ lock.release()
+ # Recover from bad load attempt by restarting context.
+ # Guaranteed not to cause data loss since only a new context can load state.
+ except:
+ logger.info('({}) ERROR: could not load state, restarting context'.format(ctx_name))
+ # ctx_name is already owned and needs to be restarted before other blocking threads use
+ self.drop_ctx(ctx_name, force=True)
+ self.lazy_ctx(ctx_name)
+ lock.release()
diff --git a/realtime/rt/util.py b/realtime/rt/util.py
index 10e94909..52767dac 100644
--- a/realtime/rt/util.py
+++ b/realtime/rt/util.py
@@ -1,4 +1,5 @@
import os
+import Queue
import subprocess
import sys
import threading
@@ -13,29 +14,58 @@ SA_INI_FILES = set((
'precompute_file',
))
+class FIFOLock:
+ '''Lock that preserves FIFO order of blocking threads'''
+
+ def __init__(self):
+ self.q = Queue.Queue()
+ self.i = 0
+ self.lock = threading.Lock()
+
+ def acquire(self):
+ self.lock.acquire()
+ self.i += 1
+ if self.i > 1:
+ event = threading.Event()
+ self.q.put(event)
+ self.lock.release()
+ event.wait()
+ return
+ self.lock.release()
+
+ def release(self):
+ self.lock.acquire()
+ self.i -= 1
+ if self.i > 0:
+ self.q.get().set()
+ self.lock.release()
+
def cdec_ini_for_config(config):
- cdec_ini_handle(config, os.path.basename, hpyplm_rm_ref)
+ # This is a list of (k, v), not a ConfigObj or dict
+ for i in range(len(config)):
+ if config[i][0] == 'feature_function':
+ if config[i][1].startswith('KLanguageModel'):
+ f = config[i][1].split()
+ f[-1] = 'mono.klm'
+ config[i][1] = ' '.join(f)
+ elif config[i][1].startswith('External'):
+ config[i][1] = 'External libcdec_ff_hpyplm.so corpus.hpyplm'
def cdec_ini_for_realtime(config, path, ref_fifo):
- cdec_ini_handle(config, lambda x: os.path.join(path, x), lambda x: hpyplm_add_ref(x, ref_fifo))
-
-def cdec_ini_handle(config, path_fn, hpyplm_fn):
# This is a list of (k, v), not a ConfigObj or dict
for i in range(len(config)):
if config[i][0] == 'feature_function':
if config[i][1].startswith('KLanguageModel'):
f = config[i][1].split()
- f[-1] = path_fn(f[-1])
+ f[-1] = os.path.join(path, f[-1])
config[i][1] = ' '.join(f)
elif config[i][1].startswith('External'):
f = config[i][1].split()
- if f[1].endswith('libcdec_ff_hpyplm.so'):
- # Modify paths
- for j in range(1, len(f)):
- if not f[j].startswith('-'):
- f[j] = path_fn(f[j])
- # Modify hpyplm args
- hpyplm_fn(f)
+ f[1] = os.path.join(path, f[1])
+ f[2] = os.path.join(path, f[2])
+ f.append('-r')
+ f.append(ref_fifo)
+ f.append('-t')
config[i][1] = ' '.join(f)
def consume_stream(stream):
@@ -44,18 +74,6 @@ def consume_stream(stream):
pass
threading.Thread(target=consume, args=(stream,)).start()
-def hpyplm_add_ref(f, ref):
- f.append('-r')
- f.append(ref)
- f.append('-t')
-
-def hpyplm_rm_ref(f):
- for i in range(1, len(f)):
- if f[i] == '-r':
- f.pop(i)
- f.pop(i)
- return
-
def popen_io(cmd):
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
consume_stream(p.stderr)