diff options
Diffstat (limited to 'realtime')
-rw-r--r-- | realtime/README.md | 18 | ||||
-rwxr-xr-x | realtime/mkconfig.py | 4 | ||||
-rwxr-xr-x | realtime/mkinput.py | 18 | ||||
-rwxr-xr-x | realtime/realtime.py | 95 | ||||
-rw-r--r-- | realtime/rt/__init__.py | 15 | ||||
-rw-r--r-- | realtime/rt/aligner.py | 34 | ||||
-rw-r--r-- | realtime/rt/decoder.py | 63 | ||||
-rw-r--r-- | realtime/rt/rt.py | 375 | ||||
-rw-r--r-- | realtime/rt/util.py | 66 |
9 files changed, 560 insertions, 128 deletions
diff --git a/realtime/README.md b/realtime/README.md index b37dddc8..e5290fc5 100644 --- a/realtime/README.md +++ b/realtime/README.md @@ -1 +1,17 @@ -More to come. +cdec Realtime +------------- + +Code by Michael Denkowski (http://www.cs.cmu.edu/~mdenkows/, mdenkows@cs.cmu.edu) + +``` +@misc{denkowski-proposal2013, + author = {Michael Denkowski}, + title = {Machine Translation for Human Translators}, + year = {2013}, + month = {May}, + day = {30}, + howpublished = {{Ph.D.} Thesis Proposal, Carnegie Mellon University} +} +``` + +For a full tutorial, see http://www.cs.cmu.edu/~mdenkows/cdec-realtime.html diff --git a/realtime/mkconfig.py b/realtime/mkconfig.py index 32388978..f193c57a 100755 --- a/realtime/mkconfig.py +++ b/realtime/mkconfig.py @@ -4,9 +4,11 @@ import os import shutil import sys +# Import first to make sure pycdec is on path +import rt + from cdec.configobj import ConfigObj -import rt def main(): diff --git a/realtime/mkinput.py b/realtime/mkinput.py new file mode 100755 index 00000000..a1b1256d --- /dev/null +++ b/realtime/mkinput.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python + +import itertools +import sys + +def main(): + + if len(sys.argv[1:]) < 2: + sys.stderr.write('usage: {} test.src test.ref [ctx_name] >test.input\n'.format(sys.argv[0])) + sys.exit(2) + + ctx_name = ' {}'.format(sys.argv[3]) if len(sys.argv[1:]) > 2 else '' + for (src, ref) in itertools.izip(open(sys.argv[1]), open(sys.argv[2])): + sys.stdout.write('TR{} ||| {}'.format(ctx_name, src)) + sys.stdout.write('LEARN{} ||| {} ||| {}'.format(ctx_name, src.strip(), ref)) + +if __name__ == '__main__': + main() diff --git a/realtime/realtime.py b/realtime/realtime.py index dff7e90c..ec15b59d 100755 --- a/realtime/realtime.py +++ b/realtime/realtime.py @@ -3,50 +3,109 @@ import argparse import logging import sys +import threading +import time import rt +ABOUT = '''Realtime adaptive translation with cdec (See README.md) + +Code by Michael Denkowski + +Citation: +@misc{denkowski-proposal2013, + author = {Michael Denkowski}, + title = {Machine Translation for Human Translators}, + year = {2013}, + month = {May}, + day = {30}, + howpublished = {{Ph.D.} Thesis Proposal, Carnegie Mellon University} +} + +''' + class Parser(argparse.ArgumentParser): def error(self, message): + sys.stderr.write(ABOUT) self.print_help() sys.stderr.write('\n{}\n'.format(message)) sys.exit(2) +def handle_line(translator, line, output, ctx_name): + res = translator.command_line(line, ctx_name) + if res: + output.write('{}\n'.format(res)) + output.flush() + +def test1(translator, input, output, ctx_name): + inp = open(input) + out = open(output, 'w') + for line in inp: + handle_line(translator, line.strip(), out, ctx_name) + out.close() + +def debug(translator, input): + # Test 1: multiple contexts + threads = [] + for i in range(4): + t = threading.Thread(target=test1, args=(translator, input, '{}.out.{}'.format(input, i), str(i))) + threads.append(t) + t.start() + time.sleep(30) + # Test 2: flood + out = open('{}.out.flood'.format(input), 'w') + inp = open(input) + while True: + line = inp.readline() + if not line: + break + line = line.strip() + t = threading.Thread(target=handle_line, args=(translator, line.strip(), out, None)) + threads.append(t) + t.start() + time.sleep(1) + translator.drop_ctx(None) + # Join test threads + for t in threads: + t.join() + def main(): - parser = Parser(description='Real-time adaptive translation with cdec.') - parser.add_argument('-c', '--config', required=True, help='Config directory (see README.md)') + parser = Parser() + parser.add_argument('-c', '--config', required=True, help='Config directory') + parser.add_argument('-s', '--state', help='Load state file to default context (saved incremental data)') parser.add_argument('-n', '--normalize', help='Normalize text (tokenize, translate, detokenize)', action='store_true') parser.add_argument('-T', '--temp', help='Temp directory (default /tmp)', default='/tmp') parser.add_argument('-a', '--cache', help='Grammar cache size (default 5)', default='5') parser.add_argument('-v', '--verbose', help='Info to stderr', action='store_true') + parser.add_argument('-D', '--debug-test', help='Run debug tests on input file') args = parser.parse_args() if args.verbose: logging.basicConfig(level=logging.INFO) - rtd = rt.RealtimeDecoder(args.config, tmpdir=args.temp, cache_size=int(args.cache), norm=args.normalize) + with rt.RealtimeTranslator(args.config, tmpdir=args.temp, cache_size=int(args.cache), norm=args.normalize) as translator: + + # Debugging + if args.debug_test: + debug(translator, args.debug_test) + return + + # Load state if given + if args.state: + rtd.load_state(state) - try: + # Read lines and commands while True: line = sys.stdin.readline() if not line: break - input = [f.strip() for f in line.split('|||')] - if len(input) == 1: - hyp = rtd.decode(input[0]) - sys.stdout.write('{}\n'.format(hyp)) + line = line.strip() + res = translator.command_line(line) + if res: + sys.stdout.write('{}\n'.format(res)) sys.stdout.flush() - elif len(input) == 2: - rtd.learn(*input) - - # Clean exit on ctrl+c - except KeyboardInterrupt: - logging.info('Caught KeyboardInterrupt, exiting') - - # Cleanup - rtd.close() - + if __name__ == '__main__': main() diff --git a/realtime/rt/__init__.py b/realtime/rt/__init__.py index 738777f3..c76acc4d 100644 --- a/realtime/rt/__init__.py +++ b/realtime/rt/__init__.py @@ -1,3 +1,18 @@ +# Add pycdec to the Python path if user hasn't +import os +import sys +try: + import cdec +except ImportError as ie: + try: + pycdec = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), 'python') + sys.path.append(pycdec) + import cdec + except: + sys.stderr.write('Error: cannot import pycdec. Please check that cdec/python is built.\n') + raise ie + +# Regular init imports from rt import * import aligner import decoder diff --git a/realtime/rt/aligner.py b/realtime/rt/aligner.py index 3c6ea144..e1782496 100644 --- a/realtime/rt/aligner.py +++ b/realtime/rt/aligner.py @@ -2,14 +2,17 @@ import logging import os import sys import subprocess +import threading import util +logger = logging.getLogger('rt.aligner') + class ForceAligner: - def __init__(self, fwd_params, fwd_err, rev_params, rev_err): + def __init__(self, fwd_params, fwd_err, rev_params, rev_err, heuristic='grow-diag-final-and'): - cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) + cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) fast_align = os.path.join(cdec_root, 'word-aligner', 'fast_align') atools = os.path.join(cdec_root, 'utils', 'atools') @@ -18,21 +21,27 @@ class ForceAligner: fwd_cmd = [fast_align, '-i', '-', '-d', '-T', fwd_T, '-m', fwd_m, '-f', fwd_params] rev_cmd = [fast_align, '-i', '-', '-d', '-T', rev_T, '-m', rev_m, '-f', rev_params, '-r'] - tools_cmd = [atools, '-i', '-', '-j', '-', '-c', 'grow-diag-final-and'] + tools_cmd = [atools, '-i', '-', '-j', '-', '-c', heuristic] - logging.info('Executing: {}'.format(' '.join(fwd_cmd))) + logger.info('Executing: {}'.format(' '.join(fwd_cmd))) self.fwd_align = util.popen_io(fwd_cmd) - logging.info('Executing: {}'.format(' '.join(rev_cmd))) + logger.info('Executing: {}'.format(' '.join(rev_cmd))) self.rev_align = util.popen_io(rev_cmd) - logging.info('Executing: {}'.format(' '.join(tools_cmd))) + logger.info('Executing: {}'.format(' '.join(tools_cmd))) self.tools = util.popen_io(tools_cmd) + # Used to guarantee thread safety + self.lock = util.FIFOLock() + def align(self, source, target): + '''Threadsafe, FIFO''' return self.align_formatted('{} ||| {}'.format(source, target)) def align_formatted(self, line): + '''Threadsafe, FIFO''' + self.lock.acquire() self.fwd_align.stdin.write('{}\n'.format(line)) self.rev_align.stdin.write('{}\n'.format(line)) # f words ||| e words ||| links ||| score @@ -40,12 +49,21 @@ class ForceAligner: rev_line = self.rev_align.stdout.readline().split('|||')[2].strip() self.tools.stdin.write('{}\n'.format(fwd_line)) self.tools.stdin.write('{}\n'.format(rev_line)) - return self.tools.stdout.readline().strip() + al_line = self.tools.stdout.readline().strip() + self.lock.release() + return al_line - def close(self): + def close(self, force=False): + if not force: + self.lock.acquire() self.fwd_align.stdin.close() + self.fwd_align.wait() self.rev_align.stdin.close() + self.rev_align.wait() self.tools.stdin.close() + self.tools.wait() + if not force: + self.lock.release() def read_err(self, err): (T, m) = ('', '') diff --git a/realtime/rt/decoder.py b/realtime/rt/decoder.py index 0a202fae..ed45c248 100644 --- a/realtime/rt/decoder.py +++ b/realtime/rt/decoder.py @@ -1,39 +1,80 @@ import logging import os import subprocess +import threading import util +logger = logging.getLogger('rt.decoder') + class Decoder: - def close(self): + def close(self, force=False): + if not force: + self.lock.acquire() self.decoder.stdin.close() + self.decoder.wait() + if not force: + self.lock.release() - def decode(self, sentence, grammar): - input = '<seg grammar="{g}">{s}</seg>\n'.format(s=sentence, g=grammar) + def decode(self, sentence, grammar=None): + '''Threadsafe, FIFO''' + self.lock.acquire() + input = '<seg grammar="{g}">{s}</seg>\n'.format(s=sentence, g=grammar) if grammar else '{}\n'.format(sentence) self.decoder.stdin.write(input) - return self.decoder.stdout.readline().strip() + hyp = self.decoder.stdout.readline().strip() + self.lock.release() + return hyp class CdecDecoder(Decoder): - + def __init__(self, config, weights): - cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) + cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) decoder = os.path.join(cdec_root, 'decoder', 'cdec') decoder_cmd = [decoder, '-c', config, '-w', weights] - logging.info('Executing: {}'.format(' '.join(decoder_cmd))) + logger.info('Executing: {}'.format(' '.join(decoder_cmd))) self.decoder = util.popen_io(decoder_cmd) + self.lock = util.FIFOLock() class MIRADecoder(Decoder): def __init__(self, config, weights): - cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) + cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) mira = os.path.join(cdec_root, 'training', 'mira', 'kbest_cut_mira') # optimizer=2 step=0.001 best=500, k=500, uniq, stream mira_cmd = [mira, '-c', config, '-w', weights, '-o', '2', '-C', '0.001', '-b', '500', '-k', '500', '-u', '-t'] - logging.info('Executing: {}'.format(' '.join(mira_cmd))) + logger.info('Executing: {}'.format(' '.join(mira_cmd))) self.decoder = util.popen_io(mira_cmd) + self.lock = util.FIFOLock() + + def get_weights(self): + '''Threadsafe, FIFO''' + self.lock.acquire() + self.decoder.stdin.write('WEIGHTS ||| WRITE\n') + weights = self.decoder.stdout.readline().strip() + self.lock.release() + return weights + + def set_weights(self, w_line): + '''Threadsafe, FIFO''' + self.lock.acquire() + try: + # Check validity + for w_str in w_line.split(): + (k, v) = w_str.split('=') + float(v) + self.decoder.stdin.write('WEIGHTS ||| {}\n'.format(w_line)) + self.lock.release() + except: + self.lock.release() + raise Exception('Invalid weights line: {}'.format(w_line)) + def update(self, sentence, grammar, reference): - input = '<seg grammar="{g}">{s}</seg> ||| {r}\n'.format(s=sentence, g=grammar, r=reference) + '''Threadsafe, FIFO''' + self.lock.acquire() + input = 'LEARN ||| <seg grammar="{g}">{s}</seg> ||| {r}\n'.format(s=sentence, g=grammar, r=reference) self.decoder.stdin.write(input) - return self.decoder.stdout.readline().strip() + log = self.decoder.stdout.readline().strip() + self.lock.release() + return log diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index 892cc217..c0aec410 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -5,33 +5,88 @@ import collections import logging import os import shutil -import sys +import StringIO import subprocess +import sys import tempfile +import threading import time -from cdec.configobj import ConfigObj -import cdec.sa - +import cdec import aligner import decoder import util +# Dummy input token that is unlikely to appear in normalized data (but no fatal errors if it does) +LIKELY_OOV = '(OOV)' + +logger = logging.getLogger('rt') + class RealtimeDecoder: + '''Do not use directly unless you know what you're doing. Use RealtimeTranslator.''' + + def __init__(self, configdir, tmpdir): + + cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + + self.tmp = tmpdir + os.mkdir(self.tmp) + + # HPYPLM reference stream + ref_fifo_file = os.path.join(self.tmp, 'ref.fifo') + os.mkfifo(ref_fifo_file) + self.ref_fifo = open(ref_fifo_file, 'w+') + # Start with empty line (do not learn prior to first input) + self.ref_fifo.write('\n') + self.ref_fifo.flush() + + # Decoder + decoder_config = [[f.strip() for f in line.split('=')] for line in open(os.path.join(configdir, 'cdec.ini'))] + util.cdec_ini_for_realtime(decoder_config, os.path.abspath(configdir), ref_fifo_file) + decoder_config_file = os.path.join(self.tmp, 'cdec.ini') + with open(decoder_config_file, 'w') as output: + for (k, v) in decoder_config: + output.write('{}={}\n'.format(k, v)) + decoder_weights = os.path.join(configdir, 'weights.final') + self.decoder = decoder.MIRADecoder(decoder_config_file, decoder_weights) + + def close(self, force=False): + logger.info('Closing decoder and removing {}'.format(self.tmp)) + self.decoder.close(force) + self.ref_fifo.close() + shutil.rmtree(self.tmp) + +class RealtimeTranslator: + '''Main entry point into API: serves translations to any number of concurrent users''' def __init__(self, configdir, tmpdir='/tmp', cache_size=5, norm=False): - cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) + # name -> (method, set of possible nargs) + self.COMMANDS = { + 'TR': (self.translate, set((1,))), + 'LEARN': (self.learn, set((2,))), + 'SAVE': (self.save_state, set((0, 1))), + 'LOAD': (self.load_state, set((0, 1))), + 'DROP': (self.drop_ctx, set((0,))), + 'LIST': (self.list_ctx, set((0,))), + } + + cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + + ### Single instance for all contexts + self.config = configdir # Temporary work dir self.tmp = tempfile.mkdtemp(dir=tmpdir, prefix='realtime.') - logging.info('Using temp dir {}'.format(self.tmp)) + logger.info('Using temp dir {}'.format(self.tmp)) # Normalization self.norm = norm if self.norm: self.tokenizer = util.popen_io([os.path.join(cdec_root, 'corpus', 'tokenize-anything.sh'), '-u']) + self.tokenizer_lock = util.FIFOLock() self.detokenizer = util.popen_io([os.path.join(cdec_root, 'corpus', 'untok.pl')]) + self.detokenizer_lock = util.FIFOLock() # Word aligner fwd_params = os.path.join(configdir, 'a.fwd_params') @@ -41,112 +96,302 @@ class RealtimeDecoder: self.aligner = aligner.ForceAligner(fwd_params, fwd_err, rev_params, rev_err) # Grammar extractor - sa_config = ConfigObj(os.path.join(configdir, 'sa.ini'), unrepr=True) + sa_config = cdec.configobj.ConfigObj(os.path.join(configdir, 'sa.ini'), unrepr=True) sa_config.filename = os.path.join(self.tmp, 'sa.ini') util.sa_ini_for_realtime(sa_config, os.path.abspath(configdir)) sa_config.write() self.extractor = cdec.sa.GrammarExtractor(sa_config.filename, online=True) - self.grammar_files = collections.deque() - self.grammar_dict = {} self.cache_size = cache_size - # HPYPLM reference stream - ref_fifo_file = os.path.join(self.tmp, 'ref.fifo') - os.mkfifo(ref_fifo_file) - self.ref_fifo = open(ref_fifo_file, 'w+') - # Start with empty line (do not learn prior to first input) - self.ref_fifo.write('\n') - self.ref_fifo.flush() + ### One instance per context - # Decoder - decoder_config = [[f.strip() for f in line.split('=')] for line in open(os.path.join(configdir, 'cdec.ini'))] - util.cdec_ini_for_realtime(decoder_config, os.path.abspath(configdir), ref_fifo_file) - decoder_config_file = os.path.join(self.tmp, 'cdec.ini') - with open(decoder_config_file, 'w') as output: - for (k, v) in decoder_config: - output.write('{}={}\n'.format(k, v)) - decoder_weights = os.path.join(configdir, 'weights.final') - self.decoder = decoder.MIRADecoder(decoder_config_file, decoder_weights) + self.ctx_names = set() + # All context-dependent operations are atomic + self.ctx_locks = collections.defaultdict(util.FIFOLock) + # ctx -> list of (source, target, alignment) + self.ctx_data = {} - def close(self): - logging.info('Closing processes') - self.aligner.close() - self.decoder.close() - self.ref_fifo.close() + # Grammar extractor is not threadsafe + self.extractor_lock = util.FIFOLock() + # ctx -> deque of file + self.grammar_files = {} + # ctx -> dict of {sentence: file} + self.grammar_dict = {} + + self.decoders = {} + + def __enter__(self): + return self + + def __exit__(self, ex_type, ex_value, ex_traceback): + # Force shutdown on exception + self.close(ex_type is not None) + + def close(self, force=False): + '''Cleanup''' + if force: + logger.info('Forced shutdown: stopping immediately') + for ctx_name in list(self.ctx_names): + self.drop_ctx(ctx_name, force) + logger.info('Closing processes') + self.aligner.close(force) if self.norm: + if not force: + self.tokenizer_lock.acquire() + self.detokenizer_lock.acquire() self.tokenizer.stdin.close() + self.tokenizer.wait() self.detokenizer.stdin.close() - logging.info('Deleting {}'.format(self.tmp)) + self.detokenizer.wait() + if not force: + self.tokenizer_lock.release() + self.detokenizer_lock.release() + logger.info('Deleting {}'.format(self.tmp)) shutil.rmtree(self.tmp) - def grammar(self, sentence): - grammar_file = self.grammar_dict.get(sentence, None) + def lazy_ctx(self, ctx_name): + '''Initialize a context (inc starting a new decoder) if needed. + NOT threadsafe, acquire ctx_name lock before calling.''' + if ctx_name in self.ctx_names: + return + logger.info('({}) New context'.format(ctx_name)) + self.ctx_names.add(ctx_name) + self.ctx_data[ctx_name] = [] + self.grammar_files[ctx_name] = collections.deque() + self.grammar_dict[ctx_name] = {} + tmpdir = os.path.join(self.tmp, 'decoder.{}'.format(ctx_name)) + self.decoders[ctx_name] = RealtimeDecoder(self.config, tmpdir) + + def drop_ctx(self, ctx_name=None, force=False): + '''Delete a context (inc stopping the decoder) + Threadsafe and FIFO unless forced.''' + lock = self.ctx_locks[ctx_name] + if not force: + lock.acquire() + if ctx_name not in self.ctx_names: + logger.info('({}) No context found, no action taken'.format(ctx_name)) + if not force: + lock.release() + return + logger.info('({}) Dropping context'.format(ctx_name)) + self.ctx_names.remove(ctx_name) + self.ctx_data.pop(ctx_name) + self.extractor.drop_ctx(ctx_name) + self.grammar_files.pop(ctx_name) + self.grammar_dict.pop(ctx_name) + self.decoders.pop(ctx_name).close(force) + self.ctx_locks.pop(ctx_name) + if not force: + lock.release() + + def list_ctx(self, ctx_name=None): + '''Return a string of active contexts''' + return 'ctx_name ||| {}'.format(' '.join(sorted(str(ctx_name) for ctx_name in self.ctx_names))) + + def grammar(self, sentence, ctx_name=None): + '''Extract a sentence-level grammar on demand (or return cached) + Threadsafe wrt extractor but NOT decoder. Acquire ctx_name lock + before calling.''' + self.extractor_lock.acquire() + self.lazy_ctx(ctx_name) + grammar_dict = self.grammar_dict[ctx_name] + grammar_file = grammar_dict.get(sentence, None) # Cache hit if grammar_file: - logging.info('Grammar cache hit') + logger.info('({}) Grammar cache hit: {}'.format(ctx_name, grammar_file)) + self.extractor_lock.release() return grammar_file # Extract and cache - grammar_file = tempfile.mkstemp(dir=self.tmp, prefix='grammar.')[1] + (fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.') + os.close(fid) with open(grammar_file, 'w') as output: - for rule in self.extractor.grammar(sentence): + for rule in self.extractor.grammar(sentence, ctx_name): output.write('{}\n'.format(str(rule))) - if len(self.grammar_files) == self.cache_size: - rm_sent = self.grammar_files.popleft() + grammar_files = self.grammar_files[ctx_name] + if len(grammar_files) == self.cache_size: + rm_sent = grammar_files.popleft() # If not already removed by learn method - if rm_sent in self.grammar_dict: - rm_grammar = self.grammar_dict.pop(rm_sent) + if rm_sent in grammar_dict: + rm_grammar = grammar_dict.pop(rm_sent) os.remove(rm_grammar) - self.grammar_files.append(sentence) - self.grammar_dict[sentence] = grammar_file + grammar_files.append(sentence) + grammar_dict[sentence] = grammar_file + self.extractor_lock.release() return grammar_file - def decode(self, sentence): + def translate(self, sentence, ctx_name=None): + '''Decode a sentence (inc extracting a grammar if needed) + Threadsafe, FIFO''' + lock = self.ctx_locks[ctx_name] + lock.acquire() + self.lazy_ctx(ctx_name) # Empty in, empty out if sentence.strip() == '': + lock.release() return '' if self.norm: sentence = self.tokenize(sentence) - logging.info('Normalized input: {}'.format(sentence)) - grammar_file = self.grammar(sentence) + logger.info('({}) Normalized input: {}'.format(ctx_name, sentence)) + grammar_file = self.grammar(sentence, ctx_name) + decoder = self.decoders[ctx_name] start_time = time.time() - hyp = self.decoder.decode(sentence, grammar_file) + hyp = decoder.decoder.decode(sentence, grammar_file) stop_time = time.time() - logging.info('Translation time: {} seconds'.format(stop_time - start_time)) + logger.info('({}) Translation time: {} seconds'.format(ctx_name, stop_time - start_time)) # Empty reference: HPYPLM does not learn prior to next translation - self.ref_fifo.write('\n') - self.ref_fifo.flush() + decoder.ref_fifo.write('\n') + decoder.ref_fifo.flush() if self.norm: - logging.info('Normalized translation: {}'.format(hyp)) + logger.info('({}) Normalized translation: {}'.format(ctx_name, hyp)) hyp = self.detokenize(hyp) + lock.release() return hyp def tokenize(self, line): + self.tokenizer_lock.acquire() self.tokenizer.stdin.write('{}\n'.format(line)) - return self.tokenizer.stdout.readline().strip() + tok_line = self.tokenizer.stdout.readline().strip() + self.tokenizer_lock.release() + return tok_line def detokenize(self, line): + self.detokenizer_lock.acquire() self.detokenizer.stdin.write('{}\n'.format(line)) - return self.detokenizer.stdout.readline().strip() + detok_line = self.detokenizer.stdout.readline().strip() + self.detokenizer_lock.release() + return detok_line - def learn(self, source, target): + def command_line(self, line, ctx_name=None): + # COMMAND [ctx_name] ||| arg1 [||| arg2 ...] + args = [f.strip() for f in line.split('|||')] + if args[-1] == '': + args = args[:-1] + if len(args) > 0: + cmd_name = args[0].split() + # ctx_name provided + if len(cmd_name) == 2: + (cmd_name, ctx_name) = cmd_name + # ctx_name default/passed + else: + cmd_name = cmd_name[0] + (command, nargs) = self.COMMANDS.get(cmd_name, (None, None)) + if command and len(args[1:]) in nargs: + logger.info('({}) {} ||| {}'.format(ctx_name, cmd_name, ' ||| '.join(args[1:]))) + return command(*args[1:], ctx_name=ctx_name) + logger.info('ERROR: command: {}'.format(' ||| '.join(args))) + + def learn(self, source, target, ctx_name=None): + '''Learn from training instance (inc extracting grammar if needed) + Threadsafe, FIFO''' + lock = self.ctx_locks[ctx_name] + lock.acquire() + self.lazy_ctx(ctx_name) if '' in (source.strip(), target.strip()): - logging.info('Error empty source or target: {} ||| {}'.format(source, target)) + logger.info('({}) ERROR: empty source or target: {} ||| {}'.format(ctx_name, source, target)) + lock.release() return if self.norm: source = self.tokenize(source) target = self.tokenize(target) + # Align instance + alignment = self.aligner.align(source, target) + grammar_file = self.grammar(source, ctx_name) # MIRA update before adding data to grammar extractor - grammar_file = self.grammar(source) - mira_log = self.decoder.update(source, grammar_file, target) - logging.info('MIRA: {}'.format(mira_log)) + decoder = self.decoders[ctx_name] + mira_log = decoder.decoder.update(source, grammar_file, target) + logger.info('({}) MIRA HBF: {}'.format(ctx_name, mira_log)) + # Add to HPYPLM by writing to fifo (read on next translation) + logger.info('({}) Adding to HPYPLM: {}'.format(ctx_name, target)) + decoder.ref_fifo.write('{}\n'.format(target)) + decoder.ref_fifo.flush() + # Store incremental data for save/load + self.ctx_data[ctx_name].append((source, target, alignment)) # Add aligned sentence pair to grammar extractor - alignment = self.aligner.align(source, target) - logging.info('Adding instance: {} ||| {} ||| {}'.format(source, target, alignment)) - self.extractor.add_instance(source, target, alignment) + logger.info('({}) Adding to bitext: {} ||| {} ||| {}'.format(ctx_name, source, target, alignment)) + self.extractor.add_instance(source, target, alignment, ctx_name) # Clear (old) cached grammar - rm_grammar = self.grammar_dict.pop(source) + rm_grammar = self.grammar_dict[ctx_name].pop(source) os.remove(rm_grammar) - # Add to HPYPLM by writing to fifo (read on next translation) - logging.info('Adding to HPYPLM: {}'.format(target)) - self.ref_fifo.write('{}\n'.format(target)) - self.ref_fifo.flush() + lock.release() + + def save_state(self, file_or_stringio=None, ctx_name=None): + '''Write state (several lines terminated by EOF line) to file, buffer, or stdout''' + lock = self.ctx_locks[ctx_name] + lock.acquire() + self.lazy_ctx(ctx_name) + ctx_data = self.ctx_data[ctx_name] + # Filename, StringIO or None (stdout) + if file_or_stringio: + if isinstance(file_or_stringio, StringIO.StringIO): + out = file_or_stringio + else: + out = open(file_or_stringio, 'w') + else: + out = sys.stdout + logger.info('({}) Saving state with {} sentences'.format(ctx_name, len(ctx_data))) + out.write('{}\n'.format(self.decoders[ctx_name].decoder.get_weights())) + for (source, target, alignment) in ctx_data: + out.write('{} ||| {} ||| {}\n'.format(source, target, alignment)) + out.write('EOF\n') + # Close if file + if file_or_stringio and not isinstance(file_or_stringio, StringIO.StringIO): + out.close() + lock.release() + + def load_state(self, file_or_stringio=None, ctx_name=None): + '''Load state (several lines terminated by EOF line) from file, buffer, or stdin. + Restarts context on any error.''' + lock = self.ctx_locks[ctx_name] + lock.acquire() + self.lazy_ctx(ctx_name) + ctx_data = self.ctx_data[ctx_name] + decoder = self.decoders[ctx_name] + # Filename, StringIO, or None (stdin) + if file_or_stringio: + if isinstance(file_or_stringio, StringIO.StringIO): + input = file_or_stringio + else: + input = open(file_or_stringio) + else: + input = sys.stdin + # Non-initial load error + if ctx_data: + logger.info('({}) ERROR: Incremental data has already been added to context'.format(ctx_name)) + logger.info(' State can only be loaded to a new context.') + lock.release() + return + # Many things can go wrong if bad state data is given + try: + # MIRA weights + line = input.readline().strip() + # Throws exception if bad line + decoder.decoder.set_weights(line) + logger.info('({}) Loading state...'.format(ctx_name)) + start_time = time.time() + # Lines source ||| target ||| alignment + while True: + line = input.readline() + if not line: + raise Exception('End of file before EOF line') + line = line.strip() + if line == 'EOF': + break + (source, target, alignment) = line.split(' ||| ') + ctx_data.append((source, target, alignment)) + # Extractor + self.extractor.add_instance(source, target, alignment, ctx_name) + # HPYPLM + hyp = decoder.decoder.decode(LIKELY_OOV) + decoder.ref_fifo.write('{}\n'.format(target)) + decoder.ref_fifo.flush() + stop_time = time.time() + logger.info('({}) Loaded state with {} sentences in {} seconds'.format(ctx_name, len(ctx_data), stop_time - start_time)) + lock.release() + # Recover from bad load attempt by restarting context. + # Guaranteed not to cause data loss since only a new context can load state. + except: + logger.info('({}) ERROR: could not load state, restarting context'.format(ctx_name)) + # ctx_name is already owned and needs to be restarted before other blocking threads use + self.drop_ctx(ctx_name, force=True) + self.lazy_ctx(ctx_name) + lock.release() diff --git a/realtime/rt/util.py b/realtime/rt/util.py index 10e94909..52767dac 100644 --- a/realtime/rt/util.py +++ b/realtime/rt/util.py @@ -1,4 +1,5 @@ import os +import Queue import subprocess import sys import threading @@ -13,29 +14,58 @@ SA_INI_FILES = set(( 'precompute_file', )) +class FIFOLock: + '''Lock that preserves FIFO order of blocking threads''' + + def __init__(self): + self.q = Queue.Queue() + self.i = 0 + self.lock = threading.Lock() + + def acquire(self): + self.lock.acquire() + self.i += 1 + if self.i > 1: + event = threading.Event() + self.q.put(event) + self.lock.release() + event.wait() + return + self.lock.release() + + def release(self): + self.lock.acquire() + self.i -= 1 + if self.i > 0: + self.q.get().set() + self.lock.release() + def cdec_ini_for_config(config): - cdec_ini_handle(config, os.path.basename, hpyplm_rm_ref) + # This is a list of (k, v), not a ConfigObj or dict + for i in range(len(config)): + if config[i][0] == 'feature_function': + if config[i][1].startswith('KLanguageModel'): + f = config[i][1].split() + f[-1] = 'mono.klm' + config[i][1] = ' '.join(f) + elif config[i][1].startswith('External'): + config[i][1] = 'External libcdec_ff_hpyplm.so corpus.hpyplm' def cdec_ini_for_realtime(config, path, ref_fifo): - cdec_ini_handle(config, lambda x: os.path.join(path, x), lambda x: hpyplm_add_ref(x, ref_fifo)) - -def cdec_ini_handle(config, path_fn, hpyplm_fn): # This is a list of (k, v), not a ConfigObj or dict for i in range(len(config)): if config[i][0] == 'feature_function': if config[i][1].startswith('KLanguageModel'): f = config[i][1].split() - f[-1] = path_fn(f[-1]) + f[-1] = os.path.join(path, f[-1]) config[i][1] = ' '.join(f) elif config[i][1].startswith('External'): f = config[i][1].split() - if f[1].endswith('libcdec_ff_hpyplm.so'): - # Modify paths - for j in range(1, len(f)): - if not f[j].startswith('-'): - f[j] = path_fn(f[j]) - # Modify hpyplm args - hpyplm_fn(f) + f[1] = os.path.join(path, f[1]) + f[2] = os.path.join(path, f[2]) + f.append('-r') + f.append(ref_fifo) + f.append('-t') config[i][1] = ' '.join(f) def consume_stream(stream): @@ -44,18 +74,6 @@ def consume_stream(stream): pass threading.Thread(target=consume, args=(stream,)).start() -def hpyplm_add_ref(f, ref): - f.append('-r') - f.append(ref) - f.append('-t') - -def hpyplm_rm_ref(f): - for i in range(1, len(f)): - if f[i] == '-r': - f.pop(i) - f.pop(i) - return - def popen_io(cmd): p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) consume_stream(p.stderr) |