From 8b429e08566ffc640c2de0f0eca66c354c8377f9 Mon Sep 17 00:00:00 2001 From: mjdenkowski Date: Wed, 11 Sep 2013 16:20:36 -0400 Subject: Find pycdec --- realtime/rt/__init__.py | 15 +++++++++++++++ realtime/rt/aligner.py | 2 +- realtime/rt/decoder.py | 4 ++-- realtime/rt/rt.py | 8 +++----- 4 files changed, 21 insertions(+), 8 deletions(-) (limited to 'realtime/rt') diff --git a/realtime/rt/__init__.py b/realtime/rt/__init__.py index 738777f3..fbde8f4d 100644 --- a/realtime/rt/__init__.py +++ b/realtime/rt/__init__.py @@ -1,3 +1,18 @@ +# Add pycdec to the Python path if user hasn't +import os +import sys +try: + import cdec +except ImportError as ie: + try: + pycdec = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), 'python') + sys.path.append(pycdec) + import cdec + except: + sys.stderr.write('Error: cannot import pycdec. Please check the cdec/python is built.\n') + raise ie + +# Regular init imports from rt import * import aligner import decoder diff --git a/realtime/rt/aligner.py b/realtime/rt/aligner.py index 3c6ea144..80835412 100644 --- a/realtime/rt/aligner.py +++ b/realtime/rt/aligner.py @@ -9,7 +9,7 @@ class ForceAligner: def __init__(self, fwd_params, fwd_err, rev_params, rev_err): - cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) + cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) fast_align = os.path.join(cdec_root, 'word-aligner', 'fast_align') atools = os.path.join(cdec_root, 'utils', 'atools') diff --git a/realtime/rt/decoder.py b/realtime/rt/decoder.py index 0a202fae..34b5d391 100644 --- a/realtime/rt/decoder.py +++ b/realtime/rt/decoder.py @@ -17,7 +17,7 @@ class Decoder: class CdecDecoder(Decoder): def __init__(self, config, weights): - cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) + cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) decoder = os.path.join(cdec_root, 'decoder', 'cdec') decoder_cmd = [decoder, '-c', config, '-w', weights] logging.info('Executing: {}'.format(' '.join(decoder_cmd))) @@ -26,7 +26,7 @@ class CdecDecoder(Decoder): class MIRADecoder(Decoder): def __init__(self, config, weights): - cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) + cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) mira = os.path.join(cdec_root, 'training', 'mira', 'kbest_cut_mira') # optimizer=2 step=0.001 best=500, k=500, uniq, stream mira_cmd = [mira, '-c', config, '-w', weights, '-o', '2', '-C', '0.001', '-b', '500', '-k', '500', '-u', '-t'] diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index 892cc217..4dc2de09 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -10,9 +10,7 @@ import subprocess import tempfile import time -from cdec.configobj import ConfigObj -import cdec.sa - +import cdec import aligner import decoder import util @@ -21,7 +19,7 @@ class RealtimeDecoder: def __init__(self, configdir, tmpdir='/tmp', cache_size=5, norm=False): - cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) + cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # Temporary work dir self.tmp = tempfile.mkdtemp(dir=tmpdir, prefix='realtime.') @@ -41,7 +39,7 @@ class RealtimeDecoder: self.aligner = aligner.ForceAligner(fwd_params, fwd_err, rev_params, rev_err) # Grammar extractor - sa_config = ConfigObj(os.path.join(configdir, 'sa.ini'), unrepr=True) + sa_config = cdec.ConfigObj(os.path.join(configdir, 'sa.ini'), unrepr=True) sa_config.filename = os.path.join(self.tmp, 'sa.ini') util.sa_ini_for_realtime(sa_config, os.path.abspath(configdir)) sa_config.write() -- cgit v1.2.3 From 2cfa58d956e6fa4b5e4c2ff0e283c50bd6bed799 Mon Sep 17 00:00:00 2001 From: mjdenkowski Date: Sat, 14 Sep 2013 13:42:17 -0400 Subject: Package name --- realtime/rt/rt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'realtime/rt') diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index 4dc2de09..1b8ac58c 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -39,7 +39,7 @@ class RealtimeDecoder: self.aligner = aligner.ForceAligner(fwd_params, fwd_err, rev_params, rev_err) # Grammar extractor - sa_config = cdec.ConfigObj(os.path.join(configdir, 'sa.ini'), unrepr=True) + sa_config = cdec.configobj.ConfigObj(os.path.join(configdir, 'sa.ini'), unrepr=True) sa_config.filename = os.path.join(self.tmp, 'sa.ini') util.sa_ini_for_realtime(sa_config, os.path.abspath(configdir)) sa_config.write() -- cgit v1.2.3 From b67c9b33ca9e91fc0200a6687fc76f73c0a022f3 Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Sun, 15 Sep 2013 12:01:44 -0700 Subject: Fixed pathes for mkconfig --- realtime/rt/util.py | 39 +++++++++++++++------------------------ 1 file changed, 15 insertions(+), 24 deletions(-) (limited to 'realtime/rt') diff --git a/realtime/rt/util.py b/realtime/rt/util.py index 10e94909..b823e12f 100644 --- a/realtime/rt/util.py +++ b/realtime/rt/util.py @@ -14,28 +14,31 @@ SA_INI_FILES = set(( )) def cdec_ini_for_config(config): - cdec_ini_handle(config, os.path.basename, hpyplm_rm_ref) + # This is a list of (k, v), not a ConfigObj or dict + for i in range(len(config)): + if config[i][0] == 'feature_function': + if config[i][1].startswith('KLanguageModel'): + f = config[i][1].split() + f[-1] = 'mono.klm' + config[i][1] = ' '.join(f) + elif config[i][1].startswith('External'): + config[i][1] = 'External libcdec_ff_hpyplm.so corpus.hpyplm' def cdec_ini_for_realtime(config, path, ref_fifo): - cdec_ini_handle(config, lambda x: os.path.join(path, x), lambda x: hpyplm_add_ref(x, ref_fifo)) - -def cdec_ini_handle(config, path_fn, hpyplm_fn): # This is a list of (k, v), not a ConfigObj or dict for i in range(len(config)): if config[i][0] == 'feature_function': if config[i][1].startswith('KLanguageModel'): f = config[i][1].split() - f[-1] = path_fn(f[-1]) + f[-1] = os.path.join(path, f[-1]) config[i][1] = ' '.join(f) elif config[i][1].startswith('External'): f = config[i][1].split() - if f[1].endswith('libcdec_ff_hpyplm.so'): - # Modify paths - for j in range(1, len(f)): - if not f[j].startswith('-'): - f[j] = path_fn(f[j]) - # Modify hpyplm args - hpyplm_fn(f) + f[1] = os.path.join(path, f[1]) + f[2] = os.path.join(path, f[2]) + f.append('-r') + f.append(ref) + f.append('-t') config[i][1] = ' '.join(f) def consume_stream(stream): @@ -44,18 +47,6 @@ def consume_stream(stream): pass threading.Thread(target=consume, args=(stream,)).start() -def hpyplm_add_ref(f, ref): - f.append('-r') - f.append(ref) - f.append('-t') - -def hpyplm_rm_ref(f): - for i in range(1, len(f)): - if f[i] == '-r': - f.pop(i) - f.pop(i) - return - def popen_io(cmd): p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) consume_stream(p.stderr) -- cgit v1.2.3 From 58763340c02ddafa056d0a00a061cecfb33c9c0c Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Sun, 15 Sep 2013 19:51:08 -0700 Subject: Typo fix --- realtime/rt/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'realtime/rt') diff --git a/realtime/rt/util.py b/realtime/rt/util.py index b823e12f..6e07f116 100644 --- a/realtime/rt/util.py +++ b/realtime/rt/util.py @@ -37,7 +37,7 @@ def cdec_ini_for_realtime(config, path, ref_fifo): f[1] = os.path.join(path, f[1]) f[2] = os.path.join(path, f[2]) f.append('-r') - f.append(ref) + f.append(ref_fifo) f.append('-t') config[i][1] = ' '.join(f) -- cgit v1.2.3 From 895dfd64ea5599ab16981cbfb538ec5f4073c8c1 Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Sun, 15 Sep 2013 20:32:59 -0700 Subject: Move to using named commands --- realtime/mkinput.py | 17 +++++++++++++++++ realtime/realtime.py | 4 ++-- realtime/rt/decoder.py | 2 +- realtime/rt/rt.py | 8 ++++++++ training/mira/kbest_cut_mira.cc | 15 ++++++++++++--- 5 files changed, 40 insertions(+), 6 deletions(-) create mode 100755 realtime/mkinput.py (limited to 'realtime/rt') diff --git a/realtime/mkinput.py b/realtime/mkinput.py new file mode 100755 index 00000000..897b44fd --- /dev/null +++ b/realtime/mkinput.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python + +import itertools +import sys + +def main(): + + if len(sys.argv[1:]) != 2: + sys.stderr.write('usage: {} test.src test.ref >test.input\n'.format(sys.argv[0])) + sys.exit(2) + + for (src, ref) in itertools.izip(open(sys.argv[1]), open(sys.argv[2])): + sys.stdout.write('{}'.format(src)) + sys.stdout.write('LEARN ||| {} ||| {}'.format(src.strip(), ref)) + +if __name__ == '__main__': + main() diff --git a/realtime/realtime.py b/realtime/realtime.py index dff7e90c..554c52ca 100755 --- a/realtime/realtime.py +++ b/realtime/realtime.py @@ -38,8 +38,8 @@ def main(): hyp = rtd.decode(input[0]) sys.stdout.write('{}\n'.format(hyp)) sys.stdout.flush() - elif len(input) == 2: - rtd.learn(*input) + else: + rtd.command(input) # Clean exit on ctrl+c except KeyboardInterrupt: diff --git a/realtime/rt/decoder.py b/realtime/rt/decoder.py index 34b5d391..57739d93 100644 --- a/realtime/rt/decoder.py +++ b/realtime/rt/decoder.py @@ -34,6 +34,6 @@ class MIRADecoder(Decoder): self.decoder = util.popen_io(mira_cmd) def update(self, sentence, grammar, reference): - input = '{s} ||| {r}\n'.format(s=sentence, g=grammar, r=reference) + input = 'LEARN ||| {s} ||| {r}\n'.format(s=sentence, g=grammar, r=reference) self.decoder.stdin.write(input) return self.decoder.stdout.readline().strip() diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index 1b8ac58c..0ce05a56 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -19,6 +19,8 @@ class RealtimeDecoder: def __init__(self, configdir, tmpdir='/tmp', cache_size=5, norm=False): + self.commands = {'LEARN': self.learn} + cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # Temporary work dir @@ -126,6 +128,12 @@ class RealtimeDecoder: self.detokenizer.stdin.write('{}\n'.format(line)) return self.detokenizer.stdout.readline().strip() + def command(self, args): + try: + self.commands[args[0]](*args[1:]) + except: + logging.info('Command error: {}'.format(' ||| '.join(args))) + def learn(self, source, target): if '' in (source.strip(), target.strip()): logging.info('Error empty source or target: {} ||| {}'.format(source, target)) diff --git a/training/mira/kbest_cut_mira.cc b/training/mira/kbest_cut_mira.cc index e4435abb..a9a4aeb6 100644 --- a/training/mira/kbest_cut_mira.cc +++ b/training/mira/kbest_cut_mira.cc @@ -734,10 +734,19 @@ int main(int argc, char** argv) { ViterbiESentence(bobs.hypergraph[0], &trans); cout << TD::GetString(trans) << endl; continue; - // Translate and update (normal MIRA) + // Special command: + // CMD ||| arg1 ||| arg2 ... } else { - ds->update(buf.substr(delim + 5)); - buf = buf.substr(0, delim); + string cmd = buf.substr(0, delim); + buf = buf.substr(delim + 5); + // Translate and update (normal MIRA) + // LEARN ||| source ||| reference + if (cmd == "LEARN") { + delim = buf.find(" ||| "); + ds->update(buf.substr(delim + 5)); + buf = buf.substr(0, delim); + } + // TODO: additional commands } } //TODO: allow batch updating -- cgit v1.2.3 From f96397d3a34fd5279c92a21f66cfa7538f60cb52 Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Tue, 17 Sep 2013 22:05:32 -0700 Subject: Save/load state in realtime --- realtime/realtime.py | 48 +++++++++++++++++----------------- realtime/rt/__init__.py | 2 +- realtime/rt/decoder.py | 11 ++++++-- realtime/rt/rt.py | 68 ++++++++++++++++++++++++++++++++++++++++++++----- utils/weights.cc | 10 +++----- 5 files changed, 101 insertions(+), 38 deletions(-) (limited to 'realtime/rt') diff --git a/realtime/realtime.py b/realtime/realtime.py index 554c52ca..a6ed1511 100755 --- a/realtime/realtime.py +++ b/realtime/realtime.py @@ -15,8 +15,9 @@ class Parser(argparse.ArgumentParser): def main(): - parser = Parser(description='Real-time adaptive translation with cdec.') - parser.add_argument('-c', '--config', required=True, help='Config directory (see README.md)') + parser = Parser(description='Real-time adaptive translation with cdec. (See README.md)') + parser.add_argument('-c', '--config', required=True, help='Config directory') + parser.add_argument('-s', '--state', help='Load state file (saved incremental data)') parser.add_argument('-n', '--normalize', help='Normalize text (tokenize, translate, detokenize)', action='store_true') parser.add_argument('-T', '--temp', help='Temp directory (default /tmp)', default='/tmp') parser.add_argument('-a', '--cache', help='Grammar cache size (default 5)', default='5') @@ -26,27 +27,28 @@ def main(): if args.verbose: logging.basicConfig(level=logging.INFO) - rtd = rt.RealtimeDecoder(args.config, tmpdir=args.temp, cache_size=int(args.cache), norm=args.normalize) - - try: - while True: - line = sys.stdin.readline() - if not line: - break - input = [f.strip() for f in line.split('|||')] - if len(input) == 1: - hyp = rtd.decode(input[0]) - sys.stdout.write('{}\n'.format(hyp)) - sys.stdout.flush() - else: - rtd.command(input) - - # Clean exit on ctrl+c - except KeyboardInterrupt: - logging.info('Caught KeyboardInterrupt, exiting') - - # Cleanup - rtd.close() + with rt.RealtimeDecoder(args.config, tmpdir=args.temp, cache_size=int(args.cache), norm=args.normalize) as rtd: + + try: + # Load state if given + if args.state: + rtd.load_state(args.state) + # Read lines and commands + while True: + line = sys.stdin.readline() + if not line: + break + line = line.strip() + if '|||' in line: + rtd.command_line(line) + else: + hyp = rtd.decode(line) + sys.stdout.write('{}\n'.format(hyp)) + sys.stdout.flush() + + # Clean exit on ctrl+c + except KeyboardInterrupt: + logging.info('Caught KeyboardInterrupt, exiting') if __name__ == '__main__': main() diff --git a/realtime/rt/__init__.py b/realtime/rt/__init__.py index fbde8f4d..c76acc4d 100644 --- a/realtime/rt/__init__.py +++ b/realtime/rt/__init__.py @@ -9,7 +9,7 @@ except ImportError as ie: sys.path.append(pycdec) import cdec except: - sys.stderr.write('Error: cannot import pycdec. Please check the cdec/python is built.\n') + sys.stderr.write('Error: cannot import pycdec. Please check that cdec/python is built.\n') raise ie # Regular init imports diff --git a/realtime/rt/decoder.py b/realtime/rt/decoder.py index 57739d93..aa6db64d 100644 --- a/realtime/rt/decoder.py +++ b/realtime/rt/decoder.py @@ -9,8 +9,8 @@ class Decoder: def close(self): self.decoder.stdin.close() - def decode(self, sentence, grammar): - input = '{s}\n'.format(s=sentence, g=grammar) + def decode(self, sentence, grammar=None): + input = '{s}\n'.format(s=sentence, g=grammar) if grammar else '{}\n'.format(sentence) self.decoder.stdin.write(input) return self.decoder.stdout.readline().strip() @@ -33,6 +33,13 @@ class MIRADecoder(Decoder): logging.info('Executing: {}'.format(' '.join(mira_cmd))) self.decoder = util.popen_io(mira_cmd) + def get_weights(self): + self.decoder.stdin.write('WEIGHTS ||| WRITE\n') + return self.decoder.stdout.readline().strip() + + def set_weights(self, w_line): + self.decoder.stdin.write('WEIGHTS ||| {}\n'.format(w_line)) + def update(self, sentence, grammar, reference): input = 'LEARN ||| {s} ||| {r}\n'.format(s=sentence, g=grammar, r=reference) self.decoder.stdin.write(input) diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index 0ce05a56..fedc1fcf 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -15,14 +15,18 @@ import aligner import decoder import util +LIKELY_OOV = '("OOV")' + class RealtimeDecoder: - def __init__(self, configdir, tmpdir='/tmp', cache_size=5, norm=False): + def __init__(self, configdir, tmpdir='/tmp', cache_size=5, norm=False, state=None): - self.commands = {'LEARN': self.learn} + self.commands = {'LEARN': self.learn, 'SAVE': self.save_state, 'LOAD': self.load_state} cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + self.inc_data = [] # instances of (source, target) + # Temporary work dir self.tmp = tempfile.mkdtemp(dir=tmpdir, prefix='realtime.') logging.info('Using temp dir {}'.format(self.tmp)) @@ -68,6 +72,17 @@ class RealtimeDecoder: decoder_weights = os.path.join(configdir, 'weights.final') self.decoder = decoder.MIRADecoder(decoder_config_file, decoder_weights) + # Load state if given + if state: + with open(state) as input: + self.load_state(input) + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.close() + def close(self): logging.info('Closing processes') self.aligner.close() @@ -128,9 +143,13 @@ class RealtimeDecoder: self.detokenizer.stdin.write('{}\n'.format(line)) return self.detokenizer.stdout.readline().strip() - def command(self, args): + def command_line(self, line): + args = [f.strip() for f in line.split('|||')] try: - self.commands[args[0]](*args[1:]) + if len(args) == 2 and not args[1]: + self.commands[args[0]]() + else: + self.commands[args[0]](*args[1:]) except: logging.info('Command error: {}'.format(' ||| '.join(args))) @@ -145,9 +164,12 @@ class RealtimeDecoder: grammar_file = self.grammar(source) mira_log = self.decoder.update(source, grammar_file, target) logging.info('MIRA: {}'.format(mira_log)) - # Add aligned sentence pair to grammar extractor + # Align instance alignment = self.aligner.align(source, target) - logging.info('Adding instance: {} ||| {} ||| {}'.format(source, target, alignment)) + # Store incremental data for save/load + self.inc_data.append((source, target, alignment)) + # Add aligned sentence pair to grammar extractor + logging.info('Adding to bitext: {} ||| {} ||| {}'.format(source, target, alignment)) self.extractor.add_instance(source, target, alignment) # Clear (old) cached grammar rm_grammar = self.grammar_dict.pop(source) @@ -156,3 +178,37 @@ class RealtimeDecoder: logging.info('Adding to HPYPLM: {}'.format(target)) self.ref_fifo.write('{}\n'.format(target)) self.ref_fifo.flush() + + def save_state(self): + logging.info('Saving state with {} sentences'.format(len(self.inc_data))) + sys.stdout.write('{}\n'.format(self.decoder.get_weights())) + for (source, target, alignment) in self.inc_data: + sys.stdout.write('{} ||| {} ||| {}\n'.format(source, target, alignment)) + sys.stdout.write('EOF\n') + + def load_state(self, input=sys.stdin): + # Non-initial load error + if self.inc_data: + logging.info('Error: Incremental data has already been added to decoder.') + logging.info(' State can only be loaded by a freshly started decoder.') + return + # MIRA weights + line = input.readline().strip() + self.decoder.set_weights(line) + logging.info('Loading state...') + start_time = time.time() + # Lines source ||| target ||| alignment + while True: + line = input.readline().strip() + if line == 'EOF': + break + (source, target, alignment) = line.split(' ||| ') + self.inc_data.append((source, target, alignment)) + # Extractor + self.extractor.add_instance(source, target, alignment) + # HPYPLM + hyp = self.decoder.decode(LIKELY_OOV) + self.ref_fifo.write('{}\n'.format(target)) + self.ref_fifo.flush() + stop_time = time.time() + logging.info('Loaded state with {} sentences in {} seconds'.format(len(self.inc_data), stop_time - start_time)) diff --git a/utils/weights.cc b/utils/weights.cc index 1284f686..effdfc5e 100644 --- a/utils/weights.cc +++ b/utils/weights.cc @@ -163,15 +163,13 @@ string Weights::GetString(const vector& w, os.precision(17); int nf = FD::NumFeats(); for (unsigned i = 1; i < nf; i++) { - if (hide_zero_value_features && w[i] == 0.0) { + weight_t val = (i < w.size() ? w[i] : 0.0); + if (hide_zero_value_features && val == 0.0) { continue; } - os << FD::Convert(i) << '=' << w[i]; - if (i < nf - 1) { - os << ' '; - } + os << ' ' << FD::Convert(i) << '=' << val; } - return os.str(); + return os.str().substr(1); } void Weights::UpdateFromString(string& w_string, -- cgit v1.2.3 From f650759280f7ceb71e36be3ac834f3046112d037 Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Wed, 18 Sep 2013 11:01:41 -0700 Subject: Support writing state to file --- realtime/rt/rt.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'realtime/rt') diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index fedc1fcf..b1cf19ef 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -179,12 +179,15 @@ class RealtimeDecoder: self.ref_fifo.write('{}\n'.format(target)) self.ref_fifo.flush() - def save_state(self): + def save_state(self, filename=None): + out = open(filename, 'w') if filename else sys.stdout logging.info('Saving state with {} sentences'.format(len(self.inc_data))) - sys.stdout.write('{}\n'.format(self.decoder.get_weights())) + out.write('{}\n'.format(self.decoder.get_weights())) for (source, target, alignment) in self.inc_data: - sys.stdout.write('{} ||| {} ||| {}\n'.format(source, target, alignment)) - sys.stdout.write('EOF\n') + out.write('{} ||| {} ||| {}\n'.format(source, target, alignment)) + out.write('EOF\n') + if filename: + out.close() def load_state(self, input=sys.stdin): # Non-initial load error -- cgit v1.2.3 From 903339fdd7c294a72a8b2066550dbb5dd78f5384 Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Sun, 22 Sep 2013 22:49:46 -0400 Subject: Don't leak open files. --- realtime/rt/rt.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'realtime/rt') diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index b1cf19ef..033ed790 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -101,7 +101,8 @@ class RealtimeDecoder: logging.info('Grammar cache hit') return grammar_file # Extract and cache - grammar_file = tempfile.mkstemp(dir=self.tmp, prefix='grammar.')[1] + (fid, grammar_file) = tempfile.mkstemp(dir=self.tmp, prefix='grammar.') + os.close(fid) with open(grammar_file, 'w') as output: for rule in self.extractor.grammar(sentence): output.write('{}\n'.format(str(rule))) -- cgit v1.2.3 From 5866bdb0541bf136d897cc8ecc72c5ed4b6a93ee Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Wed, 25 Sep 2013 16:20:51 -0700 Subject: Super multi-user thread safety update --- realtime/realtime.py | 43 +++++---- realtime/rt/aligner.py | 11 ++- realtime/rt/decoder.py | 32 ++++++- realtime/rt/rt.py | 250 +++++++++++++++++++++++++++++++++++-------------- 4 files changed, 241 insertions(+), 95 deletions(-) (limited to 'realtime/rt') diff --git a/realtime/realtime.py b/realtime/realtime.py index 3c384fa2..282d3311 100755 --- a/realtime/realtime.py +++ b/realtime/realtime.py @@ -2,7 +2,9 @@ import argparse import logging +import signal import sys +import threading import rt @@ -22,34 +24,37 @@ def main(): parser.add_argument('-T', '--temp', help='Temp directory (default /tmp)', default='/tmp') parser.add_argument('-a', '--cache', help='Grammar cache size (default 5)', default='5') parser.add_argument('-v', '--verbose', help='Info to stderr', action='store_true') + parser.add_argument('-D', '--debug-test', help='Test thread safety (debug use only)', action='store_true') args = parser.parse_args() if args.verbose: logging.basicConfig(level=logging.INFO) - with rt.RealtimeDecoder(args.config, tmpdir=args.temp, cache_size=int(args.cache), norm=args.normalize) as rtd: + with rt.RealtimeTranslator(args.config, tmpdir=args.temp, cache_size=int(args.cache), norm=args.normalize) as translator: - try: # Load state if given if args.state: with open(args.state) as input: rtd.load_state(input) - # Read lines and commands - while True: - line = sys.stdin.readline() - if not line: - break - line = line.strip() - if '|||' in line: - rtd.command_line(line) - else: - hyp = rtd.decode(line) - sys.stdout.write('{}\n'.format(hyp)) - sys.stdout.flush() - - # Clean exit on ctrl+c - except KeyboardInterrupt: - logging.info('Caught KeyboardInterrupt, exiting') - + if not args.debug_test: + run(translator) + else: + # TODO: write test + run(translator) + +def run(translator, input=sys.stdin, output=sys.stdout, ctx_name=None): + # Read lines and commands + while True: + line = input.readline() + if not line: + break + line = line.strip() + if '|||' in line: + translator.command_line(line, ctx_name) + else: + hyp = translator.decode(line, ctx_name) + output.write('{}\n'.format(hyp)) + output.flush() + if __name__ == '__main__': main() diff --git a/realtime/rt/aligner.py b/realtime/rt/aligner.py index 80835412..a14121db 100644 --- a/realtime/rt/aligner.py +++ b/realtime/rt/aligner.py @@ -2,6 +2,7 @@ import logging import os import sys import subprocess +import threading import util @@ -29,10 +30,16 @@ class ForceAligner: logging.info('Executing: {}'.format(' '.join(tools_cmd))) self.tools = util.popen_io(tools_cmd) + # Used to guarantee thread safety + self.semaphore = threading.Semaphore() + def align(self, source, target): + '''Threadsafe''' return self.align_formatted('{} ||| {}'.format(source, target)) def align_formatted(self, line): + '''Threadsafe''' + self.semaphore.acquire() self.fwd_align.stdin.write('{}\n'.format(line)) self.rev_align.stdin.write('{}\n'.format(line)) # f words ||| e words ||| links ||| score @@ -40,7 +47,9 @@ class ForceAligner: rev_line = self.rev_align.stdout.readline().split('|||')[2].strip() self.tools.stdin.write('{}\n'.format(fwd_line)) self.tools.stdin.write('{}\n'.format(rev_line)) - return self.tools.stdout.readline().strip() + al_line = self.tools.stdout.readline().strip() + self.semaphore.release() + return al_line def close(self): self.fwd_align.stdin.close() diff --git a/realtime/rt/decoder.py b/realtime/rt/decoder.py index aa6db64d..72b5b959 100644 --- a/realtime/rt/decoder.py +++ b/realtime/rt/decoder.py @@ -1,27 +1,37 @@ import logging import os import subprocess +import threading import util class Decoder: - def close(self): + def close(self, force=False): + if not force: + self.semaphore.acquire() self.decoder.stdin.close() + if not force: + self.semaphore.release() def decode(self, sentence, grammar=None): + '''Threadsafe''' input = '{s}\n'.format(s=sentence, g=grammar) if grammar else '{}\n'.format(sentence) + self.semaphore.acquire() self.decoder.stdin.write(input) - return self.decoder.stdout.readline().strip() + hyp = self.decoder.stdout.readline().strip() + self.semaphore.release() + return hyp class CdecDecoder(Decoder): - + def __init__(self, config, weights): cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) decoder = os.path.join(cdec_root, 'decoder', 'cdec') decoder_cmd = [decoder, '-c', config, '-w', weights] logging.info('Executing: {}'.format(' '.join(decoder_cmd))) self.decoder = util.popen_io(decoder_cmd) + self.semaphore = threading.Semaphore() class MIRADecoder(Decoder): @@ -32,15 +42,27 @@ class MIRADecoder(Decoder): mira_cmd = [mira, '-c', config, '-w', weights, '-o', '2', '-C', '0.001', '-b', '500', '-k', '500', '-u', '-t'] logging.info('Executing: {}'.format(' '.join(mira_cmd))) self.decoder = util.popen_io(mira_cmd) + self.semaphore = threading.Semaphore() def get_weights(self): + '''Threadsafe''' + self.semaphore.acquire() self.decoder.stdin.write('WEIGHTS ||| WRITE\n') - return self.decoder.stdout.readline().strip() + weights = self.decoder.stdout.readline().strip() + self.semaphore.release() + return weights def set_weights(self, w_line): + '''Threadsafe''' + self.semaphore.acquire() self.decoder.stdin.write('WEIGHTS ||| {}\n'.format(w_line)) + self.semaphore.release() def update(self, sentence, grammar, reference): + '''Threadsafe''' input = 'LEARN ||| {s} ||| {r}\n'.format(s=sentence, g=grammar, r=reference) + self.semaphore.acquire() self.decoder.stdin.write(input) - return self.decoder.stdout.readline().strip() + log = self.decoder.stdout.readline().strip() + self.semaphore.release() + return log diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index 033ed790..6f1fb70f 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -8,6 +8,7 @@ import shutil import sys import subprocess import tempfile +import threading import time import cdec @@ -15,18 +16,56 @@ import aligner import decoder import util -LIKELY_OOV = '("OOV")' +# Dummy input token that is unlikely to appear in normalized data (but no fatal errors if it does) +LIKELY_OOV = '(OOV)' class RealtimeDecoder: + '''Do not use directly unless you know what you're doing. Use RealtimeTranslator.''' + + def __init__(self, configdir, tmpdir): + + cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + + self.tmp = tmpdir + os.mkdir(self.tmp) + + # HPYPLM reference stream + ref_fifo_file = os.path.join(self.tmp, 'ref.fifo') + os.mkfifo(ref_fifo_file) + self.ref_fifo = open(ref_fifo_file, 'w+') + # Start with empty line (do not learn prior to first input) + self.ref_fifo.write('\n') + self.ref_fifo.flush() + + # Decoder + decoder_config = [[f.strip() for f in line.split('=')] for line in open(os.path.join(configdir, 'cdec.ini'))] + util.cdec_ini_for_realtime(decoder_config, os.path.abspath(configdir), ref_fifo_file) + decoder_config_file = os.path.join(self.tmp, 'cdec.ini') + with open(decoder_config_file, 'w') as output: + for (k, v) in decoder_config: + output.write('{}={}\n'.format(k, v)) + decoder_weights = os.path.join(configdir, 'weights.final') + self.decoder = decoder.MIRADecoder(decoder_config_file, decoder_weights) + + def close(self, force=False): + logging.info('Closing decoder and removing {}'.format(self.tmp)) + self.decoder.close(force) + self.ref_fifo.close() + shutil.rmtree(self.tmp) + +class RealtimeTranslator: + '''Main entry point into API: serves translations to any number of concurrent users''' def __init__(self, configdir, tmpdir='/tmp', cache_size=5, norm=False, state=None): + # TODO: save/load self.commands = {'LEARN': self.learn, 'SAVE': self.save_state, 'LOAD': self.load_state} cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - self.inc_data = [] # instances of (source, target) + ### Single instance for all contexts + self.config = configdir # Temporary work dir self.tmp = tempfile.mkdtemp(dir=tmpdir, prefix='realtime.') logging.info('Using temp dir {}'.format(self.tmp)) @@ -35,7 +74,9 @@ class RealtimeDecoder: self.norm = norm if self.norm: self.tokenizer = util.popen_io([os.path.join(cdec_root, 'corpus', 'tokenize-anything.sh'), '-u']) + self.tokenizer_sem = threading.Semaphore() self.detokenizer = util.popen_io([os.path.join(cdec_root, 'corpus', 'untok.pl')]) + self.detokenizer_sem = threading.Semaphore() # Word aligner fwd_params = os.path.join(configdir, 'a.fwd_params') @@ -50,28 +91,24 @@ class RealtimeDecoder: util.sa_ini_for_realtime(sa_config, os.path.abspath(configdir)) sa_config.write() self.extractor = cdec.sa.GrammarExtractor(sa_config.filename, online=True) - self.grammar_files = collections.deque() - self.grammar_dict = {} self.cache_size = cache_size - # HPYPLM reference stream - ref_fifo_file = os.path.join(self.tmp, 'ref.fifo') - os.mkfifo(ref_fifo_file) - self.ref_fifo = open(ref_fifo_file, 'w+') - # Start with empty line (do not learn prior to first input) - self.ref_fifo.write('\n') - self.ref_fifo.flush() + ### One instance per context - # Decoder - decoder_config = [[f.strip() for f in line.split('=')] for line in open(os.path.join(configdir, 'cdec.ini'))] - util.cdec_ini_for_realtime(decoder_config, os.path.abspath(configdir), ref_fifo_file) - decoder_config_file = os.path.join(self.tmp, 'cdec.ini') - with open(decoder_config_file, 'w') as output: - for (k, v) in decoder_config: - output.write('{}={}\n'.format(k, v)) - decoder_weights = os.path.join(configdir, 'weights.final') - self.decoder = decoder.MIRADecoder(decoder_config_file, decoder_weights) + self.ctx_names = set() + # All context-dependent operations are atomic + self.ctx_sems = collections.defaultdict(threading.Semaphore) + # ctx -> list of (source, target, alignment) + self.ctx_data = {} + + # ctx -> deque of file + self.grammar_files = {} + # ctx -> dict of {sentence: file} + self.grammar_dict = {} + self.decoders = {} + + # TODO: state # Load state if given if state: with open(state) as input: @@ -80,125 +117,197 @@ class RealtimeDecoder: def __enter__(self): return self - def __exit__(self, type, value, traceback): - self.close() + def __exit__(self, ex_type, ex_value, ex_traceback): + self.close(ex_type is KeyboardInterrupt) - def close(self): + def close(self, force=False): + '''Cleanup''' + if force: + logging.info('Forced shutdown: stopping immediately') + for ctx_name in list(self.ctx_names): + self.drop_ctx(ctx_name, force) logging.info('Closing processes') self.aligner.close() - self.decoder.close() - self.ref_fifo.close() if self.norm: self.tokenizer.stdin.close() self.detokenizer.stdin.close() logging.info('Deleting {}'.format(self.tmp)) shutil.rmtree(self.tmp) - def grammar(self, sentence): - grammar_file = self.grammar_dict.get(sentence, None) + def lazy_ctx(self, ctx_name): + '''Initialize a context (inc starting a new decoder) if needed''' + self.ctx_sems[ctx_name].acquire() + if ctx_name in self.ctx_names: + self.ctx_sems[ctx_name].release() + return + logging.info('New context: {}'.format(ctx_name)) + self.ctx_names.add(ctx_name) + self.ctx_data[ctx_name] = [] + self.grammar_files[ctx_name] = collections.deque() + self.grammar_dict[ctx_name] = {} + tmpdir = os.path.join(self.tmp, 'decoder.{}'.format(ctx_name)) + self.decoders[ctx_name] = RealtimeDecoder(self.config, tmpdir) + self.ctx_sems[ctx_name].release() + + def drop_ctx(self, ctx_name, force=False): + '''Delete a context (inc stopping the decoder)''' + if not force: + sem = self.ctx_sems[ctx_name] + sem.acquire() + logging.info('Dropping context: {}'.format(ctx_name)) + self.ctx_names.remove(ctx_name) + self.ctx_data.pop(ctx_name) + self.extractor.drop_ctx(ctx_name) + self.grammar_files.pop(ctx_name) + self.grammar_dict.pop(ctx_name) + self.decoders.pop(ctx_name).close(force) + self.ctx_sems.pop(ctx_name) + if not force: + sem.release() + + def grammar(self, sentence, ctx_name=None): + '''Extract a sentence-level grammar on demand (or return cached)''' + self.lazy_ctx(ctx_name) + sem = self.ctx_sems[ctx_name] + sem.acquire() + grammar_dict = self.grammar_dict[ctx_name] + grammar_file = grammar_dict.get(sentence, None) # Cache hit if grammar_file: - logging.info('Grammar cache hit') + logging.info('Grammar cache hit: {}'.format(grammar_file)) + sem.release() return grammar_file # Extract and cache - (fid, grammar_file) = tempfile.mkstemp(dir=self.tmp, prefix='grammar.') + (fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.') os.close(fid) with open(grammar_file, 'w') as output: - for rule in self.extractor.grammar(sentence): + for rule in self.extractor.grammar(sentence, ctx_name): output.write('{}\n'.format(str(rule))) - if len(self.grammar_files) == self.cache_size: - rm_sent = self.grammar_files.popleft() + grammar_files = self.grammar_files[ctx_name] + if len(grammar_files) == self.cache_size: + rm_sent = grammar_files.popleft() # If not already removed by learn method - if rm_sent in self.grammar_dict: - rm_grammar = self.grammar_dict.pop(rm_sent) + if rm_sent in grammar_dict: + rm_grammar = grammar_dict.pop(rm_sent) os.remove(rm_grammar) - self.grammar_files.append(sentence) - self.grammar_dict[sentence] = grammar_file + grammar_files.append(sentence) + grammar_dict[sentence] = grammar_file + sem.release() return grammar_file - def decode(self, sentence): + def decode(self, sentence, ctx_name=None): + '''Decode a sentence (inc extracting a grammar if needed)''' + self.lazy_ctx(ctx_name) # Empty in, empty out if sentence.strip() == '': return '' if self.norm: sentence = self.tokenize(sentence) logging.info('Normalized input: {}'.format(sentence)) - grammar_file = self.grammar(sentence) + # grammar method is threadsafe + grammar_file = self.grammar(sentence, ctx_name) + decoder = self.decoders[ctx_name] + sem = self.ctx_sems[ctx_name] + sem.acquire() start_time = time.time() - hyp = self.decoder.decode(sentence, grammar_file) + hyp = decoder.decoder.decode(sentence, grammar_file) stop_time = time.time() logging.info('Translation time: {} seconds'.format(stop_time - start_time)) # Empty reference: HPYPLM does not learn prior to next translation - self.ref_fifo.write('\n') - self.ref_fifo.flush() + decoder.ref_fifo.write('\n') + decoder.ref_fifo.flush() + sem.release() if self.norm: logging.info('Normalized translation: {}'.format(hyp)) hyp = self.detokenize(hyp) return hyp def tokenize(self, line): + self.tokenizer_sem.acquire() self.tokenizer.stdin.write('{}\n'.format(line)) - return self.tokenizer.stdout.readline().strip() + tok_line = self.tokenizer.stdout.readline().strip() + self.tokenizer_sem.release() + return tok_line def detokenize(self, line): + self.detokenizer_sem.acquire() self.detokenizer.stdin.write('{}\n'.format(line)) - return self.detokenizer.stdout.readline().strip() + detok_line = self.detokenizer.stdout.readline().strip() + self.detokenizer_sem.release() + return detok_line - def command_line(self, line): + # TODO + def command_line(self, line, ctx_name=None): args = [f.strip() for f in line.split('|||')] try: if len(args) == 2 and not args[1]: - self.commands[args[0]]() + self.commands[args[0]](ctx_name) else: - self.commands[args[0]](*args[1:]) + self.commands[args[0]](*args[1:], ctx_name=ctx_name) except: logging.info('Command error: {}'.format(' ||| '.join(args))) - def learn(self, source, target): + def learn(self, source, target, ctx_name=None): + self.lazy_ctx(ctx_name) if '' in (source.strip(), target.strip()): logging.info('Error empty source or target: {} ||| {}'.format(source, target)) return if self.norm: source = self.tokenize(source) target = self.tokenize(target) + # Align instance (threadsafe) + alignment = self.aligner.align(source, target) + # grammar method is threadsafe + grammar_file = self.grammar(source, ctx_name) + sem = self.ctx_sems[ctx_name] + sem.acquire() # MIRA update before adding data to grammar extractor - grammar_file = self.grammar(source) - mira_log = self.decoder.update(source, grammar_file, target) + decoder = self.decoders[ctx_name] + mira_log = decoder.decoder.update(source, grammar_file, target) logging.info('MIRA: {}'.format(mira_log)) - # Align instance - alignment = self.aligner.align(source, target) + # Add to HPYPLM by writing to fifo (read on next translation) + logging.info('Adding to HPYPLM: {}'.format(target)) + decoder.ref_fifo.write('{}\n'.format(target)) + decoder.ref_fifo.flush() # Store incremental data for save/load - self.inc_data.append((source, target, alignment)) + self.ctx_data[ctx_name].append((source, target, alignment)) # Add aligned sentence pair to grammar extractor logging.info('Adding to bitext: {} ||| {} ||| {}'.format(source, target, alignment)) - self.extractor.add_instance(source, target, alignment) + self.extractor.add_instance(source, target, alignment, ctx_name) # Clear (old) cached grammar - rm_grammar = self.grammar_dict.pop(source) + rm_grammar = self.grammar_dict[ctx_name].pop(source) os.remove(rm_grammar) - # Add to HPYPLM by writing to fifo (read on next translation) - logging.info('Adding to HPYPLM: {}'.format(target)) - self.ref_fifo.write('{}\n'.format(target)) - self.ref_fifo.flush() + sem.release() - def save_state(self, filename=None): + def save_state(self, filename=None, ctx_name=None): + self.lazy_ctx(ctx_name) out = open(filename, 'w') if filename else sys.stdout - logging.info('Saving state with {} sentences'.format(len(self.inc_data))) - out.write('{}\n'.format(self.decoder.get_weights())) - for (source, target, alignment) in self.inc_data: + sem = self.ctx_sems[ctx_name] + sem.acquire() + ctx_data = self.ctx_data[ctx_name] + logging.info('Saving state with {} sentences'.format(len(self.ctx_data))) + out.write('{}\n'.format(self.decoders[ctx_name].decoder.get_weights())) + for (source, target, alignment) in ctx_data: out.write('{} ||| {} ||| {}\n'.format(source, target, alignment)) + sem.release() out.write('EOF\n') if filename: out.close() - def load_state(self, input=sys.stdin): - # Non-initial load error - if self.inc_data: + def load_state(self, input=sys.stdin, ctx_name=None): + self.lazy_ctx(ctx_name) + sem = self.ctx_sems[ctx_name] + sem.acquire() + ctx_data = self.ctx_data[ctx_name] + decoder = self.decoders[ctx_name] + # Non-initial load error + if ctx_data: logging.info('Error: Incremental data has already been added to decoder.') logging.info(' State can only be loaded by a freshly started decoder.') return # MIRA weights line = input.readline().strip() - self.decoder.set_weights(line) + decoder.decoder.set_weights(line) logging.info('Loading state...') start_time = time.time() # Lines source ||| target ||| alignment @@ -207,12 +316,13 @@ class RealtimeDecoder: if line == 'EOF': break (source, target, alignment) = line.split(' ||| ') - self.inc_data.append((source, target, alignment)) + ctx_data.append((source, target, alignment)) # Extractor - self.extractor.add_instance(source, target, alignment) + self.extractor.add_instance(source, target, alignment, ctx_name) # HPYPLM - hyp = self.decoder.decode(LIKELY_OOV) + hyp = decoder.decoder.decode(LIKELY_OOV) self.ref_fifo.write('{}\n'.format(target)) self.ref_fifo.flush() stop_time = time.time() - logging.info('Loaded state with {} sentences in {} seconds'.format(len(self.inc_data), stop_time - start_time)) + logging.info('Loaded state with {} sentences in {} seconds'.format(len(ctx_data), stop_time - start_time)) + sem.release() -- cgit v1.2.3 From 49ddc45e717ac495c6adf80a20d02a1672a069df Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Wed, 25 Sep 2013 18:45:28 -0700 Subject: Threading tests --- realtime/realtime.py | 79 +++++++++++++++++++++++++++++++++++----------------- realtime/rt/rt.py | 4 +++ 2 files changed, 58 insertions(+), 25 deletions(-) (limited to 'realtime/rt') diff --git a/realtime/realtime.py b/realtime/realtime.py index 282d3311..bbec288b 100755 --- a/realtime/realtime.py +++ b/realtime/realtime.py @@ -2,9 +2,9 @@ import argparse import logging -import signal import sys import threading +import time import rt @@ -15,6 +15,41 @@ class Parser(argparse.ArgumentParser): sys.stderr.write('\n{}\n'.format(message)) sys.exit(2) +def handle_line(translator, line, output, ctx_name): + if '|||' in line: + translator.command_line(line, ctx_name) + else: + hyp = translator.decode(line, ctx_name) + output.write('{}\n'.format(hyp)) + output.flush() + +def test1(translator, input, output, ctx_name): + inp = open(input) + out = open(output, 'w') + for line in inp: + handle_line(translator, line.strip(), out, ctx_name) + out.close() + +def debug(translator, input): + # Test 1: identical output + threads = [] + for i in range(4): + t = threading.Thread(target=test1, args=(translator, input, '{}.out.{}'.format(input, i), str(i))) + threads.append(t) + t.start() + time.sleep(30) + for t in threads: + t.join() + # Test 2: flood (same number of lines) + threads = [] + out = open('{}.out.flood'.format(input), 'w') + for line in open(input): + t = threading.Thread(target=handle_line, args=(translator, line.strip(), out, None)) + threads.append(t) + t.start() + for t in threads: + t.join() + def main(): parser = Parser(description='Real-time adaptive translation with cdec. (See README.md)') @@ -24,7 +59,7 @@ def main(): parser.add_argument('-T', '--temp', help='Temp directory (default /tmp)', default='/tmp') parser.add_argument('-a', '--cache', help='Grammar cache size (default 5)', default='5') parser.add_argument('-v', '--verbose', help='Info to stderr', action='store_true') - parser.add_argument('-D', '--debug-test', help='Test thread safety (debug use only)', action='store_true') + parser.add_argument('-D', '--debug-test', help='Run debug tests on input file') args = parser.parse_args() if args.verbose: @@ -32,29 +67,23 @@ def main(): with rt.RealtimeTranslator(args.config, tmpdir=args.temp, cache_size=int(args.cache), norm=args.normalize) as translator: - # Load state if given - if args.state: - with open(args.state) as input: - rtd.load_state(input) - if not args.debug_test: - run(translator) + # Debugging + if args.debug_test: + debug(translator, args.debug_test) + return + + # Read lines and commands + while True: + line = sys.stdin.readline() + if not line: + break + line = line.strip() + if '|||' in line: + translator.command_line(line) else: - # TODO: write test - run(translator) - -def run(translator, input=sys.stdin, output=sys.stdout, ctx_name=None): - # Read lines and commands - while True: - line = input.readline() - if not line: - break - line = line.strip() - if '|||' in line: - translator.command_line(line, ctx_name) - else: - hyp = translator.decode(line, ctx_name) - output.write('{}\n'.format(hyp)) - output.flush() - + hyp = translator.decode(line) + sys.stdout.write('{}\n'.format(hyp)) + sys.stdout.flush() + if __name__ == '__main__': main() diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index 6f1fb70f..f8126283 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -101,6 +101,8 @@ class RealtimeTranslator: # ctx -> list of (source, target, alignment) self.ctx_data = {} + # Grammar extractor is not threadsafe + self.extractor_sem = threading.Semaphore() # ctx -> deque of file self.grammar_files = {} # ctx -> dict of {sentence: file} @@ -181,8 +183,10 @@ class RealtimeTranslator: (fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.') os.close(fid) with open(grammar_file, 'w') as output: + self.extractor_sem.acquire() for rule in self.extractor.grammar(sentence, ctx_name): output.write('{}\n'.format(str(rule))) + self.extractor_sem.release() grammar_files = self.grammar_files[ctx_name] if len(grammar_files) == self.cache_size: rm_sent = grammar_files.popleft() -- cgit v1.2.3 From cb718c763e07b8e1417383ef7ae5c1aca36d2a0a Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Thu, 26 Sep 2013 14:28:42 -0700 Subject: FIFO Locks --- realtime/rt/aligner.py | 6 ++--- realtime/rt/decoder.py | 24 +++++++++--------- realtime/rt/rt.py | 66 +++++++++++++++++++++++++------------------------- realtime/rt/util.py | 19 +++++++++++++++ 4 files changed, 67 insertions(+), 48 deletions(-) (limited to 'realtime/rt') diff --git a/realtime/rt/aligner.py b/realtime/rt/aligner.py index a14121db..62ce32b8 100644 --- a/realtime/rt/aligner.py +++ b/realtime/rt/aligner.py @@ -31,7 +31,7 @@ class ForceAligner: self.tools = util.popen_io(tools_cmd) # Used to guarantee thread safety - self.semaphore = threading.Semaphore() + self.lock = util.FIFOLock() def align(self, source, target): '''Threadsafe''' @@ -39,7 +39,7 @@ class ForceAligner: def align_formatted(self, line): '''Threadsafe''' - self.semaphore.acquire() + self.lock.acquire() self.fwd_align.stdin.write('{}\n'.format(line)) self.rev_align.stdin.write('{}\n'.format(line)) # f words ||| e words ||| links ||| score @@ -48,7 +48,7 @@ class ForceAligner: self.tools.stdin.write('{}\n'.format(fwd_line)) self.tools.stdin.write('{}\n'.format(rev_line)) al_line = self.tools.stdout.readline().strip() - self.semaphore.release() + self.lock.release() return al_line def close(self): diff --git a/realtime/rt/decoder.py b/realtime/rt/decoder.py index 72b5b959..7c36b441 100644 --- a/realtime/rt/decoder.py +++ b/realtime/rt/decoder.py @@ -9,18 +9,18 @@ class Decoder: def close(self, force=False): if not force: - self.semaphore.acquire() + self.lock.acquire() self.decoder.stdin.close() if not force: - self.semaphore.release() + self.lock.release() def decode(self, sentence, grammar=None): '''Threadsafe''' input = '{s}\n'.format(s=sentence, g=grammar) if grammar else '{}\n'.format(sentence) - self.semaphore.acquire() + self.lock.acquire() self.decoder.stdin.write(input) hyp = self.decoder.stdout.readline().strip() - self.semaphore.release() + self.lock.release() return hyp class CdecDecoder(Decoder): @@ -31,7 +31,7 @@ class CdecDecoder(Decoder): decoder_cmd = [decoder, '-c', config, '-w', weights] logging.info('Executing: {}'.format(' '.join(decoder_cmd))) self.decoder = util.popen_io(decoder_cmd) - self.semaphore = threading.Semaphore() + self.lock = util.FIFOLock() class MIRADecoder(Decoder): @@ -42,27 +42,27 @@ class MIRADecoder(Decoder): mira_cmd = [mira, '-c', config, '-w', weights, '-o', '2', '-C', '0.001', '-b', '500', '-k', '500', '-u', '-t'] logging.info('Executing: {}'.format(' '.join(mira_cmd))) self.decoder = util.popen_io(mira_cmd) - self.semaphore = threading.Semaphore() + self.lock = util.FIFOLock() def get_weights(self): '''Threadsafe''' - self.semaphore.acquire() + self.lock.acquire() self.decoder.stdin.write('WEIGHTS ||| WRITE\n') weights = self.decoder.stdout.readline().strip() - self.semaphore.release() + self.lock.release() return weights def set_weights(self, w_line): '''Threadsafe''' - self.semaphore.acquire() + self.lock.acquire() self.decoder.stdin.write('WEIGHTS ||| {}\n'.format(w_line)) - self.semaphore.release() + self.lock.release() def update(self, sentence, grammar, reference): '''Threadsafe''' input = 'LEARN ||| {s} ||| {r}\n'.format(s=sentence, g=grammar, r=reference) - self.semaphore.acquire() + self.lock.acquire() self.decoder.stdin.write(input) log = self.decoder.stdout.readline().strip() - self.semaphore.release() + self.lock.release() return log diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index f8126283..1e78e188 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -74,9 +74,9 @@ class RealtimeTranslator: self.norm = norm if self.norm: self.tokenizer = util.popen_io([os.path.join(cdec_root, 'corpus', 'tokenize-anything.sh'), '-u']) - self.tokenizer_sem = threading.Semaphore() + self.tokenizer_lock = util.FIFOLock() self.detokenizer = util.popen_io([os.path.join(cdec_root, 'corpus', 'untok.pl')]) - self.detokenizer_sem = threading.Semaphore() + self.detokenizer_lock = util.FIFOLock() # Word aligner fwd_params = os.path.join(configdir, 'a.fwd_params') @@ -97,12 +97,12 @@ class RealtimeTranslator: self.ctx_names = set() # All context-dependent operations are atomic - self.ctx_sems = collections.defaultdict(threading.Semaphore) + self.ctx_locks = collections.defaultdict(util.FIFOLock) # ctx -> list of (source, target, alignment) self.ctx_data = {} # Grammar extractor is not threadsafe - self.extractor_sem = threading.Semaphore() + self.extractor_lock = util.FIFOLock() # ctx -> deque of file self.grammar_files = {} # ctx -> dict of {sentence: file} @@ -138,9 +138,9 @@ class RealtimeTranslator: def lazy_ctx(self, ctx_name): '''Initialize a context (inc starting a new decoder) if needed''' - self.ctx_sems[ctx_name].acquire() + self.ctx_locks[ctx_name].acquire() if ctx_name in self.ctx_names: - self.ctx_sems[ctx_name].release() + self.ctx_locks[ctx_name].release() return logging.info('New context: {}'.format(ctx_name)) self.ctx_names.add(ctx_name) @@ -149,13 +149,13 @@ class RealtimeTranslator: self.grammar_dict[ctx_name] = {} tmpdir = os.path.join(self.tmp, 'decoder.{}'.format(ctx_name)) self.decoders[ctx_name] = RealtimeDecoder(self.config, tmpdir) - self.ctx_sems[ctx_name].release() + self.ctx_locks[ctx_name].release() def drop_ctx(self, ctx_name, force=False): '''Delete a context (inc stopping the decoder)''' if not force: - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() logging.info('Dropping context: {}'.format(ctx_name)) self.ctx_names.remove(ctx_name) self.ctx_data.pop(ctx_name) @@ -163,30 +163,30 @@ class RealtimeTranslator: self.grammar_files.pop(ctx_name) self.grammar_dict.pop(ctx_name) self.decoders.pop(ctx_name).close(force) - self.ctx_sems.pop(ctx_name) + self.ctx_locks.pop(ctx_name) if not force: - sem.release() + lock.release() def grammar(self, sentence, ctx_name=None): '''Extract a sentence-level grammar on demand (or return cached)''' self.lazy_ctx(ctx_name) - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() grammar_dict = self.grammar_dict[ctx_name] grammar_file = grammar_dict.get(sentence, None) # Cache hit if grammar_file: logging.info('Grammar cache hit: {}'.format(grammar_file)) - sem.release() + lock.release() return grammar_file # Extract and cache (fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.') os.close(fid) with open(grammar_file, 'w') as output: - self.extractor_sem.acquire() + self.extractor_lock.acquire() for rule in self.extractor.grammar(sentence, ctx_name): output.write('{}\n'.format(str(rule))) - self.extractor_sem.release() + self.extractor_lock.release() grammar_files = self.grammar_files[ctx_name] if len(grammar_files) == self.cache_size: rm_sent = grammar_files.popleft() @@ -196,7 +196,7 @@ class RealtimeTranslator: os.remove(rm_grammar) grammar_files.append(sentence) grammar_dict[sentence] = grammar_file - sem.release() + lock.release() return grammar_file def decode(self, sentence, ctx_name=None): @@ -211,8 +211,8 @@ class RealtimeTranslator: # grammar method is threadsafe grammar_file = self.grammar(sentence, ctx_name) decoder = self.decoders[ctx_name] - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() start_time = time.time() hyp = decoder.decoder.decode(sentence, grammar_file) stop_time = time.time() @@ -220,24 +220,24 @@ class RealtimeTranslator: # Empty reference: HPYPLM does not learn prior to next translation decoder.ref_fifo.write('\n') decoder.ref_fifo.flush() - sem.release() + lock.release() if self.norm: logging.info('Normalized translation: {}'.format(hyp)) hyp = self.detokenize(hyp) return hyp def tokenize(self, line): - self.tokenizer_sem.acquire() + self.tokenizer_lock.acquire() self.tokenizer.stdin.write('{}\n'.format(line)) tok_line = self.tokenizer.stdout.readline().strip() - self.tokenizer_sem.release() + self.tokenizer_lock.release() return tok_line def detokenize(self, line): - self.detokenizer_sem.acquire() + self.detokenizer_lock.acquire() self.detokenizer.stdin.write('{}\n'.format(line)) detok_line = self.detokenizer.stdout.readline().strip() - self.detokenizer_sem.release() + self.detokenizer_lock.release() return detok_line # TODO @@ -263,8 +263,8 @@ class RealtimeTranslator: alignment = self.aligner.align(source, target) # grammar method is threadsafe grammar_file = self.grammar(source, ctx_name) - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() # MIRA update before adding data to grammar extractor decoder = self.decoders[ctx_name] mira_log = decoder.decoder.update(source, grammar_file, target) @@ -281,27 +281,27 @@ class RealtimeTranslator: # Clear (old) cached grammar rm_grammar = self.grammar_dict[ctx_name].pop(source) os.remove(rm_grammar) - sem.release() + lock.release() def save_state(self, filename=None, ctx_name=None): self.lazy_ctx(ctx_name) out = open(filename, 'w') if filename else sys.stdout - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() ctx_data = self.ctx_data[ctx_name] logging.info('Saving state with {} sentences'.format(len(self.ctx_data))) out.write('{}\n'.format(self.decoders[ctx_name].decoder.get_weights())) for (source, target, alignment) in ctx_data: out.write('{} ||| {} ||| {}\n'.format(source, target, alignment)) - sem.release() + lock.release() out.write('EOF\n') if filename: out.close() def load_state(self, input=sys.stdin, ctx_name=None): self.lazy_ctx(ctx_name) - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() ctx_data = self.ctx_data[ctx_name] decoder = self.decoders[ctx_name] # Non-initial load error @@ -329,4 +329,4 @@ class RealtimeTranslator: self.ref_fifo.flush() stop_time = time.time() logging.info('Loaded state with {} sentences in {} seconds'.format(len(ctx_data), stop_time - start_time)) - sem.release() + lock.release() diff --git a/realtime/rt/util.py b/realtime/rt/util.py index 6e07f116..05dcae96 100644 --- a/realtime/rt/util.py +++ b/realtime/rt/util.py @@ -1,4 +1,5 @@ import os +import Queue import subprocess import sys import threading @@ -13,6 +14,24 @@ SA_INI_FILES = set(( 'precompute_file', )) +class FIFOLock: + + def __init__(self): + self.q = Queue.Queue() + self.i = 0 + + def acquire(self): + self.i += 1 + if self.i > 1: + event = threading.Event() + self.q.put(event) + event.wait() + + def release(self): + self.i -= 1 + if self.i > 0: + self.q.get().set() + def cdec_ini_for_config(config): # This is a list of (k, v), not a ConfigObj or dict for i in range(len(config)): -- cgit v1.2.3 From b8116c5c3c7e31a276ff38fc8173eab37f292364 Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Fri, 27 Sep 2013 13:39:24 -0700 Subject: Decoding and learning with multiple contexts is threadsafe and FIFO. --- realtime/realtime.py | 17 ++++++++----- realtime/rt/aligner.py | 10 +++++--- realtime/rt/decoder.py | 8 +++---- realtime/rt/rt.py | 65 ++++++++++++++++++++++++++++---------------------- realtime/rt/util.py | 8 +++++++ 5 files changed, 67 insertions(+), 41 deletions(-) (limited to 'realtime/rt') diff --git a/realtime/realtime.py b/realtime/realtime.py index bbec288b..38da4413 100755 --- a/realtime/realtime.py +++ b/realtime/realtime.py @@ -31,22 +31,27 @@ def test1(translator, input, output, ctx_name): out.close() def debug(translator, input): - # Test 1: identical output + # Test 1: multiple contexts threads = [] for i in range(4): t = threading.Thread(target=test1, args=(translator, input, '{}.out.{}'.format(input, i), str(i))) threads.append(t) t.start() time.sleep(30) - for t in threads: - t.join() - # Test 2: flood (same number of lines) - threads = [] + # Test 2: flood out = open('{}.out.flood'.format(input), 'w') - for line in open(input): + inp = open(input) + while True: + line = inp.readline() + if not line: + break + line = line.strip() t = threading.Thread(target=handle_line, args=(translator, line.strip(), out, None)) threads.append(t) t.start() + time.sleep(1) + translator.drop_ctx(None) + # Join test threads for t in threads: t.join() diff --git a/realtime/rt/aligner.py b/realtime/rt/aligner.py index 62ce32b8..def3fcb5 100644 --- a/realtime/rt/aligner.py +++ b/realtime/rt/aligner.py @@ -34,11 +34,11 @@ class ForceAligner: self.lock = util.FIFOLock() def align(self, source, target): - '''Threadsafe''' + '''Threadsafe, FIFO''' return self.align_formatted('{} ||| {}'.format(source, target)) def align_formatted(self, line): - '''Threadsafe''' + '''Threadsafe, FIFO''' self.lock.acquire() self.fwd_align.stdin.write('{}\n'.format(line)) self.rev_align.stdin.write('{}\n'.format(line)) @@ -51,10 +51,14 @@ class ForceAligner: self.lock.release() return al_line - def close(self): + def close(self, force=False): + if not force: + self.lock.acquire() self.fwd_align.stdin.close() self.rev_align.stdin.close() self.tools.stdin.close() + if not force: + self.lock.release() def read_err(self, err): (T, m) = ('', '') diff --git a/realtime/rt/decoder.py b/realtime/rt/decoder.py index 7c36b441..da646f68 100644 --- a/realtime/rt/decoder.py +++ b/realtime/rt/decoder.py @@ -15,7 +15,7 @@ class Decoder: self.lock.release() def decode(self, sentence, grammar=None): - '''Threadsafe''' + '''Threadsafe, FIFO''' input = '{s}\n'.format(s=sentence, g=grammar) if grammar else '{}\n'.format(sentence) self.lock.acquire() self.decoder.stdin.write(input) @@ -45,7 +45,7 @@ class MIRADecoder(Decoder): self.lock = util.FIFOLock() def get_weights(self): - '''Threadsafe''' + '''Threadsafe, FIFO''' self.lock.acquire() self.decoder.stdin.write('WEIGHTS ||| WRITE\n') weights = self.decoder.stdout.readline().strip() @@ -53,13 +53,13 @@ class MIRADecoder(Decoder): return weights def set_weights(self, w_line): - '''Threadsafe''' + '''Threadsafe, FIFO''' self.lock.acquire() self.decoder.stdin.write('WEIGHTS ||| {}\n'.format(w_line)) self.lock.release() def update(self, sentence, grammar, reference): - '''Threadsafe''' + '''Threadsafe, FIFO''' input = 'LEARN ||| {s} ||| {r}\n'.format(s=sentence, g=grammar, r=reference) self.lock.acquire() self.decoder.stdin.write(input) diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index 1e78e188..5ace5d59 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -129,18 +129,23 @@ class RealtimeTranslator: for ctx_name in list(self.ctx_names): self.drop_ctx(ctx_name, force) logging.info('Closing processes') - self.aligner.close() + self.aligner.close(force) if self.norm: + if not force: + self.tokenizer_lock.acquire() + self.detokenizer_lock.acquire() self.tokenizer.stdin.close() self.detokenizer.stdin.close() + if not force: + self.tokenizer_lock.release() + self.detokenizer_lock.release() logging.info('Deleting {}'.format(self.tmp)) shutil.rmtree(self.tmp) def lazy_ctx(self, ctx_name): - '''Initialize a context (inc starting a new decoder) if needed''' - self.ctx_locks[ctx_name].acquire() + '''Initialize a context (inc starting a new decoder) if needed. + NOT threadsafe, acquire ctx_name lock before calling.''' if ctx_name in self.ctx_names: - self.ctx_locks[ctx_name].release() return logging.info('New context: {}'.format(ctx_name)) self.ctx_names.add(ctx_name) @@ -149,12 +154,12 @@ class RealtimeTranslator: self.grammar_dict[ctx_name] = {} tmpdir = os.path.join(self.tmp, 'decoder.{}'.format(ctx_name)) self.decoders[ctx_name] = RealtimeDecoder(self.config, tmpdir) - self.ctx_locks[ctx_name].release() - def drop_ctx(self, ctx_name, force=False): - '''Delete a context (inc stopping the decoder)''' + def drop_ctx(self, ctx_name=None, force=False): + '''Delete a context (inc stopping the decoder) + Threadsafe and FIFO unless forced.''' + lock = self.ctx_locks[ctx_name] if not force: - lock = self.ctx_locks[ctx_name] lock.acquire() logging.info('Dropping context: {}'.format(ctx_name)) self.ctx_names.remove(ctx_name) @@ -168,25 +173,24 @@ class RealtimeTranslator: lock.release() def grammar(self, sentence, ctx_name=None): - '''Extract a sentence-level grammar on demand (or return cached)''' + '''Extract a sentence-level grammar on demand (or return cached) + Threadsafe wrt extractor but NOT decoder. Acquire ctx_name lock + before calling.''' + self.extractor_lock.acquire() self.lazy_ctx(ctx_name) - lock = self.ctx_locks[ctx_name] - lock.acquire() grammar_dict = self.grammar_dict[ctx_name] grammar_file = grammar_dict.get(sentence, None) # Cache hit if grammar_file: logging.info('Grammar cache hit: {}'.format(grammar_file)) - lock.release() + self.extractor_lock.release() return grammar_file # Extract and cache (fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.') os.close(fid) with open(grammar_file, 'w') as output: - self.extractor_lock.acquire() for rule in self.extractor.grammar(sentence, ctx_name): output.write('{}\n'.format(str(rule))) - self.extractor_lock.release() grammar_files = self.grammar_files[ctx_name] if len(grammar_files) == self.cache_size: rm_sent = grammar_files.popleft() @@ -196,23 +200,25 @@ class RealtimeTranslator: os.remove(rm_grammar) grammar_files.append(sentence) grammar_dict[sentence] = grammar_file - lock.release() + self.extractor_lock.release() return grammar_file def decode(self, sentence, ctx_name=None): - '''Decode a sentence (inc extracting a grammar if needed)''' + '''Decode a sentence (inc extracting a grammar if needed) + Threadsafe, FIFO''' + lock = self.ctx_locks[ctx_name] + lock.acquire() self.lazy_ctx(ctx_name) + logging.info('DECODE: {}'.format(sentence)) # Empty in, empty out if sentence.strip() == '': + lock.release() return '' if self.norm: sentence = self.tokenize(sentence) logging.info('Normalized input: {}'.format(sentence)) - # grammar method is threadsafe grammar_file = self.grammar(sentence, ctx_name) decoder = self.decoders[ctx_name] - lock = self.ctx_locks[ctx_name] - lock.acquire() start_time = time.time() hyp = decoder.decoder.decode(sentence, grammar_file) stop_time = time.time() @@ -220,10 +226,10 @@ class RealtimeTranslator: # Empty reference: HPYPLM does not learn prior to next translation decoder.ref_fifo.write('\n') decoder.ref_fifo.flush() - lock.release() if self.norm: logging.info('Normalized translation: {}'.format(hyp)) hyp = self.detokenize(hyp) + lock.release() return hyp def tokenize(self, line): @@ -242,29 +248,32 @@ class RealtimeTranslator: # TODO def command_line(self, line, ctx_name=None): - args = [f.strip() for f in line.split('|||')] - try: + args = [f.strip() for f in line.split('|||')] + #try: if len(args) == 2 and not args[1]: self.commands[args[0]](ctx_name) else: self.commands[args[0]](*args[1:], ctx_name=ctx_name) - except: - logging.info('Command error: {}'.format(' ||| '.join(args))) + #except: + # logging.info('Command error: {}'.format(' ||| '.join(args))) def learn(self, source, target, ctx_name=None): + '''Learn from training instance (inc extracting grammar if needed) + Threadsafe, FIFO''' + lock = self.ctx_locks[ctx_name] + lock.acquire() self.lazy_ctx(ctx_name) + logging.info('LEARN: {}'.format(source)) if '' in (source.strip(), target.strip()): logging.info('Error empty source or target: {} ||| {}'.format(source, target)) + lock.release() return if self.norm: source = self.tokenize(source) target = self.tokenize(target) - # Align instance (threadsafe) + # Align instance alignment = self.aligner.align(source, target) - # grammar method is threadsafe grammar_file = self.grammar(source, ctx_name) - lock = self.ctx_locks[ctx_name] - lock.acquire() # MIRA update before adding data to grammar extractor decoder = self.decoders[ctx_name] mira_log = decoder.decoder.update(source, grammar_file, target) diff --git a/realtime/rt/util.py b/realtime/rt/util.py index 05dcae96..52767dac 100644 --- a/realtime/rt/util.py +++ b/realtime/rt/util.py @@ -15,22 +15,30 @@ SA_INI_FILES = set(( )) class FIFOLock: + '''Lock that preserves FIFO order of blocking threads''' def __init__(self): self.q = Queue.Queue() self.i = 0 + self.lock = threading.Lock() def acquire(self): + self.lock.acquire() self.i += 1 if self.i > 1: event = threading.Event() self.q.put(event) + self.lock.release() event.wait() + return + self.lock.release() def release(self): + self.lock.acquire() self.i -= 1 if self.i > 0: self.q.get().set() + self.lock.release() def cdec_ini_for_config(config): # This is a list of (k, v), not a ConfigObj or dict -- cgit v1.2.3 From e97bb01cc3a0790dc8dc474db2fd204cbd3fe579 Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Mon, 30 Sep 2013 07:45:06 -0700 Subject: Command handling --- realtime/mkinput.py | 2 +- realtime/realtime.py | 16 ++++++---------- realtime/rt/rt.py | 37 ++++++++++++++++++++++--------------- 3 files changed, 29 insertions(+), 26 deletions(-) (limited to 'realtime/rt') diff --git a/realtime/mkinput.py b/realtime/mkinput.py index 897b44fd..df434c76 100755 --- a/realtime/mkinput.py +++ b/realtime/mkinput.py @@ -10,7 +10,7 @@ def main(): sys.exit(2) for (src, ref) in itertools.izip(open(sys.argv[1]), open(sys.argv[2])): - sys.stdout.write('{}'.format(src)) + sys.stdout.write('TR ||| {}'.format(src)) sys.stdout.write('LEARN ||| {} ||| {}'.format(src.strip(), ref)) if __name__ == '__main__': diff --git a/realtime/realtime.py b/realtime/realtime.py index 38da4413..af3a3aba 100755 --- a/realtime/realtime.py +++ b/realtime/realtime.py @@ -16,11 +16,9 @@ class Parser(argparse.ArgumentParser): sys.exit(2) def handle_line(translator, line, output, ctx_name): - if '|||' in line: - translator.command_line(line, ctx_name) - else: - hyp = translator.decode(line, ctx_name) - output.write('{}\n'.format(hyp)) + res = translator.command_line(line, ctx_name) + if res: + output.write('{}\n'.format(res)) output.flush() def test1(translator, input, output, ctx_name): @@ -83,11 +81,9 @@ def main(): if not line: break line = line.strip() - if '|||' in line: - translator.command_line(line) - else: - hyp = translator.decode(line) - sys.stdout.write('{}\n'.format(hyp)) + res = translator.command_line(line) + if res: + sys.stdout.write('{}\n'.format(res)) sys.stdout.flush() if __name__ == '__main__': diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index 5ace5d59..4a31070f 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -58,9 +58,14 @@ class RealtimeTranslator: def __init__(self, configdir, tmpdir='/tmp', cache_size=5, norm=False, state=None): - # TODO: save/load - self.commands = {'LEARN': self.learn, 'SAVE': self.save_state, 'LOAD': self.load_state} - + # name -> (method, set of possible nargs) + self.COMMANDS = { + 'TR': (self.translate, set((1,))), + 'LEARN': (self.learn, set((2,))), + 'SAVE': (self.save_state, set((0, 1))), + 'LOAD': (self.load_state, set((0, 1))), + } + cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ### Single instance for all contexts @@ -203,13 +208,12 @@ class RealtimeTranslator: self.extractor_lock.release() return grammar_file - def decode(self, sentence, ctx_name=None): + def translate(self, sentence, ctx_name=None): '''Decode a sentence (inc extracting a grammar if needed) Threadsafe, FIFO''' lock = self.ctx_locks[ctx_name] lock.acquire() self.lazy_ctx(ctx_name) - logging.info('DECODE: {}'.format(sentence)) # Empty in, empty out if sentence.strip() == '': lock.release() @@ -246,16 +250,20 @@ class RealtimeTranslator: self.detokenizer_lock.release() return detok_line - # TODO def command_line(self, line, ctx_name=None): - args = [f.strip() for f in line.split('|||')] - #try: - if len(args) == 2 and not args[1]: - self.commands[args[0]](ctx_name) - else: - self.commands[args[0]](*args[1:], ctx_name=ctx_name) - #except: - # logging.info('Command error: {}'.format(' ||| '.join(args))) + args = [f.strip() for f in line.split('|||')] + (command, nargs) = self.COMMANDS[args[0]] + # ctx_name provided + if len(args[1:]) + 1 in nargs: + logging.info('Context {}: {} ||| {}'.format(args[1], args[0], ' ||| '.join(args[2:]))) + return command(*args[2:], ctx_name=args[1]) + # No ctx_name, use default or passed + elif len(args[1:]) in nargs: + logging.info('Context {}: {} ||| {}'.format(ctx_name, args[0], ' ||| '.join(args[1:]))) + return command(*args[1:], ctx_name=ctx_name) + # nargs doesn't match + else: + logging.info('Command error: {}'.format(' ||| '.join(args))) def learn(self, source, target, ctx_name=None): '''Learn from training instance (inc extracting grammar if needed) @@ -263,7 +271,6 @@ class RealtimeTranslator: lock = self.ctx_locks[ctx_name] lock.acquire() self.lazy_ctx(ctx_name) - logging.info('LEARN: {}'.format(source)) if '' in (source.strip(), target.strip()): logging.info('Error empty source or target: {} ||| {}'.format(source, target)) lock.release() -- cgit v1.2.3 From bf3b6d31e7402252188ecb814a8fcdfd755c10ac Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Mon, 30 Sep 2013 16:00:12 -0700 Subject: New commands, save/load context --- realtime/mkinput.py | 9 ++-- realtime/rt/decoder.py | 12 ++++- realtime/rt/rt.py | 121 +++++++++++++++++++++++++++++++------------------ 3 files changed, 91 insertions(+), 51 deletions(-) (limited to 'realtime/rt') diff --git a/realtime/mkinput.py b/realtime/mkinput.py index df434c76..a1b1256d 100755 --- a/realtime/mkinput.py +++ b/realtime/mkinput.py @@ -5,13 +5,14 @@ import sys def main(): - if len(sys.argv[1:]) != 2: - sys.stderr.write('usage: {} test.src test.ref >test.input\n'.format(sys.argv[0])) + if len(sys.argv[1:]) < 2: + sys.stderr.write('usage: {} test.src test.ref [ctx_name] >test.input\n'.format(sys.argv[0])) sys.exit(2) + ctx_name = ' {}'.format(sys.argv[3]) if len(sys.argv[1:]) > 2 else '' for (src, ref) in itertools.izip(open(sys.argv[1]), open(sys.argv[2])): - sys.stdout.write('TR ||| {}'.format(src)) - sys.stdout.write('LEARN ||| {} ||| {}'.format(src.strip(), ref)) + sys.stdout.write('TR{} ||| {}'.format(ctx_name, src)) + sys.stdout.write('LEARN{} ||| {} ||| {}'.format(ctx_name, src.strip(), ref)) if __name__ == '__main__': main() diff --git a/realtime/rt/decoder.py b/realtime/rt/decoder.py index da646f68..15f7db3f 100644 --- a/realtime/rt/decoder.py +++ b/realtime/rt/decoder.py @@ -55,8 +55,16 @@ class MIRADecoder(Decoder): def set_weights(self, w_line): '''Threadsafe, FIFO''' self.lock.acquire() - self.decoder.stdin.write('WEIGHTS ||| {}\n'.format(w_line)) - self.lock.release() + try: + # Check validity + for w_str in w_line.split(): + (k, v) = w_str.split('=') + float(v) + self.decoder.stdin.write('WEIGHTS ||| {}\n'.format(w_line)) + self.lock.release() + except: + raise Exception('Invalid weights line: {}'.format(w_line)) + def update(self, sentence, grammar, reference): '''Threadsafe, FIFO''' diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index 4a31070f..40305f66 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -64,6 +64,8 @@ class RealtimeTranslator: 'LEARN': (self.learn, set((2,))), 'SAVE': (self.save_state, set((0, 1))), 'LOAD': (self.load_state, set((0, 1))), + 'DROP': (self.drop_ctx, set((0,))), + 'LIST': (self.list_ctx, set((0,))), } cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) @@ -115,7 +117,6 @@ class RealtimeTranslator: self.decoders = {} - # TODO: state # Load state if given if state: with open(state) as input: @@ -125,7 +126,8 @@ class RealtimeTranslator: return self def __exit__(self, ex_type, ex_value, ex_traceback): - self.close(ex_type is KeyboardInterrupt) + # Force shutdown on exception + self.close(ex_type is not None) def close(self, force=False): '''Cleanup''' @@ -166,6 +168,11 @@ class RealtimeTranslator: lock = self.ctx_locks[ctx_name] if not force: lock.acquire() + if ctx_name not in self.ctx_names: + logging.info('No context found, no action: {}'.format(ctx_name)) + if not force: + lock.release() + return logging.info('Dropping context: {}'.format(ctx_name)) self.ctx_names.remove(ctx_name) self.ctx_data.pop(ctx_name) @@ -176,7 +183,11 @@ class RealtimeTranslator: self.ctx_locks.pop(ctx_name) if not force: lock.release() - + + def list_ctx(self, ctx_name=None): + '''Return a string of active contexts''' + return 'ctx_name ||| {}'.format(' '.join(sorted(str(ctx_name) for ctx_name in self.ctx_names))) + def grammar(self, sentence, ctx_name=None): '''Extract a sentence-level grammar on demand (or return cached) Threadsafe wrt extractor but NOT decoder. Acquire ctx_name lock @@ -251,19 +262,23 @@ class RealtimeTranslator: return detok_line def command_line(self, line, ctx_name=None): + # COMMAND [ctx_name] ||| arg1 [||| arg2 ...] args = [f.strip() for f in line.split('|||')] - (command, nargs) = self.COMMANDS[args[0]] - # ctx_name provided - if len(args[1:]) + 1 in nargs: - logging.info('Context {}: {} ||| {}'.format(args[1], args[0], ' ||| '.join(args[2:]))) - return command(*args[2:], ctx_name=args[1]) - # No ctx_name, use default or passed - elif len(args[1:]) in nargs: - logging.info('Context {}: {} ||| {}'.format(ctx_name, args[0], ' ||| '.join(args[1:]))) - return command(*args[1:], ctx_name=ctx_name) - # nargs doesn't match - else: - logging.info('Command error: {}'.format(' ||| '.join(args))) + if args[-1] == '': + args = args[:-1] + if len(args) > 0: + cmd_name = args[0].split() + # ctx_name provided + if len(cmd_name) == 2: + (cmd_name, ctx_name) = cmd_name + # ctx_name default/passed + else: + cmd_name = cmd_name[0] + (command, nargs) = self.COMMANDS.get(cmd_name, (None, None)) + if command and len(args[1:]) in nargs: + logging.info('{} ({}) ||| {}'.format(cmd_name, ctx_name, ' ||| '.join(args[1:]))) + return command(*args[1:], ctx_name=ctx_name) + logging.info('ERROR: command: {}'.format(' ||| '.join(args))) def learn(self, source, target, ctx_name=None): '''Learn from training instance (inc extracting grammar if needed) @@ -272,7 +287,7 @@ class RealtimeTranslator: lock.acquire() self.lazy_ctx(ctx_name) if '' in (source.strip(), target.strip()): - logging.info('Error empty source or target: {} ||| {}'.format(source, target)) + logging.info('ERROR: empty source or target: {} ||| {}'.format(source, target)) lock.release() return if self.norm: @@ -300,49 +315,65 @@ class RealtimeTranslator: lock.release() def save_state(self, filename=None, ctx_name=None): - self.lazy_ctx(ctx_name) - out = open(filename, 'w') if filename else sys.stdout lock = self.ctx_locks[ctx_name] lock.acquire() + self.lazy_ctx(ctx_name) ctx_data = self.ctx_data[ctx_name] - logging.info('Saving state with {} sentences'.format(len(self.ctx_data))) + out = open(filename, 'w') if filename else sys.stdout + logging.info('Saving state for context ({}) with {} sentences'.format(ctx_name, len(ctx_data))) out.write('{}\n'.format(self.decoders[ctx_name].decoder.get_weights())) for (source, target, alignment) in ctx_data: out.write('{} ||| {} ||| {}\n'.format(source, target, alignment)) - lock.release() out.write('EOF\n') if filename: out.close() + lock.release() - def load_state(self, input=sys.stdin, ctx_name=None): - self.lazy_ctx(ctx_name) + def load_state(self, filename=None, ctx_name=None): lock = self.ctx_locks[ctx_name] lock.acquire() + self.lazy_ctx(ctx_name) ctx_data = self.ctx_data[ctx_name] decoder = self.decoders[ctx_name] - # Non-initial load error + input = open(filename) if filename else sys.stdin + # Non-initial load error if ctx_data: - logging.info('Error: Incremental data has already been added to decoder.') - logging.info(' State can only be loaded by a freshly started decoder.') + logging.info('ERROR: Incremental data has already been added to context ({})'.format(ctx_name)) + logging.info(' State can only be loaded to a new context.') + lock.release() return - # MIRA weights - line = input.readline().strip() - decoder.decoder.set_weights(line) - logging.info('Loading state...') - start_time = time.time() - # Lines source ||| target ||| alignment - while True: + # Many things can go wrong if bad state data is given + try: + # MIRA weights line = input.readline().strip() - if line == 'EOF': - break - (source, target, alignment) = line.split(' ||| ') - ctx_data.append((source, target, alignment)) - # Extractor - self.extractor.add_instance(source, target, alignment, ctx_name) - # HPYPLM - hyp = decoder.decoder.decode(LIKELY_OOV) - self.ref_fifo.write('{}\n'.format(target)) - self.ref_fifo.flush() - stop_time = time.time() - logging.info('Loaded state with {} sentences in {} seconds'.format(len(ctx_data), stop_time - start_time)) - lock.release() + # Throws exception if bad line + decoder.decoder.set_weights(line) + logging.info('Loading state...') + start_time = time.time() + # Lines source ||| target ||| alignment + while True: + line = input.readline() + if not line: + raise Exception('End of file before EOF line') + line = line.strip() + if line == 'EOF': + break + (source, target, alignment) = line.split(' ||| ') + ctx_data.append((source, target, alignment)) + # Extractor + self.extractor.add_instance(source, target, alignment, ctx_name) + # HPYPLM + hyp = decoder.decoder.decode(LIKELY_OOV) + decoder.ref_fifo.write('{}\n'.format(target)) + decoder.ref_fifo.flush() + stop_time = time.time() + logging.info('Loaded state for context ({}) with {} sentences in {} seconds'.format(ctx_name, len(ctx_data), stop_time - start_time)) + lock.release() + # Recover from bad load attempt by restarting context. + # Guaranteed not to cause data loss since only a new context can load state. + except: + logging.info('ERROR: could not load state, restarting context ({})'.format(ctx_name)) + # ctx_name is already owned and needs to be restarted before other blocking threads use + self.drop_ctx(ctx_name, force=True) + self.lazy_ctx(ctx_name) + lock.release() -- cgit v1.2.3 From 8eb6dc27e27d03eefdf29efad4b6092d56ad3006 Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Mon, 30 Sep 2013 23:30:51 -0700 Subject: Release lock on exception --- realtime/rt/decoder.py | 1 + 1 file changed, 1 insertion(+) (limited to 'realtime/rt') diff --git a/realtime/rt/decoder.py b/realtime/rt/decoder.py index 15f7db3f..1cee4610 100644 --- a/realtime/rt/decoder.py +++ b/realtime/rt/decoder.py @@ -63,6 +63,7 @@ class MIRADecoder(Decoder): self.decoder.stdin.write('WEIGHTS ||| {}\n'.format(w_line)) self.lock.release() except: + self.lock.release() raise Exception('Invalid weights line: {}'.format(w_line)) -- cgit v1.2.3 From 4830491825f0c8740d505c192d60b388f64acf1b Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Mon, 30 Sep 2013 23:34:04 -0700 Subject: Loading state moved to command, specific to context --- realtime/realtime.py | 1 - realtime/rt/rt.py | 7 +------ 2 files changed, 1 insertion(+), 7 deletions(-) (limited to 'realtime/rt') diff --git a/realtime/realtime.py b/realtime/realtime.py index af3a3aba..be02d486 100755 --- a/realtime/realtime.py +++ b/realtime/realtime.py @@ -57,7 +57,6 @@ def main(): parser = Parser(description='Real-time adaptive translation with cdec. (See README.md)') parser.add_argument('-c', '--config', required=True, help='Config directory') - parser.add_argument('-s', '--state', help='Load state file (saved incremental data)') parser.add_argument('-n', '--normalize', help='Normalize text (tokenize, translate, detokenize)', action='store_true') parser.add_argument('-T', '--temp', help='Temp directory (default /tmp)', default='/tmp') parser.add_argument('-a', '--cache', help='Grammar cache size (default 5)', default='5') diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index 40305f66..43cc43b4 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -56,7 +56,7 @@ class RealtimeDecoder: class RealtimeTranslator: '''Main entry point into API: serves translations to any number of concurrent users''' - def __init__(self, configdir, tmpdir='/tmp', cache_size=5, norm=False, state=None): + def __init__(self, configdir, tmpdir='/tmp', cache_size=5, norm=False): # name -> (method, set of possible nargs) self.COMMANDS = { @@ -117,11 +117,6 @@ class RealtimeTranslator: self.decoders = {} - # Load state if given - if state: - with open(state) as input: - self.load_state(input) - def __enter__(self): return self -- cgit v1.2.3 From 51a83643d95ab0d7add9dd66b0b38044db10a797 Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Tue, 1 Oct 2013 14:08:05 -0700 Subject: Better logging, save/load to default context --- realtime/realtime.py | 5 +++++ realtime/rt/aligner.py | 8 +++++--- realtime/rt/decoder.py | 6 ++++-- realtime/rt/rt.py | 50 ++++++++++++++++++++++++++------------------------ 4 files changed, 40 insertions(+), 29 deletions(-) (limited to 'realtime/rt') diff --git a/realtime/realtime.py b/realtime/realtime.py index be02d486..6ee785f8 100755 --- a/realtime/realtime.py +++ b/realtime/realtime.py @@ -57,6 +57,7 @@ def main(): parser = Parser(description='Real-time adaptive translation with cdec. (See README.md)') parser.add_argument('-c', '--config', required=True, help='Config directory') + parser.add_argument('-s', '--state', help='Load state file to default context (saved incremental data)') parser.add_argument('-n', '--normalize', help='Normalize text (tokenize, translate, detokenize)', action='store_true') parser.add_argument('-T', '--temp', help='Temp directory (default /tmp)', default='/tmp') parser.add_argument('-a', '--cache', help='Grammar cache size (default 5)', default='5') @@ -74,6 +75,10 @@ def main(): debug(translator, args.debug_test) return + # Load state if given + if args.state: + rtd.load_state(state) + # Read lines and commands while True: line = sys.stdin.readline() diff --git a/realtime/rt/aligner.py b/realtime/rt/aligner.py index def3fcb5..bcc1ef87 100644 --- a/realtime/rt/aligner.py +++ b/realtime/rt/aligner.py @@ -6,6 +6,8 @@ import threading import util +logger = logging.getLogger('rt.aligner') + class ForceAligner: def __init__(self, fwd_params, fwd_err, rev_params, rev_err): @@ -21,13 +23,13 @@ class ForceAligner: rev_cmd = [fast_align, '-i', '-', '-d', '-T', rev_T, '-m', rev_m, '-f', rev_params, '-r'] tools_cmd = [atools, '-i', '-', '-j', '-', '-c', 'grow-diag-final-and'] - logging.info('Executing: {}'.format(' '.join(fwd_cmd))) + logger.info('Executing: {}'.format(' '.join(fwd_cmd))) self.fwd_align = util.popen_io(fwd_cmd) - logging.info('Executing: {}'.format(' '.join(rev_cmd))) + logger.info('Executing: {}'.format(' '.join(rev_cmd))) self.rev_align = util.popen_io(rev_cmd) - logging.info('Executing: {}'.format(' '.join(tools_cmd))) + logger.info('Executing: {}'.format(' '.join(tools_cmd))) self.tools = util.popen_io(tools_cmd) # Used to guarantee thread safety diff --git a/realtime/rt/decoder.py b/realtime/rt/decoder.py index 1cee4610..e6e7489d 100644 --- a/realtime/rt/decoder.py +++ b/realtime/rt/decoder.py @@ -5,6 +5,8 @@ import threading import util +logger = logging.getLogger('rt.decoder') + class Decoder: def close(self, force=False): @@ -29,7 +31,7 @@ class CdecDecoder(Decoder): cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) decoder = os.path.join(cdec_root, 'decoder', 'cdec') decoder_cmd = [decoder, '-c', config, '-w', weights] - logging.info('Executing: {}'.format(' '.join(decoder_cmd))) + logger.info('Executing: {}'.format(' '.join(decoder_cmd))) self.decoder = util.popen_io(decoder_cmd) self.lock = util.FIFOLock() @@ -40,7 +42,7 @@ class MIRADecoder(Decoder): mira = os.path.join(cdec_root, 'training', 'mira', 'kbest_cut_mira') # optimizer=2 step=0.001 best=500, k=500, uniq, stream mira_cmd = [mira, '-c', config, '-w', weights, '-o', '2', '-C', '0.001', '-b', '500', '-k', '500', '-u', '-t'] - logging.info('Executing: {}'.format(' '.join(mira_cmd))) + logger.info('Executing: {}'.format(' '.join(mira_cmd))) self.decoder = util.popen_io(mira_cmd) self.lock = util.FIFOLock() diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index 43cc43b4..db831712 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -19,6 +19,8 @@ import util # Dummy input token that is unlikely to appear in normalized data (but no fatal errors if it does) LIKELY_OOV = '(OOV)' +logger = logging.getLogger('rt') + class RealtimeDecoder: '''Do not use directly unless you know what you're doing. Use RealtimeTranslator.''' @@ -48,7 +50,7 @@ class RealtimeDecoder: self.decoder = decoder.MIRADecoder(decoder_config_file, decoder_weights) def close(self, force=False): - logging.info('Closing decoder and removing {}'.format(self.tmp)) + logger.info('Closing decoder and removing {}'.format(self.tmp)) self.decoder.close(force) self.ref_fifo.close() shutil.rmtree(self.tmp) @@ -75,7 +77,7 @@ class RealtimeTranslator: self.config = configdir # Temporary work dir self.tmp = tempfile.mkdtemp(dir=tmpdir, prefix='realtime.') - logging.info('Using temp dir {}'.format(self.tmp)) + logger.info('Using temp dir {}'.format(self.tmp)) # Normalization self.norm = norm @@ -127,10 +129,10 @@ class RealtimeTranslator: def close(self, force=False): '''Cleanup''' if force: - logging.info('Forced shutdown: stopping immediately') + logger.info('Forced shutdown: stopping immediately') for ctx_name in list(self.ctx_names): self.drop_ctx(ctx_name, force) - logging.info('Closing processes') + logger.info('Closing processes') self.aligner.close(force) if self.norm: if not force: @@ -141,7 +143,7 @@ class RealtimeTranslator: if not force: self.tokenizer_lock.release() self.detokenizer_lock.release() - logging.info('Deleting {}'.format(self.tmp)) + logger.info('Deleting {}'.format(self.tmp)) shutil.rmtree(self.tmp) def lazy_ctx(self, ctx_name): @@ -149,7 +151,7 @@ class RealtimeTranslator: NOT threadsafe, acquire ctx_name lock before calling.''' if ctx_name in self.ctx_names: return - logging.info('New context: {}'.format(ctx_name)) + logger.info('New context: {}'.format(ctx_name)) self.ctx_names.add(ctx_name) self.ctx_data[ctx_name] = [] self.grammar_files[ctx_name] = collections.deque() @@ -164,11 +166,11 @@ class RealtimeTranslator: if not force: lock.acquire() if ctx_name not in self.ctx_names: - logging.info('No context found, no action: {}'.format(ctx_name)) + logger.info('No context found, no action: {}'.format(ctx_name)) if not force: lock.release() return - logging.info('Dropping context: {}'.format(ctx_name)) + logger.info('Dropping context: {}'.format(ctx_name)) self.ctx_names.remove(ctx_name) self.ctx_data.pop(ctx_name) self.extractor.drop_ctx(ctx_name) @@ -193,7 +195,7 @@ class RealtimeTranslator: grammar_file = grammar_dict.get(sentence, None) # Cache hit if grammar_file: - logging.info('Grammar cache hit: {}'.format(grammar_file)) + logger.info('Grammar cache hit: {}'.format(grammar_file)) self.extractor_lock.release() return grammar_file # Extract and cache @@ -226,18 +228,18 @@ class RealtimeTranslator: return '' if self.norm: sentence = self.tokenize(sentence) - logging.info('Normalized input: {}'.format(sentence)) + logger.info('Normalized input: {}'.format(sentence)) grammar_file = self.grammar(sentence, ctx_name) decoder = self.decoders[ctx_name] start_time = time.time() hyp = decoder.decoder.decode(sentence, grammar_file) stop_time = time.time() - logging.info('Translation time: {} seconds'.format(stop_time - start_time)) + logger.info('Translation time: {} seconds'.format(stop_time - start_time)) # Empty reference: HPYPLM does not learn prior to next translation decoder.ref_fifo.write('\n') decoder.ref_fifo.flush() if self.norm: - logging.info('Normalized translation: {}'.format(hyp)) + logger.info('Normalized translation: {}'.format(hyp)) hyp = self.detokenize(hyp) lock.release() return hyp @@ -271,9 +273,9 @@ class RealtimeTranslator: cmd_name = cmd_name[0] (command, nargs) = self.COMMANDS.get(cmd_name, (None, None)) if command and len(args[1:]) in nargs: - logging.info('{} ({}) ||| {}'.format(cmd_name, ctx_name, ' ||| '.join(args[1:]))) + logger.info('{} ({}) ||| {}'.format(cmd_name, ctx_name, ' ||| '.join(args[1:]))) return command(*args[1:], ctx_name=ctx_name) - logging.info('ERROR: command: {}'.format(' ||| '.join(args))) + logger.info('ERROR: command: {}'.format(' ||| '.join(args))) def learn(self, source, target, ctx_name=None): '''Learn from training instance (inc extracting grammar if needed) @@ -282,7 +284,7 @@ class RealtimeTranslator: lock.acquire() self.lazy_ctx(ctx_name) if '' in (source.strip(), target.strip()): - logging.info('ERROR: empty source or target: {} ||| {}'.format(source, target)) + logger.info('ERROR: empty source or target: {} ||| {}'.format(source, target)) lock.release() return if self.norm: @@ -294,15 +296,15 @@ class RealtimeTranslator: # MIRA update before adding data to grammar extractor decoder = self.decoders[ctx_name] mira_log = decoder.decoder.update(source, grammar_file, target) - logging.info('MIRA: {}'.format(mira_log)) + logger.info('MIRA: {}'.format(mira_log)) # Add to HPYPLM by writing to fifo (read on next translation) - logging.info('Adding to HPYPLM: {}'.format(target)) + logger.info('Adding to HPYPLM: {}'.format(target)) decoder.ref_fifo.write('{}\n'.format(target)) decoder.ref_fifo.flush() # Store incremental data for save/load self.ctx_data[ctx_name].append((source, target, alignment)) # Add aligned sentence pair to grammar extractor - logging.info('Adding to bitext: {} ||| {} ||| {}'.format(source, target, alignment)) + logger.info('Adding to bitext: {} ||| {} ||| {}'.format(source, target, alignment)) self.extractor.add_instance(source, target, alignment, ctx_name) # Clear (old) cached grammar rm_grammar = self.grammar_dict[ctx_name].pop(source) @@ -315,7 +317,7 @@ class RealtimeTranslator: self.lazy_ctx(ctx_name) ctx_data = self.ctx_data[ctx_name] out = open(filename, 'w') if filename else sys.stdout - logging.info('Saving state for context ({}) with {} sentences'.format(ctx_name, len(ctx_data))) + logger.info('Saving state for context ({}) with {} sentences'.format(ctx_name, len(ctx_data))) out.write('{}\n'.format(self.decoders[ctx_name].decoder.get_weights())) for (source, target, alignment) in ctx_data: out.write('{} ||| {} ||| {}\n'.format(source, target, alignment)) @@ -333,8 +335,8 @@ class RealtimeTranslator: input = open(filename) if filename else sys.stdin # Non-initial load error if ctx_data: - logging.info('ERROR: Incremental data has already been added to context ({})'.format(ctx_name)) - logging.info(' State can only be loaded to a new context.') + logger.info('ERROR: Incremental data has already been added to context ({})'.format(ctx_name)) + logger.info(' State can only be loaded to a new context.') lock.release() return # Many things can go wrong if bad state data is given @@ -343,7 +345,7 @@ class RealtimeTranslator: line = input.readline().strip() # Throws exception if bad line decoder.decoder.set_weights(line) - logging.info('Loading state...') + logger.info('Loading state...') start_time = time.time() # Lines source ||| target ||| alignment while True: @@ -362,12 +364,12 @@ class RealtimeTranslator: decoder.ref_fifo.write('{}\n'.format(target)) decoder.ref_fifo.flush() stop_time = time.time() - logging.info('Loaded state for context ({}) with {} sentences in {} seconds'.format(ctx_name, len(ctx_data), stop_time - start_time)) + logger.info('Loaded state for context ({}) with {} sentences in {} seconds'.format(ctx_name, len(ctx_data), stop_time - start_time)) lock.release() # Recover from bad load attempt by restarting context. # Guaranteed not to cause data loss since only a new context can load state. except: - logging.info('ERROR: could not load state, restarting context ({})'.format(ctx_name)) + logger.info('ERROR: could not load state, restarting context ({})'.format(ctx_name)) # ctx_name is already owned and needs to be restarted before other blocking threads use self.drop_ctx(ctx_name, force=True) self.lazy_ctx(ctx_name) -- cgit v1.2.3 From 77aa94390b678dc6f2faad52ec52847104a5f9b6 Mon Sep 17 00:00:00 2001 From: mjdenkowski Date: Tue, 8 Oct 2013 17:25:00 -0400 Subject: Better logging --- realtime/rt/rt.py | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) (limited to 'realtime/rt') diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index db831712..f66d3a4d 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -151,7 +151,7 @@ class RealtimeTranslator: NOT threadsafe, acquire ctx_name lock before calling.''' if ctx_name in self.ctx_names: return - logger.info('New context: {}'.format(ctx_name)) + logger.info('({}) New context'.format(ctx_name)) self.ctx_names.add(ctx_name) self.ctx_data[ctx_name] = [] self.grammar_files[ctx_name] = collections.deque() @@ -166,11 +166,11 @@ class RealtimeTranslator: if not force: lock.acquire() if ctx_name not in self.ctx_names: - logger.info('No context found, no action: {}'.format(ctx_name)) + logger.info('({}) No context found, no action taken'.format(ctx_name)) if not force: lock.release() return - logger.info('Dropping context: {}'.format(ctx_name)) + logger.info('({}) Dropping context'.format(ctx_name)) self.ctx_names.remove(ctx_name) self.ctx_data.pop(ctx_name) self.extractor.drop_ctx(ctx_name) @@ -195,7 +195,7 @@ class RealtimeTranslator: grammar_file = grammar_dict.get(sentence, None) # Cache hit if grammar_file: - logger.info('Grammar cache hit: {}'.format(grammar_file)) + logger.info('({}) Grammar cache hit: {}'.format(ctx_name, grammar_file)) self.extractor_lock.release() return grammar_file # Extract and cache @@ -228,18 +228,18 @@ class RealtimeTranslator: return '' if self.norm: sentence = self.tokenize(sentence) - logger.info('Normalized input: {}'.format(sentence)) + logger.info('({}) Normalized input: {}'.format(ctx_name, sentence)) grammar_file = self.grammar(sentence, ctx_name) decoder = self.decoders[ctx_name] start_time = time.time() hyp = decoder.decoder.decode(sentence, grammar_file) stop_time = time.time() - logger.info('Translation time: {} seconds'.format(stop_time - start_time)) + logger.info('({}) Translation time: {} seconds'.format(ctx_name, stop_time - start_time)) # Empty reference: HPYPLM does not learn prior to next translation decoder.ref_fifo.write('\n') decoder.ref_fifo.flush() if self.norm: - logger.info('Normalized translation: {}'.format(hyp)) + logger.info('({}) Normalized translation: {}'.format(ctx_name, hyp)) hyp = self.detokenize(hyp) lock.release() return hyp @@ -273,7 +273,7 @@ class RealtimeTranslator: cmd_name = cmd_name[0] (command, nargs) = self.COMMANDS.get(cmd_name, (None, None)) if command and len(args[1:]) in nargs: - logger.info('{} ({}) ||| {}'.format(cmd_name, ctx_name, ' ||| '.join(args[1:]))) + logger.info('({}) {} ||| {}'.format(ctx_name, cmd_name, ' ||| '.join(args[1:]))) return command(*args[1:], ctx_name=ctx_name) logger.info('ERROR: command: {}'.format(' ||| '.join(args))) @@ -284,7 +284,7 @@ class RealtimeTranslator: lock.acquire() self.lazy_ctx(ctx_name) if '' in (source.strip(), target.strip()): - logger.info('ERROR: empty source or target: {} ||| {}'.format(source, target)) + logger.info('({}) ERROR: empty source or target: {} ||| {}'.format(ctx_name, source, target)) lock.release() return if self.norm: @@ -296,15 +296,15 @@ class RealtimeTranslator: # MIRA update before adding data to grammar extractor decoder = self.decoders[ctx_name] mira_log = decoder.decoder.update(source, grammar_file, target) - logger.info('MIRA: {}'.format(mira_log)) + logger.info('({}) MIRA HBF: {}'.format(ctx_name, mira_log)) # Add to HPYPLM by writing to fifo (read on next translation) - logger.info('Adding to HPYPLM: {}'.format(target)) + logger.info('({}) Adding to HPYPLM: {}'.format(ctx_name, target)) decoder.ref_fifo.write('{}\n'.format(target)) decoder.ref_fifo.flush() # Store incremental data for save/load self.ctx_data[ctx_name].append((source, target, alignment)) # Add aligned sentence pair to grammar extractor - logger.info('Adding to bitext: {} ||| {} ||| {}'.format(source, target, alignment)) + logger.info('({}) Adding to bitext: {} ||| {} ||| {}'.format(ctx_name, source, target, alignment)) self.extractor.add_instance(source, target, alignment, ctx_name) # Clear (old) cached grammar rm_grammar = self.grammar_dict[ctx_name].pop(source) @@ -317,7 +317,7 @@ class RealtimeTranslator: self.lazy_ctx(ctx_name) ctx_data = self.ctx_data[ctx_name] out = open(filename, 'w') if filename else sys.stdout - logger.info('Saving state for context ({}) with {} sentences'.format(ctx_name, len(ctx_data))) + logger.info('({}) Saving state with {} sentences'.format(ctx_name, len(ctx_data))) out.write('{}\n'.format(self.decoders[ctx_name].decoder.get_weights())) for (source, target, alignment) in ctx_data: out.write('{} ||| {} ||| {}\n'.format(source, target, alignment)) @@ -335,8 +335,8 @@ class RealtimeTranslator: input = open(filename) if filename else sys.stdin # Non-initial load error if ctx_data: - logger.info('ERROR: Incremental data has already been added to context ({})'.format(ctx_name)) - logger.info(' State can only be loaded to a new context.') + logger.info('({}) ERROR: Incremental data has already been added to context'.format(ctx_name)) + logger.info(' State can only be loaded to a new context.') lock.release() return # Many things can go wrong if bad state data is given @@ -345,7 +345,7 @@ class RealtimeTranslator: line = input.readline().strip() # Throws exception if bad line decoder.decoder.set_weights(line) - logger.info('Loading state...') + logger.info('({}) Loading state...'.format(ctx_name)) start_time = time.time() # Lines source ||| target ||| alignment while True: @@ -364,12 +364,12 @@ class RealtimeTranslator: decoder.ref_fifo.write('{}\n'.format(target)) decoder.ref_fifo.flush() stop_time = time.time() - logger.info('Loaded state for context ({}) with {} sentences in {} seconds'.format(ctx_name, len(ctx_data), stop_time - start_time)) + logger.info('({}) Loaded state with {} sentences in {} seconds'.format(ctx_name, len(ctx_data), stop_time - start_time)) lock.release() # Recover from bad load attempt by restarting context. # Guaranteed not to cause data loss since only a new context can load state. except: - logger.info('ERROR: could not load state, restarting context ({})'.format(ctx_name)) + logger.info('({}) ERROR: could not load state, restarting context'.format(ctx_name)) # ctx_name is already owned and needs to be restarted before other blocking threads use self.drop_ctx(ctx_name, force=True) self.lazy_ctx(ctx_name) -- cgit v1.2.3 From 3564060d2567816e526f5c002aee906e85f51d50 Mon Sep 17 00:00:00 2001 From: mjdenkowski Date: Mon, 14 Oct 2013 17:16:24 -0400 Subject: Save/load from StringIO --- realtime/rt/rt.py | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) (limited to 'realtime/rt') diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index f66d3a4d..7cc5bc10 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -5,8 +5,9 @@ import collections import logging import os import shutil -import sys +import StringIO import subprocess +import sys import tempfile import threading import time @@ -311,28 +312,46 @@ class RealtimeTranslator: os.remove(rm_grammar) lock.release() - def save_state(self, filename=None, ctx_name=None): + def save_state(self, file_or_stringio=None, ctx_name=None): + '''Write state (several lines terminated by EOF line) to file, buffer, or stdout''' lock = self.ctx_locks[ctx_name] lock.acquire() self.lazy_ctx(ctx_name) ctx_data = self.ctx_data[ctx_name] - out = open(filename, 'w') if filename else sys.stdout + # Filename, StringIO or None (stdout) + if file_or_stringio: + if isinstance(file_or_stringio, StringIO.StringIO): + out = file_or_stringio + else: + out = open(file_or_stringio, 'w') + else: + out = sys.stdout logger.info('({}) Saving state with {} sentences'.format(ctx_name, len(ctx_data))) out.write('{}\n'.format(self.decoders[ctx_name].decoder.get_weights())) for (source, target, alignment) in ctx_data: out.write('{} ||| {} ||| {}\n'.format(source, target, alignment)) out.write('EOF\n') - if filename: + # Close if file + if file_or_stringio and not isinstance(file_or_stringio, StringIO.StringIO): out.close() lock.release() - def load_state(self, filename=None, ctx_name=None): + def load_state(self, file_or_stringio=None, ctx_name=None): + '''Load state (several lines terminated by EOF line) from file, buffer, or stdin. + Restarts context on any error.''' lock = self.ctx_locks[ctx_name] lock.acquire() self.lazy_ctx(ctx_name) ctx_data = self.ctx_data[ctx_name] decoder = self.decoders[ctx_name] - input = open(filename) if filename else sys.stdin + # Filename, StringIO, or None (stdin) + if file_or_stringio: + if isinstance(file_or_stringio, StringIO.StringIO): + input = file_or_stringio.getvalue() + else: + input = open(file_or_stringio) + else: + input = sys.stdin # Non-initial load error if ctx_data: logger.info('({}) ERROR: Incremental data has already been added to context'.format(ctx_name)) -- cgit v1.2.3 From af80da3fecbd56554314b1135872272cc7d3793a Mon Sep 17 00:00:00 2001 From: mjdenkowski Date: Fri, 18 Oct 2013 16:18:04 -0400 Subject: wait() to avoid zombies --- realtime/rt/aligner.py | 3 +++ realtime/rt/decoder.py | 5 +++-- realtime/rt/rt.py | 2 ++ 3 files changed, 8 insertions(+), 2 deletions(-) (limited to 'realtime/rt') diff --git a/realtime/rt/aligner.py b/realtime/rt/aligner.py index bcc1ef87..c34805eb 100644 --- a/realtime/rt/aligner.py +++ b/realtime/rt/aligner.py @@ -57,8 +57,11 @@ class ForceAligner: if not force: self.lock.acquire() self.fwd_align.stdin.close() + self.fwd_align.wait() self.rev_align.stdin.close() + self.rev_align.wait() self.tools.stdin.close() + self.tools.wait() if not force: self.lock.release() diff --git a/realtime/rt/decoder.py b/realtime/rt/decoder.py index e6e7489d..ed45c248 100644 --- a/realtime/rt/decoder.py +++ b/realtime/rt/decoder.py @@ -13,13 +13,14 @@ class Decoder: if not force: self.lock.acquire() self.decoder.stdin.close() + self.decoder.wait() if not force: self.lock.release() def decode(self, sentence, grammar=None): '''Threadsafe, FIFO''' - input = '{s}\n'.format(s=sentence, g=grammar) if grammar else '{}\n'.format(sentence) self.lock.acquire() + input = '{s}\n'.format(s=sentence, g=grammar) if grammar else '{}\n'.format(sentence) self.decoder.stdin.write(input) hyp = self.decoder.stdout.readline().strip() self.lock.release() @@ -71,8 +72,8 @@ class MIRADecoder(Decoder): def update(self, sentence, grammar, reference): '''Threadsafe, FIFO''' - input = 'LEARN ||| {s} ||| {r}\n'.format(s=sentence, g=grammar, r=reference) self.lock.acquire() + input = 'LEARN ||| {s} ||| {r}\n'.format(s=sentence, g=grammar, r=reference) self.decoder.stdin.write(input) log = self.decoder.stdout.readline().strip() self.lock.release() diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index 7cc5bc10..d1d01ad8 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -140,7 +140,9 @@ class RealtimeTranslator: self.tokenizer_lock.acquire() self.detokenizer_lock.acquire() self.tokenizer.stdin.close() + self.tokenizer.wait() self.detokenizer.stdin.close() + self.detokenizer.wait() if not force: self.tokenizer_lock.release() self.detokenizer_lock.release() -- cgit v1.2.3 From a1d78e9ba97af7d3b442a2b8759035a0c6f6b35a Mon Sep 17 00:00:00 2001 From: mjdenkowski Date: Tue, 22 Oct 2013 04:16:07 -0400 Subject: Don't getvalue() yet --- realtime/rt/rt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'realtime/rt') diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index d1d01ad8..c0aec410 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -349,7 +349,7 @@ class RealtimeTranslator: # Filename, StringIO, or None (stdin) if file_or_stringio: if isinstance(file_or_stringio, StringIO.StringIO): - input = file_or_stringio.getvalue() + input = file_or_stringio else: input = open(file_or_stringio) else: -- cgit v1.2.3 From 074fa88375967adababc632ea763e9dea389831e Mon Sep 17 00:00:00 2001 From: mjdenkowski Date: Wed, 30 Oct 2013 23:37:39 -0400 Subject: Specify heuristic for force alignment --- realtime/rt/aligner.py | 4 ++-- word-aligner/force_align.py | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) (limited to 'realtime/rt') diff --git a/realtime/rt/aligner.py b/realtime/rt/aligner.py index c34805eb..e1782496 100644 --- a/realtime/rt/aligner.py +++ b/realtime/rt/aligner.py @@ -10,7 +10,7 @@ logger = logging.getLogger('rt.aligner') class ForceAligner: - def __init__(self, fwd_params, fwd_err, rev_params, rev_err): + def __init__(self, fwd_params, fwd_err, rev_params, rev_err, heuristic='grow-diag-final-and'): cdec_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) fast_align = os.path.join(cdec_root, 'word-aligner', 'fast_align') @@ -21,7 +21,7 @@ class ForceAligner: fwd_cmd = [fast_align, '-i', '-', '-d', '-T', fwd_T, '-m', fwd_m, '-f', fwd_params] rev_cmd = [fast_align, '-i', '-', '-d', '-T', rev_T, '-m', rev_m, '-f', rev_params, '-r'] - tools_cmd = [atools, '-i', '-', '-j', '-', '-c', 'grow-diag-final-and'] + tools_cmd = [atools, '-i', '-', '-j', '-', '-c', heuristic] logger.info('Executing: {}'.format(' '.join(fwd_cmd))) self.fwd_align = util.popen_io(fwd_cmd) diff --git a/word-aligner/force_align.py b/word-aligner/force_align.py index 8b6ca224..b03d446e 100755 --- a/word-aligner/force_align.py +++ b/word-aligner/force_align.py @@ -5,13 +5,15 @@ import sys def main(): - if len(sys.argv[1:]) != 4: + if len(sys.argv[1:]) < 4: sys.stderr.write('run:\n') sys.stderr.write(' fast_align -i corpus.f-e -d -v -o -p fwd_params >fwd_align 2>fwd_err\n') sys.stderr.write(' fast_align -i corpus.f-e -r -d -v -o -p rev_params >rev_align 2>rev_err\n') sys.stderr.write('\n') sys.stderr.write('then run:\n') - sys.stderr.write(' {} fwd_params fwd_err rev_params rev_err out.f-e.gdfa\n'.format(sys.argv[0])) + sys.stderr.write(' {} fwd_params fwd_err rev_params rev_err [heuristic] out.f-e.gdfa\n'.format(sys.argv[0])) + sys.stderr.write('\n') + sys.stderr.write('where heuristic is one of: (intersect union grow-diag grow-diag-final grow-diag-final-and) default=grow-diag-final-and\n') sys.exit(2) # Hook into realtime -- cgit v1.2.3