From a9d3e4efef96ee97a3901e6c37bbebd546c66326 Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Sun, 15 Sep 2013 12:01:44 -0700 Subject: Fixed pathes for mkconfig --- realtime/rt/util.py | 39 +++++++++++++++------------------------ 1 file changed, 15 insertions(+), 24 deletions(-) (limited to 'realtime/rt/util.py') diff --git a/realtime/rt/util.py b/realtime/rt/util.py index 10e94909..b823e12f 100644 --- a/realtime/rt/util.py +++ b/realtime/rt/util.py @@ -14,28 +14,31 @@ SA_INI_FILES = set(( )) def cdec_ini_for_config(config): - cdec_ini_handle(config, os.path.basename, hpyplm_rm_ref) + # This is a list of (k, v), not a ConfigObj or dict + for i in range(len(config)): + if config[i][0] == 'feature_function': + if config[i][1].startswith('KLanguageModel'): + f = config[i][1].split() + f[-1] = 'mono.klm' + config[i][1] = ' '.join(f) + elif config[i][1].startswith('External'): + config[i][1] = 'External libcdec_ff_hpyplm.so corpus.hpyplm' def cdec_ini_for_realtime(config, path, ref_fifo): - cdec_ini_handle(config, lambda x: os.path.join(path, x), lambda x: hpyplm_add_ref(x, ref_fifo)) - -def cdec_ini_handle(config, path_fn, hpyplm_fn): # This is a list of (k, v), not a ConfigObj or dict for i in range(len(config)): if config[i][0] == 'feature_function': if config[i][1].startswith('KLanguageModel'): f = config[i][1].split() - f[-1] = path_fn(f[-1]) + f[-1] = os.path.join(path, f[-1]) config[i][1] = ' '.join(f) elif config[i][1].startswith('External'): f = config[i][1].split() - if f[1].endswith('libcdec_ff_hpyplm.so'): - # Modify paths - for j in range(1, len(f)): - if not f[j].startswith('-'): - f[j] = path_fn(f[j]) - # Modify hpyplm args - hpyplm_fn(f) + f[1] = os.path.join(path, f[1]) + f[2] = os.path.join(path, f[2]) + f.append('-r') + f.append(ref) + f.append('-t') config[i][1] = ' '.join(f) def consume_stream(stream): @@ -44,18 +47,6 @@ def consume_stream(stream): pass threading.Thread(target=consume, args=(stream,)).start() -def hpyplm_add_ref(f, ref): - f.append('-r') - f.append(ref) - f.append('-t') - -def hpyplm_rm_ref(f): - for i in range(1, len(f)): - if f[i] == '-r': - f.pop(i) - f.pop(i) - return - def popen_io(cmd): p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) consume_stream(p.stderr) -- cgit v1.2.3 From 15084a749ee241b261458898640ac43a459acb59 Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Sun, 15 Sep 2013 19:51:08 -0700 Subject: Typo fix --- realtime/rt/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'realtime/rt/util.py') diff --git a/realtime/rt/util.py b/realtime/rt/util.py index b823e12f..6e07f116 100644 --- a/realtime/rt/util.py +++ b/realtime/rt/util.py @@ -37,7 +37,7 @@ def cdec_ini_for_realtime(config, path, ref_fifo): f[1] = os.path.join(path, f[1]) f[2] = os.path.join(path, f[2]) f.append('-r') - f.append(ref) + f.append(ref_fifo) f.append('-t') config[i][1] = ' '.join(f) -- cgit v1.2.3 From ce358ecd6f5132f8bdbbda2272ff4f04ff883e30 Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Thu, 26 Sep 2013 14:28:42 -0700 Subject: FIFO Locks --- realtime/rt/aligner.py | 6 ++--- realtime/rt/decoder.py | 24 +++++++++--------- realtime/rt/rt.py | 66 +++++++++++++++++++++++++------------------------- realtime/rt/util.py | 19 +++++++++++++++ 4 files changed, 67 insertions(+), 48 deletions(-) (limited to 'realtime/rt/util.py') diff --git a/realtime/rt/aligner.py b/realtime/rt/aligner.py index a14121db..62ce32b8 100644 --- a/realtime/rt/aligner.py +++ b/realtime/rt/aligner.py @@ -31,7 +31,7 @@ class ForceAligner: self.tools = util.popen_io(tools_cmd) # Used to guarantee thread safety - self.semaphore = threading.Semaphore() + self.lock = util.FIFOLock() def align(self, source, target): '''Threadsafe''' @@ -39,7 +39,7 @@ class ForceAligner: def align_formatted(self, line): '''Threadsafe''' - self.semaphore.acquire() + self.lock.acquire() self.fwd_align.stdin.write('{}\n'.format(line)) self.rev_align.stdin.write('{}\n'.format(line)) # f words ||| e words ||| links ||| score @@ -48,7 +48,7 @@ class ForceAligner: self.tools.stdin.write('{}\n'.format(fwd_line)) self.tools.stdin.write('{}\n'.format(rev_line)) al_line = self.tools.stdout.readline().strip() - self.semaphore.release() + self.lock.release() return al_line def close(self): diff --git a/realtime/rt/decoder.py b/realtime/rt/decoder.py index 72b5b959..7c36b441 100644 --- a/realtime/rt/decoder.py +++ b/realtime/rt/decoder.py @@ -9,18 +9,18 @@ class Decoder: def close(self, force=False): if not force: - self.semaphore.acquire() + self.lock.acquire() self.decoder.stdin.close() if not force: - self.semaphore.release() + self.lock.release() def decode(self, sentence, grammar=None): '''Threadsafe''' input = '{s}\n'.format(s=sentence, g=grammar) if grammar else '{}\n'.format(sentence) - self.semaphore.acquire() + self.lock.acquire() self.decoder.stdin.write(input) hyp = self.decoder.stdout.readline().strip() - self.semaphore.release() + self.lock.release() return hyp class CdecDecoder(Decoder): @@ -31,7 +31,7 @@ class CdecDecoder(Decoder): decoder_cmd = [decoder, '-c', config, '-w', weights] logging.info('Executing: {}'.format(' '.join(decoder_cmd))) self.decoder = util.popen_io(decoder_cmd) - self.semaphore = threading.Semaphore() + self.lock = util.FIFOLock() class MIRADecoder(Decoder): @@ -42,27 +42,27 @@ class MIRADecoder(Decoder): mira_cmd = [mira, '-c', config, '-w', weights, '-o', '2', '-C', '0.001', '-b', '500', '-k', '500', '-u', '-t'] logging.info('Executing: {}'.format(' '.join(mira_cmd))) self.decoder = util.popen_io(mira_cmd) - self.semaphore = threading.Semaphore() + self.lock = util.FIFOLock() def get_weights(self): '''Threadsafe''' - self.semaphore.acquire() + self.lock.acquire() self.decoder.stdin.write('WEIGHTS ||| WRITE\n') weights = self.decoder.stdout.readline().strip() - self.semaphore.release() + self.lock.release() return weights def set_weights(self, w_line): '''Threadsafe''' - self.semaphore.acquire() + self.lock.acquire() self.decoder.stdin.write('WEIGHTS ||| {}\n'.format(w_line)) - self.semaphore.release() + self.lock.release() def update(self, sentence, grammar, reference): '''Threadsafe''' input = 'LEARN ||| {s} ||| {r}\n'.format(s=sentence, g=grammar, r=reference) - self.semaphore.acquire() + self.lock.acquire() self.decoder.stdin.write(input) log = self.decoder.stdout.readline().strip() - self.semaphore.release() + self.lock.release() return log diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index f8126283..1e78e188 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -74,9 +74,9 @@ class RealtimeTranslator: self.norm = norm if self.norm: self.tokenizer = util.popen_io([os.path.join(cdec_root, 'corpus', 'tokenize-anything.sh'), '-u']) - self.tokenizer_sem = threading.Semaphore() + self.tokenizer_lock = util.FIFOLock() self.detokenizer = util.popen_io([os.path.join(cdec_root, 'corpus', 'untok.pl')]) - self.detokenizer_sem = threading.Semaphore() + self.detokenizer_lock = util.FIFOLock() # Word aligner fwd_params = os.path.join(configdir, 'a.fwd_params') @@ -97,12 +97,12 @@ class RealtimeTranslator: self.ctx_names = set() # All context-dependent operations are atomic - self.ctx_sems = collections.defaultdict(threading.Semaphore) + self.ctx_locks = collections.defaultdict(util.FIFOLock) # ctx -> list of (source, target, alignment) self.ctx_data = {} # Grammar extractor is not threadsafe - self.extractor_sem = threading.Semaphore() + self.extractor_lock = util.FIFOLock() # ctx -> deque of file self.grammar_files = {} # ctx -> dict of {sentence: file} @@ -138,9 +138,9 @@ class RealtimeTranslator: def lazy_ctx(self, ctx_name): '''Initialize a context (inc starting a new decoder) if needed''' - self.ctx_sems[ctx_name].acquire() + self.ctx_locks[ctx_name].acquire() if ctx_name in self.ctx_names: - self.ctx_sems[ctx_name].release() + self.ctx_locks[ctx_name].release() return logging.info('New context: {}'.format(ctx_name)) self.ctx_names.add(ctx_name) @@ -149,13 +149,13 @@ class RealtimeTranslator: self.grammar_dict[ctx_name] = {} tmpdir = os.path.join(self.tmp, 'decoder.{}'.format(ctx_name)) self.decoders[ctx_name] = RealtimeDecoder(self.config, tmpdir) - self.ctx_sems[ctx_name].release() + self.ctx_locks[ctx_name].release() def drop_ctx(self, ctx_name, force=False): '''Delete a context (inc stopping the decoder)''' if not force: - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() logging.info('Dropping context: {}'.format(ctx_name)) self.ctx_names.remove(ctx_name) self.ctx_data.pop(ctx_name) @@ -163,30 +163,30 @@ class RealtimeTranslator: self.grammar_files.pop(ctx_name) self.grammar_dict.pop(ctx_name) self.decoders.pop(ctx_name).close(force) - self.ctx_sems.pop(ctx_name) + self.ctx_locks.pop(ctx_name) if not force: - sem.release() + lock.release() def grammar(self, sentence, ctx_name=None): '''Extract a sentence-level grammar on demand (or return cached)''' self.lazy_ctx(ctx_name) - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() grammar_dict = self.grammar_dict[ctx_name] grammar_file = grammar_dict.get(sentence, None) # Cache hit if grammar_file: logging.info('Grammar cache hit: {}'.format(grammar_file)) - sem.release() + lock.release() return grammar_file # Extract and cache (fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.') os.close(fid) with open(grammar_file, 'w') as output: - self.extractor_sem.acquire() + self.extractor_lock.acquire() for rule in self.extractor.grammar(sentence, ctx_name): output.write('{}\n'.format(str(rule))) - self.extractor_sem.release() + self.extractor_lock.release() grammar_files = self.grammar_files[ctx_name] if len(grammar_files) == self.cache_size: rm_sent = grammar_files.popleft() @@ -196,7 +196,7 @@ class RealtimeTranslator: os.remove(rm_grammar) grammar_files.append(sentence) grammar_dict[sentence] = grammar_file - sem.release() + lock.release() return grammar_file def decode(self, sentence, ctx_name=None): @@ -211,8 +211,8 @@ class RealtimeTranslator: # grammar method is threadsafe grammar_file = self.grammar(sentence, ctx_name) decoder = self.decoders[ctx_name] - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() start_time = time.time() hyp = decoder.decoder.decode(sentence, grammar_file) stop_time = time.time() @@ -220,24 +220,24 @@ class RealtimeTranslator: # Empty reference: HPYPLM does not learn prior to next translation decoder.ref_fifo.write('\n') decoder.ref_fifo.flush() - sem.release() + lock.release() if self.norm: logging.info('Normalized translation: {}'.format(hyp)) hyp = self.detokenize(hyp) return hyp def tokenize(self, line): - self.tokenizer_sem.acquire() + self.tokenizer_lock.acquire() self.tokenizer.stdin.write('{}\n'.format(line)) tok_line = self.tokenizer.stdout.readline().strip() - self.tokenizer_sem.release() + self.tokenizer_lock.release() return tok_line def detokenize(self, line): - self.detokenizer_sem.acquire() + self.detokenizer_lock.acquire() self.detokenizer.stdin.write('{}\n'.format(line)) detok_line = self.detokenizer.stdout.readline().strip() - self.detokenizer_sem.release() + self.detokenizer_lock.release() return detok_line # TODO @@ -263,8 +263,8 @@ class RealtimeTranslator: alignment = self.aligner.align(source, target) # grammar method is threadsafe grammar_file = self.grammar(source, ctx_name) - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() # MIRA update before adding data to grammar extractor decoder = self.decoders[ctx_name] mira_log = decoder.decoder.update(source, grammar_file, target) @@ -281,27 +281,27 @@ class RealtimeTranslator: # Clear (old) cached grammar rm_grammar = self.grammar_dict[ctx_name].pop(source) os.remove(rm_grammar) - sem.release() + lock.release() def save_state(self, filename=None, ctx_name=None): self.lazy_ctx(ctx_name) out = open(filename, 'w') if filename else sys.stdout - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() ctx_data = self.ctx_data[ctx_name] logging.info('Saving state with {} sentences'.format(len(self.ctx_data))) out.write('{}\n'.format(self.decoders[ctx_name].decoder.get_weights())) for (source, target, alignment) in ctx_data: out.write('{} ||| {} ||| {}\n'.format(source, target, alignment)) - sem.release() + lock.release() out.write('EOF\n') if filename: out.close() def load_state(self, input=sys.stdin, ctx_name=None): self.lazy_ctx(ctx_name) - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() ctx_data = self.ctx_data[ctx_name] decoder = self.decoders[ctx_name] # Non-initial load error @@ -329,4 +329,4 @@ class RealtimeTranslator: self.ref_fifo.flush() stop_time = time.time() logging.info('Loaded state with {} sentences in {} seconds'.format(len(ctx_data), stop_time - start_time)) - sem.release() + lock.release() diff --git a/realtime/rt/util.py b/realtime/rt/util.py index 6e07f116..05dcae96 100644 --- a/realtime/rt/util.py +++ b/realtime/rt/util.py @@ -1,4 +1,5 @@ import os +import Queue import subprocess import sys import threading @@ -13,6 +14,24 @@ SA_INI_FILES = set(( 'precompute_file', )) +class FIFOLock: + + def __init__(self): + self.q = Queue.Queue() + self.i = 0 + + def acquire(self): + self.i += 1 + if self.i > 1: + event = threading.Event() + self.q.put(event) + event.wait() + + def release(self): + self.i -= 1 + if self.i > 0: + self.q.get().set() + def cdec_ini_for_config(config): # This is a list of (k, v), not a ConfigObj or dict for i in range(len(config)): -- cgit v1.2.3 From 6fd8326590fbe02d97ca2e897a95131b47413b09 Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Fri, 27 Sep 2013 13:39:24 -0700 Subject: Decoding and learning with multiple contexts is threadsafe and FIFO. --- realtime/realtime.py | 17 ++++++++----- realtime/rt/aligner.py | 10 +++++--- realtime/rt/decoder.py | 8 +++---- realtime/rt/rt.py | 65 ++++++++++++++++++++++++++++---------------------- realtime/rt/util.py | 8 +++++++ 5 files changed, 67 insertions(+), 41 deletions(-) (limited to 'realtime/rt/util.py') diff --git a/realtime/realtime.py b/realtime/realtime.py index bbec288b..38da4413 100755 --- a/realtime/realtime.py +++ b/realtime/realtime.py @@ -31,22 +31,27 @@ def test1(translator, input, output, ctx_name): out.close() def debug(translator, input): - # Test 1: identical output + # Test 1: multiple contexts threads = [] for i in range(4): t = threading.Thread(target=test1, args=(translator, input, '{}.out.{}'.format(input, i), str(i))) threads.append(t) t.start() time.sleep(30) - for t in threads: - t.join() - # Test 2: flood (same number of lines) - threads = [] + # Test 2: flood out = open('{}.out.flood'.format(input), 'w') - for line in open(input): + inp = open(input) + while True: + line = inp.readline() + if not line: + break + line = line.strip() t = threading.Thread(target=handle_line, args=(translator, line.strip(), out, None)) threads.append(t) t.start() + time.sleep(1) + translator.drop_ctx(None) + # Join test threads for t in threads: t.join() diff --git a/realtime/rt/aligner.py b/realtime/rt/aligner.py index 62ce32b8..def3fcb5 100644 --- a/realtime/rt/aligner.py +++ b/realtime/rt/aligner.py @@ -34,11 +34,11 @@ class ForceAligner: self.lock = util.FIFOLock() def align(self, source, target): - '''Threadsafe''' + '''Threadsafe, FIFO''' return self.align_formatted('{} ||| {}'.format(source, target)) def align_formatted(self, line): - '''Threadsafe''' + '''Threadsafe, FIFO''' self.lock.acquire() self.fwd_align.stdin.write('{}\n'.format(line)) self.rev_align.stdin.write('{}\n'.format(line)) @@ -51,10 +51,14 @@ class ForceAligner: self.lock.release() return al_line - def close(self): + def close(self, force=False): + if not force: + self.lock.acquire() self.fwd_align.stdin.close() self.rev_align.stdin.close() self.tools.stdin.close() + if not force: + self.lock.release() def read_err(self, err): (T, m) = ('', '') diff --git a/realtime/rt/decoder.py b/realtime/rt/decoder.py index 7c36b441..da646f68 100644 --- a/realtime/rt/decoder.py +++ b/realtime/rt/decoder.py @@ -15,7 +15,7 @@ class Decoder: self.lock.release() def decode(self, sentence, grammar=None): - '''Threadsafe''' + '''Threadsafe, FIFO''' input = '{s}\n'.format(s=sentence, g=grammar) if grammar else '{}\n'.format(sentence) self.lock.acquire() self.decoder.stdin.write(input) @@ -45,7 +45,7 @@ class MIRADecoder(Decoder): self.lock = util.FIFOLock() def get_weights(self): - '''Threadsafe''' + '''Threadsafe, FIFO''' self.lock.acquire() self.decoder.stdin.write('WEIGHTS ||| WRITE\n') weights = self.decoder.stdout.readline().strip() @@ -53,13 +53,13 @@ class MIRADecoder(Decoder): return weights def set_weights(self, w_line): - '''Threadsafe''' + '''Threadsafe, FIFO''' self.lock.acquire() self.decoder.stdin.write('WEIGHTS ||| {}\n'.format(w_line)) self.lock.release() def update(self, sentence, grammar, reference): - '''Threadsafe''' + '''Threadsafe, FIFO''' input = 'LEARN ||| {s} ||| {r}\n'.format(s=sentence, g=grammar, r=reference) self.lock.acquire() self.decoder.stdin.write(input) diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index 1e78e188..5ace5d59 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -129,18 +129,23 @@ class RealtimeTranslator: for ctx_name in list(self.ctx_names): self.drop_ctx(ctx_name, force) logging.info('Closing processes') - self.aligner.close() + self.aligner.close(force) if self.norm: + if not force: + self.tokenizer_lock.acquire() + self.detokenizer_lock.acquire() self.tokenizer.stdin.close() self.detokenizer.stdin.close() + if not force: + self.tokenizer_lock.release() + self.detokenizer_lock.release() logging.info('Deleting {}'.format(self.tmp)) shutil.rmtree(self.tmp) def lazy_ctx(self, ctx_name): - '''Initialize a context (inc starting a new decoder) if needed''' - self.ctx_locks[ctx_name].acquire() + '''Initialize a context (inc starting a new decoder) if needed. + NOT threadsafe, acquire ctx_name lock before calling.''' if ctx_name in self.ctx_names: - self.ctx_locks[ctx_name].release() return logging.info('New context: {}'.format(ctx_name)) self.ctx_names.add(ctx_name) @@ -149,12 +154,12 @@ class RealtimeTranslator: self.grammar_dict[ctx_name] = {} tmpdir = os.path.join(self.tmp, 'decoder.{}'.format(ctx_name)) self.decoders[ctx_name] = RealtimeDecoder(self.config, tmpdir) - self.ctx_locks[ctx_name].release() - def drop_ctx(self, ctx_name, force=False): - '''Delete a context (inc stopping the decoder)''' + def drop_ctx(self, ctx_name=None, force=False): + '''Delete a context (inc stopping the decoder) + Threadsafe and FIFO unless forced.''' + lock = self.ctx_locks[ctx_name] if not force: - lock = self.ctx_locks[ctx_name] lock.acquire() logging.info('Dropping context: {}'.format(ctx_name)) self.ctx_names.remove(ctx_name) @@ -168,25 +173,24 @@ class RealtimeTranslator: lock.release() def grammar(self, sentence, ctx_name=None): - '''Extract a sentence-level grammar on demand (or return cached)''' + '''Extract a sentence-level grammar on demand (or return cached) + Threadsafe wrt extractor but NOT decoder. Acquire ctx_name lock + before calling.''' + self.extractor_lock.acquire() self.lazy_ctx(ctx_name) - lock = self.ctx_locks[ctx_name] - lock.acquire() grammar_dict = self.grammar_dict[ctx_name] grammar_file = grammar_dict.get(sentence, None) # Cache hit if grammar_file: logging.info('Grammar cache hit: {}'.format(grammar_file)) - lock.release() + self.extractor_lock.release() return grammar_file # Extract and cache (fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.') os.close(fid) with open(grammar_file, 'w') as output: - self.extractor_lock.acquire() for rule in self.extractor.grammar(sentence, ctx_name): output.write('{}\n'.format(str(rule))) - self.extractor_lock.release() grammar_files = self.grammar_files[ctx_name] if len(grammar_files) == self.cache_size: rm_sent = grammar_files.popleft() @@ -196,23 +200,25 @@ class RealtimeTranslator: os.remove(rm_grammar) grammar_files.append(sentence) grammar_dict[sentence] = grammar_file - lock.release() + self.extractor_lock.release() return grammar_file def decode(self, sentence, ctx_name=None): - '''Decode a sentence (inc extracting a grammar if needed)''' + '''Decode a sentence (inc extracting a grammar if needed) + Threadsafe, FIFO''' + lock = self.ctx_locks[ctx_name] + lock.acquire() self.lazy_ctx(ctx_name) + logging.info('DECODE: {}'.format(sentence)) # Empty in, empty out if sentence.strip() == '': + lock.release() return '' if self.norm: sentence = self.tokenize(sentence) logging.info('Normalized input: {}'.format(sentence)) - # grammar method is threadsafe grammar_file = self.grammar(sentence, ctx_name) decoder = self.decoders[ctx_name] - lock = self.ctx_locks[ctx_name] - lock.acquire() start_time = time.time() hyp = decoder.decoder.decode(sentence, grammar_file) stop_time = time.time() @@ -220,10 +226,10 @@ class RealtimeTranslator: # Empty reference: HPYPLM does not learn prior to next translation decoder.ref_fifo.write('\n') decoder.ref_fifo.flush() - lock.release() if self.norm: logging.info('Normalized translation: {}'.format(hyp)) hyp = self.detokenize(hyp) + lock.release() return hyp def tokenize(self, line): @@ -242,29 +248,32 @@ class RealtimeTranslator: # TODO def command_line(self, line, ctx_name=None): - args = [f.strip() for f in line.split('|||')] - try: + args = [f.strip() for f in line.split('|||')] + #try: if len(args) == 2 and not args[1]: self.commands[args[0]](ctx_name) else: self.commands[args[0]](*args[1:], ctx_name=ctx_name) - except: - logging.info('Command error: {}'.format(' ||| '.join(args))) + #except: + # logging.info('Command error: {}'.format(' ||| '.join(args))) def learn(self, source, target, ctx_name=None): + '''Learn from training instance (inc extracting grammar if needed) + Threadsafe, FIFO''' + lock = self.ctx_locks[ctx_name] + lock.acquire() self.lazy_ctx(ctx_name) + logging.info('LEARN: {}'.format(source)) if '' in (source.strip(), target.strip()): logging.info('Error empty source or target: {} ||| {}'.format(source, target)) + lock.release() return if self.norm: source = self.tokenize(source) target = self.tokenize(target) - # Align instance (threadsafe) + # Align instance alignment = self.aligner.align(source, target) - # grammar method is threadsafe grammar_file = self.grammar(source, ctx_name) - lock = self.ctx_locks[ctx_name] - lock.acquire() # MIRA update before adding data to grammar extractor decoder = self.decoders[ctx_name] mira_log = decoder.decoder.update(source, grammar_file, target) diff --git a/realtime/rt/util.py b/realtime/rt/util.py index 05dcae96..52767dac 100644 --- a/realtime/rt/util.py +++ b/realtime/rt/util.py @@ -15,22 +15,30 @@ SA_INI_FILES = set(( )) class FIFOLock: + '''Lock that preserves FIFO order of blocking threads''' def __init__(self): self.q = Queue.Queue() self.i = 0 + self.lock = threading.Lock() def acquire(self): + self.lock.acquire() self.i += 1 if self.i > 1: event = threading.Event() self.q.put(event) + self.lock.release() event.wait() + return + self.lock.release() def release(self): + self.lock.acquire() self.i -= 1 if self.i > 0: self.q.get().set() + self.lock.release() def cdec_ini_for_config(config): # This is a list of (k, v), not a ConfigObj or dict -- cgit v1.2.3