summaryrefslogtreecommitdiff
path: root/realtime
diff options
context:
space:
mode:
author    Michael Denkowski <mdenkows@cs.cmu.edu>    2013-09-27 13:39:24 -0700
committer Michael Denkowski <mdenkows@cs.cmu.edu>    2013-09-27 13:39:24 -0700
commit b8116c5c3c7e31a276ff38fc8173eab37f292364 (patch)
tree   249a8a368680c177251f6b41689e5b5739ee5da8 /realtime
parent cb718c763e07b8e1417383ef7ae5c1aca36d2a0a (diff)
Decoding and learning with multiple contexts is threadsafe and FIFO.
Diffstat (limited to 'realtime')
-rwxr-xr-x  realtime/realtime.py    | 17
-rw-r--r--  realtime/rt/aligner.py  | 10
-rw-r--r--  realtime/rt/decoder.py  |  8
-rw-r--r--  realtime/rt/rt.py       | 65
-rw-r--r--  realtime/rt/util.py     |  8
5 files changed, 67 insertions(+), 41 deletions(-)
diff --git a/realtime/realtime.py b/realtime/realtime.py
index bbec288b..38da4413 100755
--- a/realtime/realtime.py
+++ b/realtime/realtime.py
@@ -31,22 +31,27 @@ def test1(translator, input, output, ctx_name):
out.close()
def debug(translator, input):
- # Test 1: identical output
+ # Test 1: multiple contexts
threads = []
for i in range(4):
t = threading.Thread(target=test1, args=(translator, input, '{}.out.{}'.format(input, i), str(i)))
threads.append(t)
t.start()
time.sleep(30)
- for t in threads:
- t.join()
- # Test 2: flood (same number of lines)
- threads = []
+ # Test 2: flood
out = open('{}.out.flood'.format(input), 'w')
- for line in open(input):
+ inp = open(input)
+ while True:
+ line = inp.readline()
+ if not line:
+ break
+ line = line.strip()
t = threading.Thread(target=handle_line, args=(translator, line.strip(), out, None))
threads.append(t)
t.start()
+ time.sleep(1)
+ translator.drop_ctx(None)
+ # Join test threads
for t in threads:
t.join()
diff --git a/realtime/rt/aligner.py b/realtime/rt/aligner.py
index 62ce32b8..def3fcb5 100644
--- a/realtime/rt/aligner.py
+++ b/realtime/rt/aligner.py
@@ -34,11 +34,11 @@ class ForceAligner:
self.lock = util.FIFOLock()
def align(self, source, target):
- '''Threadsafe'''
+ '''Threadsafe, FIFO'''
return self.align_formatted('{} ||| {}'.format(source, target))
def align_formatted(self, line):
- '''Threadsafe'''
+ '''Threadsafe, FIFO'''
self.lock.acquire()
self.fwd_align.stdin.write('{}\n'.format(line))
self.rev_align.stdin.write('{}\n'.format(line))
@@ -51,10 +51,14 @@ class ForceAligner:
self.lock.release()
return al_line
- def close(self):
+ def close(self, force=False):
+ if not force:
+ self.lock.acquire()
self.fwd_align.stdin.close()
self.rev_align.stdin.close()
self.tools.stdin.close()
+ if not force:
+ self.lock.release()
def read_err(self, err):
(T, m) = ('', '')
diff --git a/realtime/rt/decoder.py b/realtime/rt/decoder.py
index 7c36b441..da646f68 100644
--- a/realtime/rt/decoder.py
+++ b/realtime/rt/decoder.py
@@ -15,7 +15,7 @@ class Decoder:
self.lock.release()
def decode(self, sentence, grammar=None):
- '''Threadsafe'''
+ '''Threadsafe, FIFO'''
input = '<seg grammar="{g}">{s}</seg>\n'.format(s=sentence, g=grammar) if grammar else '{}\n'.format(sentence)
self.lock.acquire()
self.decoder.stdin.write(input)
@@ -45,7 +45,7 @@ class MIRADecoder(Decoder):
self.lock = util.FIFOLock()
def get_weights(self):
- '''Threadsafe'''
+ '''Threadsafe, FIFO'''
self.lock.acquire()
self.decoder.stdin.write('WEIGHTS ||| WRITE\n')
weights = self.decoder.stdout.readline().strip()
@@ -53,13 +53,13 @@ class MIRADecoder(Decoder):
return weights
def set_weights(self, w_line):
- '''Threadsafe'''
+ '''Threadsafe, FIFO'''
self.lock.acquire()
self.decoder.stdin.write('WEIGHTS ||| {}\n'.format(w_line))
self.lock.release()
def update(self, sentence, grammar, reference):
- '''Threadsafe'''
+ '''Threadsafe, FIFO'''
input = 'LEARN ||| <seg grammar="{g}">{s}</seg> ||| {r}\n'.format(s=sentence, g=grammar, r=reference)
self.lock.acquire()
self.decoder.stdin.write(input)
diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py
index 1e78e188..5ace5d59 100644
--- a/realtime/rt/rt.py
+++ b/realtime/rt/rt.py
@@ -129,18 +129,23 @@ class RealtimeTranslator:
for ctx_name in list(self.ctx_names):
self.drop_ctx(ctx_name, force)
logging.info('Closing processes')
- self.aligner.close()
+ self.aligner.close(force)
if self.norm:
+ if not force:
+ self.tokenizer_lock.acquire()
+ self.detokenizer_lock.acquire()
self.tokenizer.stdin.close()
self.detokenizer.stdin.close()
+ if not force:
+ self.tokenizer_lock.release()
+ self.detokenizer_lock.release()
logging.info('Deleting {}'.format(self.tmp))
shutil.rmtree(self.tmp)
def lazy_ctx(self, ctx_name):
- '''Initialize a context (inc starting a new decoder) if needed'''
- self.ctx_locks[ctx_name].acquire()
+ '''Initialize a context (inc starting a new decoder) if needed.
+ NOT threadsafe, acquire ctx_name lock before calling.'''
if ctx_name in self.ctx_names:
- self.ctx_locks[ctx_name].release()
return
logging.info('New context: {}'.format(ctx_name))
self.ctx_names.add(ctx_name)
@@ -149,12 +154,12 @@ class RealtimeTranslator:
self.grammar_dict[ctx_name] = {}
tmpdir = os.path.join(self.tmp, 'decoder.{}'.format(ctx_name))
self.decoders[ctx_name] = RealtimeDecoder(self.config, tmpdir)
- self.ctx_locks[ctx_name].release()
- def drop_ctx(self, ctx_name, force=False):
- '''Delete a context (inc stopping the decoder)'''
+ def drop_ctx(self, ctx_name=None, force=False):
+ '''Delete a context (inc stopping the decoder)
+ Threadsafe and FIFO unless forced.'''
+ lock = self.ctx_locks[ctx_name]
if not force:
- lock = self.ctx_locks[ctx_name]
lock.acquire()
logging.info('Dropping context: {}'.format(ctx_name))
self.ctx_names.remove(ctx_name)
@@ -168,25 +173,24 @@ class RealtimeTranslator:
lock.release()
def grammar(self, sentence, ctx_name=None):
- '''Extract a sentence-level grammar on demand (or return cached)'''
+ '''Extract a sentence-level grammar on demand (or return cached)
+ Threadsafe wrt extractor but NOT decoder. Acquire ctx_name lock
+ before calling.'''
+ self.extractor_lock.acquire()
self.lazy_ctx(ctx_name)
- lock = self.ctx_locks[ctx_name]
- lock.acquire()
grammar_dict = self.grammar_dict[ctx_name]
grammar_file = grammar_dict.get(sentence, None)
# Cache hit
if grammar_file:
logging.info('Grammar cache hit: {}'.format(grammar_file))
- lock.release()
+ self.extractor_lock.release()
return grammar_file
# Extract and cache
(fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.')
os.close(fid)
with open(grammar_file, 'w') as output:
- self.extractor_lock.acquire()
for rule in self.extractor.grammar(sentence, ctx_name):
output.write('{}\n'.format(str(rule)))
- self.extractor_lock.release()
grammar_files = self.grammar_files[ctx_name]
if len(grammar_files) == self.cache_size:
rm_sent = grammar_files.popleft()
@@ -196,23 +200,25 @@ class RealtimeTranslator:
os.remove(rm_grammar)
grammar_files.append(sentence)
grammar_dict[sentence] = grammar_file
- lock.release()
+ self.extractor_lock.release()
return grammar_file
def decode(self, sentence, ctx_name=None):
- '''Decode a sentence (inc extracting a grammar if needed)'''
+ '''Decode a sentence (inc extracting a grammar if needed)
+ Threadsafe, FIFO'''
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
self.lazy_ctx(ctx_name)
+ logging.info('DECODE: {}'.format(sentence))
# Empty in, empty out
if sentence.strip() == '':
+ lock.release()
return ''
if self.norm:
sentence = self.tokenize(sentence)
logging.info('Normalized input: {}'.format(sentence))
- # grammar method is threadsafe
grammar_file = self.grammar(sentence, ctx_name)
decoder = self.decoders[ctx_name]
- lock = self.ctx_locks[ctx_name]
- lock.acquire()
start_time = time.time()
hyp = decoder.decoder.decode(sentence, grammar_file)
stop_time = time.time()
@@ -220,10 +226,10 @@ class RealtimeTranslator:
# Empty reference: HPYPLM does not learn prior to next translation
decoder.ref_fifo.write('\n')
decoder.ref_fifo.flush()
- lock.release()
if self.norm:
logging.info('Normalized translation: {}'.format(hyp))
hyp = self.detokenize(hyp)
+ lock.release()
return hyp
def tokenize(self, line):
@@ -242,29 +248,32 @@ class RealtimeTranslator:
# TODO
def command_line(self, line, ctx_name=None):
- args = [f.strip() for f in line.split('|||')]
- try:
+ args = [f.strip() for f in line.split('|||')]
+ #try:
if len(args) == 2 and not args[1]:
self.commands[args[0]](ctx_name)
else:
self.commands[args[0]](*args[1:], ctx_name=ctx_name)
- except:
- logging.info('Command error: {}'.format(' ||| '.join(args)))
+ #except:
+ # logging.info('Command error: {}'.format(' ||| '.join(args)))
def learn(self, source, target, ctx_name=None):
+ '''Learn from training instance (inc extracting grammar if needed)
+ Threadsafe, FIFO'''
+ lock = self.ctx_locks[ctx_name]
+ lock.acquire()
self.lazy_ctx(ctx_name)
+ logging.info('LEARN: {}'.format(source))
if '' in (source.strip(), target.strip()):
logging.info('Error empty source or target: {} ||| {}'.format(source, target))
+ lock.release()
return
if self.norm:
source = self.tokenize(source)
target = self.tokenize(target)
- # Align instance (threadsafe)
+ # Align instance
alignment = self.aligner.align(source, target)
- # grammar method is threadsafe
grammar_file = self.grammar(source, ctx_name)
- lock = self.ctx_locks[ctx_name]
- lock.acquire()
# MIRA update before adding data to grammar extractor
decoder = self.decoders[ctx_name]
mira_log = decoder.decoder.update(source, grammar_file, target)
diff --git a/realtime/rt/util.py b/realtime/rt/util.py
index 05dcae96..52767dac 100644
--- a/realtime/rt/util.py
+++ b/realtime/rt/util.py
@@ -15,22 +15,30 @@ SA_INI_FILES = set((
))
class FIFOLock:
+ '''Lock that preserves FIFO order of blocking threads'''
def __init__(self):
self.q = Queue.Queue()
self.i = 0
+ self.lock = threading.Lock()
def acquire(self):
+ self.lock.acquire()
self.i += 1
if self.i > 1:
event = threading.Event()
self.q.put(event)
+ self.lock.release()
event.wait()
+ return
+ self.lock.release()
def release(self):
+ self.lock.acquire()
self.i -= 1
if self.i > 0:
self.q.get().set()
+ self.lock.release()
def cdec_ini_for_config(config):
# This is a list of (k, v), not a ConfigObj or dict