summaryrefslogtreecommitdiff
path: root/realtime/rt/rt.py
diff options
context:
space:
mode:
authorMichael Denkowski <mdenkows@cs.cmu.edu>2013-09-26 14:28:42 -0700
committerMichael Denkowski <mdenkows@cs.cmu.edu>2013-09-26 14:28:42 -0700
commitcb718c763e07b8e1417383ef7ae5c1aca36d2a0a (patch)
tree2c4495650f59ab6b085ff95cef25bb05776232a8 /realtime/rt/rt.py
parent49ddc45e717ac495c6adf80a20d02a1672a069df (diff)
FIFO Locks
Diffstat (limited to 'realtime/rt/rt.py')
-rw-r--r--realtime/rt/rt.py66
1 files changed, 33 insertions, 33 deletions
diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py
index f8126283..1e78e188 100644
--- a/realtime/rt/rt.py
+++ b/realtime/rt/rt.py
@@ -74,9 +74,9 @@ class RealtimeTranslator:
self.norm = norm
if self.norm:
self.tokenizer = util.popen_io([os.path.join(cdec_root, 'corpus', 'tokenize-anything.sh'), '-u'])
- self.tokenizer_sem = threading.Semaphore()
+ self.tokenizer_lock = util.FIFOLock()
self.detokenizer = util.popen_io([os.path.join(cdec_root, 'corpus', 'untok.pl')])
- self.detokenizer_sem = threading.Semaphore()
+ self.detokenizer_lock = util.FIFOLock()
# Word aligner
fwd_params = os.path.join(configdir, 'a.fwd_params')
@@ -97,12 +97,12 @@ class RealtimeTranslator:
self.ctx_names = set()
# All context-dependent operations are atomic
- self.ctx_sems = collections.defaultdict(threading.Semaphore)
+ self.ctx_locks = collections.defaultdict(util.FIFOLock)
# ctx -> list of (source, target, alignment)
self.ctx_data = {}
# Grammar extractor is not threadsafe
- self.extractor_sem = threading.Semaphore()
+ self.extractor_lock = util.FIFOLock()
# ctx -> deque of file
self.grammar_files = {}
# ctx -> dict of {sentence: file}
@@ -138,9 +138,9 @@ class RealtimeTranslator:
def lazy_ctx(self, ctx_name):
'''Initialize a context (inc starting a new decoder) if needed'''
- self.ctx_sems[ctx_name].acquire()
+ self.ctx_locks[ctx_name].acquire()
if ctx_name in self.ctx_names:
- self.ctx_sems[ctx_name].release()
+ self.ctx_locks[ctx_name].release()
return
logging.info('New context: {}'.format(ctx_name))
self.ctx_names.add(ctx_name)
@@ -149,13 +149,13 @@ class RealtimeTranslator:
self.grammar_dict[ctx_name] = {}
tmpdir = os.path.join(self.tmp, 'decoder.{}'.format(ctx_name))
self.decoders[ctx_name] = RealtimeDecoder(self.config, tmpdir)
- self.ctx_sems[ctx_name].release()
+ self.ctx_locks[ctx_name].release()
def drop_ctx(self, ctx_name, force=False):
'''Delete a context (inc stopping the decoder)'''
if not force:
- sem = self.ctx_sems[ctx_name]
- sem.acquire()
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
logging.info('Dropping context: {}'.format(ctx_name))
self.ctx_names.remove(ctx_name)
self.ctx_data.pop(ctx_name)
@@ -163,30 +163,30 @@ class RealtimeTranslator:
self.grammar_files.pop(ctx_name)
self.grammar_dict.pop(ctx_name)
self.decoders.pop(ctx_name).close(force)
- self.ctx_sems.pop(ctx_name)
+ self.ctx_locks.pop(ctx_name)
if not force:
- sem.release()
+ lock.release()
def grammar(self, sentence, ctx_name=None):
'''Extract a sentence-level grammar on demand (or return cached)'''
self.lazy_ctx(ctx_name)
- sem = self.ctx_sems[ctx_name]
- sem.acquire()
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
grammar_dict = self.grammar_dict[ctx_name]
grammar_file = grammar_dict.get(sentence, None)
# Cache hit
if grammar_file:
logging.info('Grammar cache hit: {}'.format(grammar_file))
- sem.release()
+ lock.release()
return grammar_file
# Extract and cache
(fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.')
os.close(fid)
with open(grammar_file, 'w') as output:
- self.extractor_sem.acquire()
+ self.extractor_lock.acquire()
for rule in self.extractor.grammar(sentence, ctx_name):
output.write('{}\n'.format(str(rule)))
- self.extractor_sem.release()
+ self.extractor_lock.release()
grammar_files = self.grammar_files[ctx_name]
if len(grammar_files) == self.cache_size:
rm_sent = grammar_files.popleft()
@@ -196,7 +196,7 @@ class RealtimeTranslator:
os.remove(rm_grammar)
grammar_files.append(sentence)
grammar_dict[sentence] = grammar_file
- sem.release()
+ lock.release()
return grammar_file
def decode(self, sentence, ctx_name=None):
@@ -211,8 +211,8 @@ class RealtimeTranslator:
# grammar method is threadsafe
grammar_file = self.grammar(sentence, ctx_name)
decoder = self.decoders[ctx_name]
- sem = self.ctx_sems[ctx_name]
- sem.acquire()
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
start_time = time.time()
hyp = decoder.decoder.decode(sentence, grammar_file)
stop_time = time.time()
@@ -220,24 +220,24 @@ class RealtimeTranslator:
# Empty reference: HPYPLM does not learn prior to next translation
decoder.ref_fifo.write('\n')
decoder.ref_fifo.flush()
- sem.release()
+ lock.release()
if self.norm:
logging.info('Normalized translation: {}'.format(hyp))
hyp = self.detokenize(hyp)
return hyp
def tokenize(self, line):
- self.tokenizer_sem.acquire()
+ self.tokenizer_lock.acquire()
self.tokenizer.stdin.write('{}\n'.format(line))
tok_line = self.tokenizer.stdout.readline().strip()
- self.tokenizer_sem.release()
+ self.tokenizer_lock.release()
return tok_line
def detokenize(self, line):
- self.detokenizer_sem.acquire()
+ self.detokenizer_lock.acquire()
self.detokenizer.stdin.write('{}\n'.format(line))
detok_line = self.detokenizer.stdout.readline().strip()
- self.detokenizer_sem.release()
+ self.detokenizer_lock.release()
return detok_line
# TODO
@@ -263,8 +263,8 @@ class RealtimeTranslator:
alignment = self.aligner.align(source, target)
# grammar method is threadsafe
grammar_file = self.grammar(source, ctx_name)
- sem = self.ctx_sems[ctx_name]
- sem.acquire()
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
# MIRA update before adding data to grammar extractor
decoder = self.decoders[ctx_name]
mira_log = decoder.decoder.update(source, grammar_file, target)
@@ -281,27 +281,27 @@ class RealtimeTranslator:
# Clear (old) cached grammar
rm_grammar = self.grammar_dict[ctx_name].pop(source)
os.remove(rm_grammar)
- sem.release()
+ lock.release()
def save_state(self, filename=None, ctx_name=None):
self.lazy_ctx(ctx_name)
out = open(filename, 'w') if filename else sys.stdout
- sem = self.ctx_sems[ctx_name]
- sem.acquire()
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
ctx_data = self.ctx_data[ctx_name]
logging.info('Saving state with {} sentences'.format(len(self.ctx_data)))
out.write('{}\n'.format(self.decoders[ctx_name].decoder.get_weights()))
for (source, target, alignment) in ctx_data:
out.write('{} ||| {} ||| {}\n'.format(source, target, alignment))
- sem.release()
+ lock.release()
out.write('EOF\n')
if filename:
out.close()
def load_state(self, input=sys.stdin, ctx_name=None):
self.lazy_ctx(ctx_name)
- sem = self.ctx_sems[ctx_name]
- sem.acquire()
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
ctx_data = self.ctx_data[ctx_name]
decoder = self.decoders[ctx_name]
# Non-initial load error
@@ -329,4 +329,4 @@ class RealtimeTranslator:
self.ref_fifo.flush()
stop_time = time.time()
logging.info('Loaded state with {} sentences in {} seconds'.format(len(ctx_data), stop_time - start_time))
- sem.release()
+ lock.release()