summaryrefslogtreecommitdiff
path: root/realtime
diff options
context:
space:
mode:
authorMichael Denkowski <mdenkows@cs.cmu.edu>2013-09-26 14:28:42 -0700
committerMichael Denkowski <mdenkows@cs.cmu.edu>2013-09-26 14:28:42 -0700
commitce358ecd6f5132f8bdbbda2272ff4f04ff883e30 (patch)
treeb5f2e114df1c8f23e9727c113e3ccf6d2964da29 /realtime
parent0d0db26fff5cec36397d81a4f1d15e4efea29f73 (diff)
FIFO Locks
Diffstat (limited to 'realtime')
-rw-r--r--realtime/rt/aligner.py6
-rw-r--r--realtime/rt/decoder.py24
-rw-r--r--realtime/rt/rt.py66
-rw-r--r--realtime/rt/util.py19
4 files changed, 67 insertions, 48 deletions
diff --git a/realtime/rt/aligner.py b/realtime/rt/aligner.py
index a14121db..62ce32b8 100644
--- a/realtime/rt/aligner.py
+++ b/realtime/rt/aligner.py
@@ -31,7 +31,7 @@ class ForceAligner:
self.tools = util.popen_io(tools_cmd)
# Used to guarantee thread safety
- self.semaphore = threading.Semaphore()
+ self.lock = util.FIFOLock()
def align(self, source, target):
'''Threadsafe'''
@@ -39,7 +39,7 @@ class ForceAligner:
def align_formatted(self, line):
'''Threadsafe'''
- self.semaphore.acquire()
+ self.lock.acquire()
self.fwd_align.stdin.write('{}\n'.format(line))
self.rev_align.stdin.write('{}\n'.format(line))
# f words ||| e words ||| links ||| score
@@ -48,7 +48,7 @@ class ForceAligner:
self.tools.stdin.write('{}\n'.format(fwd_line))
self.tools.stdin.write('{}\n'.format(rev_line))
al_line = self.tools.stdout.readline().strip()
- self.semaphore.release()
+ self.lock.release()
return al_line
def close(self):
diff --git a/realtime/rt/decoder.py b/realtime/rt/decoder.py
index 72b5b959..7c36b441 100644
--- a/realtime/rt/decoder.py
+++ b/realtime/rt/decoder.py
@@ -9,18 +9,18 @@ class Decoder:
def close(self, force=False):
if not force:
- self.semaphore.acquire()
+ self.lock.acquire()
self.decoder.stdin.close()
if not force:
- self.semaphore.release()
+ self.lock.release()
def decode(self, sentence, grammar=None):
'''Threadsafe'''
input = '<seg grammar="{g}">{s}</seg>\n'.format(s=sentence, g=grammar) if grammar else '{}\n'.format(sentence)
- self.semaphore.acquire()
+ self.lock.acquire()
self.decoder.stdin.write(input)
hyp = self.decoder.stdout.readline().strip()
- self.semaphore.release()
+ self.lock.release()
return hyp
class CdecDecoder(Decoder):
@@ -31,7 +31,7 @@ class CdecDecoder(Decoder):
decoder_cmd = [decoder, '-c', config, '-w', weights]
logging.info('Executing: {}'.format(' '.join(decoder_cmd)))
self.decoder = util.popen_io(decoder_cmd)
- self.semaphore = threading.Semaphore()
+ self.lock = util.FIFOLock()
class MIRADecoder(Decoder):
@@ -42,27 +42,27 @@ class MIRADecoder(Decoder):
mira_cmd = [mira, '-c', config, '-w', weights, '-o', '2', '-C', '0.001', '-b', '500', '-k', '500', '-u', '-t']
logging.info('Executing: {}'.format(' '.join(mira_cmd)))
self.decoder = util.popen_io(mira_cmd)
- self.semaphore = threading.Semaphore()
+ self.lock = util.FIFOLock()
def get_weights(self):
'''Threadsafe'''
- self.semaphore.acquire()
+ self.lock.acquire()
self.decoder.stdin.write('WEIGHTS ||| WRITE\n')
weights = self.decoder.stdout.readline().strip()
- self.semaphore.release()
+ self.lock.release()
return weights
def set_weights(self, w_line):
'''Threadsafe'''
- self.semaphore.acquire()
+ self.lock.acquire()
self.decoder.stdin.write('WEIGHTS ||| {}\n'.format(w_line))
- self.semaphore.release()
+ self.lock.release()
def update(self, sentence, grammar, reference):
'''Threadsafe'''
input = 'LEARN ||| <seg grammar="{g}">{s}</seg> ||| {r}\n'.format(s=sentence, g=grammar, r=reference)
- self.semaphore.acquire()
+ self.lock.acquire()
self.decoder.stdin.write(input)
log = self.decoder.stdout.readline().strip()
- self.semaphore.release()
+ self.lock.release()
return log
diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py
index f8126283..1e78e188 100644
--- a/realtime/rt/rt.py
+++ b/realtime/rt/rt.py
@@ -74,9 +74,9 @@ class RealtimeTranslator:
self.norm = norm
if self.norm:
self.tokenizer = util.popen_io([os.path.join(cdec_root, 'corpus', 'tokenize-anything.sh'), '-u'])
- self.tokenizer_sem = threading.Semaphore()
+ self.tokenizer_lock = util.FIFOLock()
self.detokenizer = util.popen_io([os.path.join(cdec_root, 'corpus', 'untok.pl')])
- self.detokenizer_sem = threading.Semaphore()
+ self.detokenizer_lock = util.FIFOLock()
# Word aligner
fwd_params = os.path.join(configdir, 'a.fwd_params')
@@ -97,12 +97,12 @@ class RealtimeTranslator:
self.ctx_names = set()
# All context-dependent operations are atomic
- self.ctx_sems = collections.defaultdict(threading.Semaphore)
+ self.ctx_locks = collections.defaultdict(util.FIFOLock)
# ctx -> list of (source, target, alignment)
self.ctx_data = {}
# Grammar extractor is not threadsafe
- self.extractor_sem = threading.Semaphore()
+ self.extractor_lock = util.FIFOLock()
# ctx -> deque of file
self.grammar_files = {}
# ctx -> dict of {sentence: file}
@@ -138,9 +138,9 @@ class RealtimeTranslator:
def lazy_ctx(self, ctx_name):
'''Initialize a context (inc starting a new decoder) if needed'''
- self.ctx_sems[ctx_name].acquire()
+ self.ctx_locks[ctx_name].acquire()
if ctx_name in self.ctx_names:
- self.ctx_sems[ctx_name].release()
+ self.ctx_locks[ctx_name].release()
return
logging.info('New context: {}'.format(ctx_name))
self.ctx_names.add(ctx_name)
@@ -149,13 +149,13 @@ class RealtimeTranslator:
self.grammar_dict[ctx_name] = {}
tmpdir = os.path.join(self.tmp, 'decoder.{}'.format(ctx_name))
self.decoders[ctx_name] = RealtimeDecoder(self.config, tmpdir)
- self.ctx_sems[ctx_name].release()
+ self.ctx_locks[ctx_name].release()
def drop_ctx(self, ctx_name, force=False):
'''Delete a context (inc stopping the decoder)'''
if not force:
- sem = self.ctx_sems[ctx_name]
- sem.acquire()
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
logging.info('Dropping context: {}'.format(ctx_name))
self.ctx_names.remove(ctx_name)
self.ctx_data.pop(ctx_name)
@@ -163,30 +163,30 @@ class RealtimeTranslator:
self.grammar_files.pop(ctx_name)
self.grammar_dict.pop(ctx_name)
self.decoders.pop(ctx_name).close(force)
- self.ctx_sems.pop(ctx_name)
+ self.ctx_locks.pop(ctx_name)
if not force:
- sem.release()
+ lock.release()
def grammar(self, sentence, ctx_name=None):
'''Extract a sentence-level grammar on demand (or return cached)'''
self.lazy_ctx(ctx_name)
- sem = self.ctx_sems[ctx_name]
- sem.acquire()
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
grammar_dict = self.grammar_dict[ctx_name]
grammar_file = grammar_dict.get(sentence, None)
# Cache hit
if grammar_file:
logging.info('Grammar cache hit: {}'.format(grammar_file))
- sem.release()
+ lock.release()
return grammar_file
# Extract and cache
(fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.')
os.close(fid)
with open(grammar_file, 'w') as output:
- self.extractor_sem.acquire()
+ self.extractor_lock.acquire()
for rule in self.extractor.grammar(sentence, ctx_name):
output.write('{}\n'.format(str(rule)))
- self.extractor_sem.release()
+ self.extractor_lock.release()
grammar_files = self.grammar_files[ctx_name]
if len(grammar_files) == self.cache_size:
rm_sent = grammar_files.popleft()
@@ -196,7 +196,7 @@ class RealtimeTranslator:
os.remove(rm_grammar)
grammar_files.append(sentence)
grammar_dict[sentence] = grammar_file
- sem.release()
+ lock.release()
return grammar_file
def decode(self, sentence, ctx_name=None):
@@ -211,8 +211,8 @@ class RealtimeTranslator:
# grammar method is threadsafe
grammar_file = self.grammar(sentence, ctx_name)
decoder = self.decoders[ctx_name]
- sem = self.ctx_sems[ctx_name]
- sem.acquire()
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
start_time = time.time()
hyp = decoder.decoder.decode(sentence, grammar_file)
stop_time = time.time()
@@ -220,24 +220,24 @@ class RealtimeTranslator:
# Empty reference: HPYPLM does not learn prior to next translation
decoder.ref_fifo.write('\n')
decoder.ref_fifo.flush()
- sem.release()
+ lock.release()
if self.norm:
logging.info('Normalized translation: {}'.format(hyp))
hyp = self.detokenize(hyp)
return hyp
def tokenize(self, line):
- self.tokenizer_sem.acquire()
+ self.tokenizer_lock.acquire()
self.tokenizer.stdin.write('{}\n'.format(line))
tok_line = self.tokenizer.stdout.readline().strip()
- self.tokenizer_sem.release()
+ self.tokenizer_lock.release()
return tok_line
def detokenize(self, line):
- self.detokenizer_sem.acquire()
+ self.detokenizer_lock.acquire()
self.detokenizer.stdin.write('{}\n'.format(line))
detok_line = self.detokenizer.stdout.readline().strip()
- self.detokenizer_sem.release()
+ self.detokenizer_lock.release()
return detok_line
# TODO
@@ -263,8 +263,8 @@ class RealtimeTranslator:
alignment = self.aligner.align(source, target)
# grammar method is threadsafe
grammar_file = self.grammar(source, ctx_name)
- sem = self.ctx_sems[ctx_name]
- sem.acquire()
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
# MIRA update before adding data to grammar extractor
decoder = self.decoders[ctx_name]
mira_log = decoder.decoder.update(source, grammar_file, target)
@@ -281,27 +281,27 @@ class RealtimeTranslator:
# Clear (old) cached grammar
rm_grammar = self.grammar_dict[ctx_name].pop(source)
os.remove(rm_grammar)
- sem.release()
+ lock.release()
def save_state(self, filename=None, ctx_name=None):
self.lazy_ctx(ctx_name)
out = open(filename, 'w') if filename else sys.stdout
- sem = self.ctx_sems[ctx_name]
- sem.acquire()
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
ctx_data = self.ctx_data[ctx_name]
logging.info('Saving state with {} sentences'.format(len(self.ctx_data)))
out.write('{}\n'.format(self.decoders[ctx_name].decoder.get_weights()))
for (source, target, alignment) in ctx_data:
out.write('{} ||| {} ||| {}\n'.format(source, target, alignment))
- sem.release()
+ lock.release()
out.write('EOF\n')
if filename:
out.close()
def load_state(self, input=sys.stdin, ctx_name=None):
self.lazy_ctx(ctx_name)
- sem = self.ctx_sems[ctx_name]
- sem.acquire()
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
ctx_data = self.ctx_data[ctx_name]
decoder = self.decoders[ctx_name]
# Non-initial load error
@@ -329,4 +329,4 @@ class RealtimeTranslator:
self.ref_fifo.flush()
stop_time = time.time()
logging.info('Loaded state with {} sentences in {} seconds'.format(len(ctx_data), stop_time - start_time))
- sem.release()
+ lock.release()
diff --git a/realtime/rt/util.py b/realtime/rt/util.py
index 6e07f116..05dcae96 100644
--- a/realtime/rt/util.py
+++ b/realtime/rt/util.py
@@ -1,4 +1,5 @@
import os
+import Queue
import subprocess
import sys
import threading
@@ -13,6 +14,24 @@ SA_INI_FILES = set((
'precompute_file',
))
+class FIFOLock:
+
+ def __init__(self):
+ self.q = Queue.Queue()
+ self.i = 0
+
+ def acquire(self):
+ self.i += 1
+ if self.i > 1:
+ event = threading.Event()
+ self.q.put(event)
+ event.wait()
+
+ def release(self):
+ self.i -= 1
+ if self.i > 0:
+ self.q.get().set()
+
def cdec_ini_for_config(config):
# This is a list of (k, v), not a ConfigObj or dict
for i in range(len(config)):