summaryrefslogtreecommitdiff
path: root/realtime/rt/rt.py
diff options
context:
space:
mode:
authorMichael Denkowski <mdenkows@cs.cmu.edu>2013-09-27 13:39:24 -0700
committerMichael Denkowski <mdenkows@cs.cmu.edu>2013-09-27 13:39:24 -0700
commitb8116c5c3c7e31a276ff38fc8173eab37f292364 (patch)
tree249a8a368680c177251f6b41689e5b5739ee5da8 /realtime/rt/rt.py
parentcb718c763e07b8e1417383ef7ae5c1aca36d2a0a (diff)
Decoding and learning with multiple contexts is threadsafe and FIFO.
Diffstat (limited to 'realtime/rt/rt.py')
-rw-r--r--realtime/rt/rt.py65
1 files changed, 37 insertions, 28 deletions
diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py
index 1e78e188..5ace5d59 100644
--- a/realtime/rt/rt.py
+++ b/realtime/rt/rt.py
@@ -129,18 +129,23 @@ class RealtimeTranslator:
for ctx_name in list(self.ctx_names):
self.drop_ctx(ctx_name, force)
logging.info('Closing processes')
- self.aligner.close()
+ self.aligner.close(force)
if self.norm:
+ if not force:
+ self.tokenizer_lock.acquire()
+ self.detokenizer_lock.acquire()
self.tokenizer.stdin.close()
self.detokenizer.stdin.close()
+ if not force:
+ self.tokenizer_lock.release()
+ self.detokenizer_lock.release()
logging.info('Deleting {}'.format(self.tmp))
shutil.rmtree(self.tmp)
def lazy_ctx(self, ctx_name):
- '''Initialize a context (inc starting a new decoder) if needed'''
- self.ctx_locks[ctx_name].acquire()
+ '''Initialize a context (inc starting a new decoder) if needed.
+ NOT threadsafe, acquire ctx_name lock before calling.'''
if ctx_name in self.ctx_names:
- self.ctx_locks[ctx_name].release()
return
logging.info('New context: {}'.format(ctx_name))
self.ctx_names.add(ctx_name)
@@ -149,12 +154,12 @@ class RealtimeTranslator:
self.grammar_dict[ctx_name] = {}
tmpdir = os.path.join(self.tmp, 'decoder.{}'.format(ctx_name))
self.decoders[ctx_name] = RealtimeDecoder(self.config, tmpdir)
- self.ctx_locks[ctx_name].release()
- def drop_ctx(self, ctx_name, force=False):
- '''Delete a context (inc stopping the decoder)'''
+ def drop_ctx(self, ctx_name=None, force=False):
+ '''Delete a context (inc stopping the decoder)
+ Threadsafe and FIFO unless forced.'''
+ lock = self.ctx_locks[ctx_name]
if not force:
- lock = self.ctx_locks[ctx_name]
lock.acquire()
logging.info('Dropping context: {}'.format(ctx_name))
self.ctx_names.remove(ctx_name)
@@ -168,25 +173,24 @@ class RealtimeTranslator:
lock.release()
def grammar(self, sentence, ctx_name=None):
- '''Extract a sentence-level grammar on demand (or return cached)'''
+ '''Extract a sentence-level grammar on demand (or return cached)
+ Threadsafe wrt extractor but NOT decoder. Acquire ctx_name lock
+ before calling.'''
+ self.extractor_lock.acquire()
self.lazy_ctx(ctx_name)
- lock = self.ctx_locks[ctx_name]
- lock.acquire()
grammar_dict = self.grammar_dict[ctx_name]
grammar_file = grammar_dict.get(sentence, None)
# Cache hit
if grammar_file:
logging.info('Grammar cache hit: {}'.format(grammar_file))
- lock.release()
+ self.extractor_lock.release()
return grammar_file
# Extract and cache
(fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.')
os.close(fid)
with open(grammar_file, 'w') as output:
- self.extractor_lock.acquire()
for rule in self.extractor.grammar(sentence, ctx_name):
output.write('{}\n'.format(str(rule)))
- self.extractor_lock.release()
grammar_files = self.grammar_files[ctx_name]
if len(grammar_files) == self.cache_size:
rm_sent = grammar_files.popleft()
@@ -196,23 +200,25 @@ class RealtimeTranslator:
os.remove(rm_grammar)
grammar_files.append(sentence)
grammar_dict[sentence] = grammar_file
- lock.release()
+ self.extractor_lock.release()
return grammar_file
def decode(self, sentence, ctx_name=None):
- '''Decode a sentence (inc extracting a grammar if needed)'''
+ '''Decode a sentence (inc extracting a grammar if needed)
+ Threadsafe, FIFO'''
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
self.lazy_ctx(ctx_name)
+ logging.info('DECODE: {}'.format(sentence))
# Empty in, empty out
if sentence.strip() == '':
+ lock.release()
return ''
if self.norm:
sentence = self.tokenize(sentence)
logging.info('Normalized input: {}'.format(sentence))
- # grammar method is threadsafe
grammar_file = self.grammar(sentence, ctx_name)
decoder = self.decoders[ctx_name]
- lock = self.ctx_locks[ctx_name]
- lock.acquire()
start_time = time.time()
hyp = decoder.decoder.decode(sentence, grammar_file)
stop_time = time.time()
@@ -220,10 +226,10 @@ class RealtimeTranslator:
# Empty reference: HPYPLM does not learn prior to next translation
decoder.ref_fifo.write('\n')
decoder.ref_fifo.flush()
- lock.release()
if self.norm:
logging.info('Normalized translation: {}'.format(hyp))
hyp = self.detokenize(hyp)
+ lock.release()
return hyp
def tokenize(self, line):
@@ -242,29 +248,32 @@ class RealtimeTranslator:
# TODO
def command_line(self, line, ctx_name=None):
- args = [f.strip() for f in line.split('|||')]
- try:
+ args = [f.strip() for f in line.split('|||')]
+ #try:
if len(args) == 2 and not args[1]:
self.commands[args[0]](ctx_name)
else:
self.commands[args[0]](*args[1:], ctx_name=ctx_name)
- except:
- logging.info('Command error: {}'.format(' ||| '.join(args)))
+ #except:
+ # logging.info('Command error: {}'.format(' ||| '.join(args)))
def learn(self, source, target, ctx_name=None):
+ '''Learn from training instance (inc extracting grammar if needed)
+ Threadsafe, FIFO'''
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
self.lazy_ctx(ctx_name)
+ logging.info('LEARN: {}'.format(source))
if '' in (source.strip(), target.strip()):
logging.info('Error empty source or target: {} ||| {}'.format(source, target))
+ lock.release()
return
if self.norm:
source = self.tokenize(source)
target = self.tokenize(target)
- # Align instance (threadsafe)
+ # Align instance
alignment = self.aligner.align(source, target)
- # grammar method is threadsafe
grammar_file = self.grammar(source, ctx_name)
- lock = self.ctx_locks[ctx_name]
- lock.acquire()
# MIRA update before adding data to grammar extractor
decoder = self.decoders[ctx_name]
mira_log = decoder.decoder.update(source, grammar_file, target)