author | Michael Denkowski <mdenkows@cs.cmu.edu> | 2013-09-27 13:39:24 -0700
---|---|---
committer | Michael Denkowski <mdenkows@cs.cmu.edu> | 2013-09-27 13:39:24 -0700
commit | b8116c5c3c7e31a276ff38fc8173eab37f292364 (patch) |
tree | 249a8a368680c177251f6b41689e5b5739ee5da8 /realtime/rt/rt.py |
parent | cb718c763e07b8e1417383ef7ae5c1aca36d2a0a (diff) |
Decoding and learning with multiple contexts is threadsafe and FIFO.
Diffstat (limited to 'realtime/rt/rt.py')
-rw-r--r-- | realtime/rt/rt.py | 65 |
1 file changed, 37 insertions, 28 deletions
diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py
index 1e78e188..5ace5d59 100644
--- a/realtime/rt/rt.py
+++ b/realtime/rt/rt.py
@@ -129,18 +129,23 @@ class RealtimeTranslator:
         for ctx_name in list(self.ctx_names):
             self.drop_ctx(ctx_name, force)
         logging.info('Closing processes')
-        self.aligner.close()
+        self.aligner.close(force)
         if self.norm:
+            if not force:
+                self.tokenizer_lock.acquire()
+                self.detokenizer_lock.acquire()
             self.tokenizer.stdin.close()
             self.detokenizer.stdin.close()
+            if not force:
+                self.tokenizer_lock.release()
+                self.detokenizer_lock.release()
         logging.info('Deleting {}'.format(self.tmp))
         shutil.rmtree(self.tmp)
 
     def lazy_ctx(self, ctx_name):
-        '''Initialize a context (inc starting a new decoder) if needed'''
-        self.ctx_locks[ctx_name].acquire()
+        '''Initialize a context (inc starting a new decoder) if needed.
+        NOT threadsafe, acquire ctx_name lock before calling.'''
         if ctx_name in self.ctx_names:
-            self.ctx_locks[ctx_name].release()
             return
         logging.info('New context: {}'.format(ctx_name))
         self.ctx_names.add(ctx_name)
@@ -149,12 +154,12 @@ class RealtimeTranslator:
         self.grammar_dict[ctx_name] = {}
         tmpdir = os.path.join(self.tmp, 'decoder.{}'.format(ctx_name))
         self.decoders[ctx_name] = RealtimeDecoder(self.config, tmpdir)
-        self.ctx_locks[ctx_name].release()
 
-    def drop_ctx(self, ctx_name, force=False):
-        '''Delete a context (inc stopping the decoder)'''
+    def drop_ctx(self, ctx_name=None, force=False):
+        '''Delete a context (inc stopping the decoder)
+        Threadsafe and FIFO unless forced.'''
+        lock = self.ctx_locks[ctx_name]
         if not force:
-            lock = self.ctx_locks[ctx_name]
             lock.acquire()
         logging.info('Dropping context: {}'.format(ctx_name))
         self.ctx_names.remove(ctx_name)
@@ -168,25 +173,24 @@ class RealtimeTranslator:
             lock.release()
 
     def grammar(self, sentence, ctx_name=None):
-        '''Extract a sentence-level grammar on demand (or return cached)'''
+        '''Extract a sentence-level grammar on demand (or return cached)
+        Threadsafe wrt extractor but NOT decoder.  Acquire ctx_name lock
+        before calling.'''
+        self.extractor_lock.acquire()
         self.lazy_ctx(ctx_name)
-        lock = self.ctx_locks[ctx_name]
-        lock.acquire()
         grammar_dict = self.grammar_dict[ctx_name]
         grammar_file = grammar_dict.get(sentence, None)
         # Cache hit
         if grammar_file:
             logging.info('Grammar cache hit: {}'.format(grammar_file))
-            lock.release()
+            self.extractor_lock.release()
             return grammar_file
         # Extract and cache
         (fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.')
         os.close(fid)
         with open(grammar_file, 'w') as output:
-            self.extractor_lock.acquire()
             for rule in self.extractor.grammar(sentence, ctx_name):
                 output.write('{}\n'.format(str(rule)))
-            self.extractor_lock.release()
         grammar_files = self.grammar_files[ctx_name]
         if len(grammar_files) == self.cache_size:
             rm_sent = grammar_files.popleft()
@@ -196,23 +200,25 @@ class RealtimeTranslator:
             os.remove(rm_grammar)
         grammar_files.append(sentence)
         grammar_dict[sentence] = grammar_file
-        lock.release()
+        self.extractor_lock.release()
         return grammar_file
 
     def decode(self, sentence, ctx_name=None):
-        '''Decode a sentence (inc extracting a grammar if needed)'''
+        '''Decode a sentence (inc extracting a grammar if needed)
+        Threadsafe, FIFO'''
+        lock = self.ctx_locks[ctx_name]
+        lock.acquire()
         self.lazy_ctx(ctx_name)
+        logging.info('DECODE: {}'.format(sentence))
         # Empty in, empty out
         if sentence.strip() == '':
+            lock.release()
             return ''
         if self.norm:
             sentence = self.tokenize(sentence)
             logging.info('Normalized input: {}'.format(sentence))
-        # grammar method is threadsafe
         grammar_file = self.grammar(sentence, ctx_name)
         decoder = self.decoders[ctx_name]
-        lock = self.ctx_locks[ctx_name]
-        lock.acquire()
         start_time = time.time()
         hyp = decoder.decoder.decode(sentence, grammar_file)
         stop_time = time.time()
@@ -220,10 +226,10 @@ class RealtimeTranslator:
         # Empty reference: HPYPLM does not learn prior to next translation
         decoder.ref_fifo.write('\n')
         decoder.ref_fifo.flush()
-        lock.release()
         if self.norm:
             logging.info('Normalized translation: {}'.format(hyp))
             hyp = self.detokenize(hyp)
+        lock.release()
         return hyp
 
     def tokenize(self, line):
@@ -242,29 +248,32 @@ class RealtimeTranslator:
 
     # TODO
     def command_line(self, line, ctx_name=None):
-        args = [f.strip() for f in line.split('|||')]
-        try:
+        args = [f.strip() for f in line.split('|||')]
+        #try:
         if len(args) == 2 and not args[1]:
             self.commands[args[0]](ctx_name)
         else:
             self.commands[args[0]](*args[1:], ctx_name=ctx_name)
-        except:
-            logging.info('Command error: {}'.format(' ||| '.join(args)))
+        #except:
+        #    logging.info('Command error: {}'.format(' ||| '.join(args)))
 
     def learn(self, source, target, ctx_name=None):
+        '''Learn from training instance (inc extracting grammar if needed)
+        Threadsafe, FIFO'''
+        lock = self.ctx_locks[ctx_name]
+        lock.acquire()
         self.lazy_ctx(ctx_name)
+        logging.info('LEARN: {}'.format(source))
         if '' in (source.strip(), target.strip()):
             logging.info('Error empty source or target: {} ||| {}'.format(source, target))
+            lock.release()
             return
         if self.norm:
             source = self.tokenize(source)
             target = self.tokenize(target)
-        # Align instance (threadsafe)
+        # Align instance
         alignment = self.aligner.align(source, target)
-        # grammar method is threadsafe
         grammar_file = self.grammar(source, ctx_name)
-        lock = self.ctx_locks[ctx_name]
-        lock.acquire()
         # MIRA update before adding data to grammar extractor
         decoder = self.decoders[ctx_name]
         mira_log = decoder.decoder.update(source, grammar_file, target)
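The locking scheme the commit message describes is easiest to see in isolation: every context name gets its own `threading.Lock`, and `decode`/`learn` now hold that lock for the entire request, so operations on one context are applied in arrival order while different contexts proceed independently. The sketch below illustrates only that pattern; the class and method bodies are hypothetical, not the real `RealtimeTranslator` internals, and note that CPython's `Lock` does not formally guarantee FIFO wake-up order.

```python
import threading
from collections import defaultdict


class PerContextLocking:
    """Illustrative sketch of the per-context locking pattern in this commit.
    Hypothetical names; not the cdec RealtimeTranslator API."""

    def __init__(self):
        # One lock per context name, created lazily on first use.
        self.ctx_locks = defaultdict(threading.Lock)

    def decode(self, sentence, ctx_name=None):
        lock = self.ctx_locks[ctx_name]
        lock.acquire()  # serialize all decode/learn calls for this context
        try:
            return 'hypothesis for: {}'.format(sentence)  # stand-in for real decoding
        finally:
            lock.release()  # always release, even if decoding raises

    def learn(self, source, target, ctx_name=None):
        # Same pattern via a context manager: the lock is held for the full update.
        with self.ctx_locks[ctx_name]:
            pass  # align, extract grammar, MIRA update, etc.
```

Under this arrangement, two clients using different `ctx_name` values never block each other, while a translate followed by a learn on the same context cannot interleave.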
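The `grammar` change moves in the opposite direction: instead of the per-context lock it takes the single `extractor_lock` for the whole call, so the shared grammar extractor and the per-context cache bookkeeping are guarded together, and a bounded deque evicts the oldest cached grammar file once `cache_size` entries exist. A minimal, self-contained sketch of that cache-and-evict pattern (hypothetical names, not the cdec code; `extract_rules` is a stand-in for the real extractor):

```python
import os
import tempfile
import threading
from collections import defaultdict, deque


class GrammarCacheSketch:
    """Illustrative sketch of the extractor-lock plus bounded-cache pattern."""

    def __init__(self, cache_size=5):
        self.cache_size = cache_size
        self.tmp = tempfile.mkdtemp(prefix='rt.sketch.')
        self.extractor_lock = threading.Lock()
        self.grammar_dict = defaultdict(dict)    # ctx -> {sentence: grammar file}
        self.grammar_files = defaultdict(deque)  # ctx -> sentences in insertion order

    def extract_rules(self, sentence, ctx_name):
        # Placeholder for the real suffix-array grammar extractor.
        return ['[X] ||| {0} ||| {0} ||| PlaceholderFeature=1'.format(sentence)]

    def grammar(self, sentence, ctx_name=None):
        with self.extractor_lock:  # one lock guards extractor and cache together
            grammar_dict = self.grammar_dict[ctx_name]
            cached = grammar_dict.get(sentence)
            if cached:
                return cached  # cache hit
            # Extract and cache
            fid, grammar_file = tempfile.mkstemp(dir=self.tmp, prefix='grammar.')
            os.close(fid)
            with open(grammar_file, 'w') as output:
                for rule in self.extract_rules(sentence, ctx_name):
                    output.write('{}\n'.format(rule))
            files = self.grammar_files[ctx_name]
            if len(files) == self.cache_size:
                rm_sent = files.popleft()             # evict oldest cached sentence
                os.remove(grammar_dict.pop(rm_sent))  # and delete its grammar file
            files.append(sentence)
            grammar_dict[sentence] = grammar_file
            return grammar_file
```

Serializing on the extractor lock means grammar extraction for different contexts no longer overlaps, which is the trade-off the new docstring flags: threadsafe with respect to the extractor, but callers are expected to hold the context lock themselves.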