diff options
author | Michael Denkowski <mdenkows@cs.cmu.edu> | 2013-09-26 14:28:42 -0700 |
---|---|---|
committer | Michael Denkowski <mdenkows@cs.cmu.edu> | 2013-09-26 14:28:42 -0700 |
commit | ce358ecd6f5132f8bdbbda2272ff4f04ff883e30 (patch) | |
tree | b5f2e114df1c8f23e9727c113e3ccf6d2964da29 /realtime/rt/rt.py | |
parent | 0d0db26fff5cec36397d81a4f1d15e4efea29f73 (diff) |
FIFO Locks
Diffstat (limited to 'realtime/rt/rt.py')
-rw-r--r-- | realtime/rt/rt.py | 66 |
1 files changed, 33 insertions, 33 deletions
diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index f8126283..1e78e188 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -74,9 +74,9 @@ class RealtimeTranslator: self.norm = norm if self.norm: self.tokenizer = util.popen_io([os.path.join(cdec_root, 'corpus', 'tokenize-anything.sh'), '-u']) - self.tokenizer_sem = threading.Semaphore() + self.tokenizer_lock = util.FIFOLock() self.detokenizer = util.popen_io([os.path.join(cdec_root, 'corpus', 'untok.pl')]) - self.detokenizer_sem = threading.Semaphore() + self.detokenizer_lock = util.FIFOLock() # Word aligner fwd_params = os.path.join(configdir, 'a.fwd_params') @@ -97,12 +97,12 @@ class RealtimeTranslator: self.ctx_names = set() # All context-dependent operations are atomic - self.ctx_sems = collections.defaultdict(threading.Semaphore) + self.ctx_locks = collections.defaultdict(util.FIFOLock) # ctx -> list of (source, target, alignment) self.ctx_data = {} # Grammar extractor is not threadsafe - self.extractor_sem = threading.Semaphore() + self.extractor_lock = util.FIFOLock() # ctx -> deque of file self.grammar_files = {} # ctx -> dict of {sentence: file} @@ -138,9 +138,9 @@ class RealtimeTranslator: def lazy_ctx(self, ctx_name): '''Initialize a context (inc starting a new decoder) if needed''' - self.ctx_sems[ctx_name].acquire() + self.ctx_locks[ctx_name].acquire() if ctx_name in self.ctx_names: - self.ctx_sems[ctx_name].release() + self.ctx_locks[ctx_name].release() return logging.info('New context: {}'.format(ctx_name)) self.ctx_names.add(ctx_name) @@ -149,13 +149,13 @@ class RealtimeTranslator: self.grammar_dict[ctx_name] = {} tmpdir = os.path.join(self.tmp, 'decoder.{}'.format(ctx_name)) self.decoders[ctx_name] = RealtimeDecoder(self.config, tmpdir) - self.ctx_sems[ctx_name].release() + self.ctx_locks[ctx_name].release() def drop_ctx(self, ctx_name, force=False): '''Delete a context (inc stopping the decoder)''' if not force: - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() logging.info('Dropping context: {}'.format(ctx_name)) self.ctx_names.remove(ctx_name) self.ctx_data.pop(ctx_name) @@ -163,30 +163,30 @@ class RealtimeTranslator: self.grammar_files.pop(ctx_name) self.grammar_dict.pop(ctx_name) self.decoders.pop(ctx_name).close(force) - self.ctx_sems.pop(ctx_name) + self.ctx_locks.pop(ctx_name) if not force: - sem.release() + lock.release() def grammar(self, sentence, ctx_name=None): '''Extract a sentence-level grammar on demand (or return cached)''' self.lazy_ctx(ctx_name) - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() grammar_dict = self.grammar_dict[ctx_name] grammar_file = grammar_dict.get(sentence, None) # Cache hit if grammar_file: logging.info('Grammar cache hit: {}'.format(grammar_file)) - sem.release() + lock.release() return grammar_file # Extract and cache (fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.') os.close(fid) with open(grammar_file, 'w') as output: - self.extractor_sem.acquire() + self.extractor_lock.acquire() for rule in self.extractor.grammar(sentence, ctx_name): output.write('{}\n'.format(str(rule))) - self.extractor_sem.release() + self.extractor_lock.release() grammar_files = self.grammar_files[ctx_name] if len(grammar_files) == self.cache_size: rm_sent = grammar_files.popleft() @@ -196,7 +196,7 @@ class RealtimeTranslator: os.remove(rm_grammar) grammar_files.append(sentence) grammar_dict[sentence] = grammar_file - sem.release() + lock.release() return grammar_file def decode(self, sentence, ctx_name=None): @@ -211,8 +211,8 @@ class RealtimeTranslator: # grammar method is threadsafe grammar_file = self.grammar(sentence, ctx_name) decoder = self.decoders[ctx_name] - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() start_time = time.time() hyp = decoder.decoder.decode(sentence, grammar_file) stop_time = time.time() @@ -220,24 +220,24 @@ class RealtimeTranslator: # Empty reference: HPYPLM does not learn prior to next translation decoder.ref_fifo.write('\n') decoder.ref_fifo.flush() - sem.release() + lock.release() if self.norm: logging.info('Normalized translation: {}'.format(hyp)) hyp = self.detokenize(hyp) return hyp def tokenize(self, line): - self.tokenizer_sem.acquire() + self.tokenizer_lock.acquire() self.tokenizer.stdin.write('{}\n'.format(line)) tok_line = self.tokenizer.stdout.readline().strip() - self.tokenizer_sem.release() + self.tokenizer_lock.release() return tok_line def detokenize(self, line): - self.detokenizer_sem.acquire() + self.detokenizer_lock.acquire() self.detokenizer.stdin.write('{}\n'.format(line)) detok_line = self.detokenizer.stdout.readline().strip() - self.detokenizer_sem.release() + self.detokenizer_lock.release() return detok_line # TODO @@ -263,8 +263,8 @@ class RealtimeTranslator: alignment = self.aligner.align(source, target) # grammar method is threadsafe grammar_file = self.grammar(source, ctx_name) - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() # MIRA update before adding data to grammar extractor decoder = self.decoders[ctx_name] mira_log = decoder.decoder.update(source, grammar_file, target) @@ -281,27 +281,27 @@ class RealtimeTranslator: # Clear (old) cached grammar rm_grammar = self.grammar_dict[ctx_name].pop(source) os.remove(rm_grammar) - sem.release() + lock.release() def save_state(self, filename=None, ctx_name=None): self.lazy_ctx(ctx_name) out = open(filename, 'w') if filename else sys.stdout - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() ctx_data = self.ctx_data[ctx_name] logging.info('Saving state with {} sentences'.format(len(self.ctx_data))) out.write('{}\n'.format(self.decoders[ctx_name].decoder.get_weights())) for (source, target, alignment) in ctx_data: out.write('{} ||| {} ||| {}\n'.format(source, target, alignment)) - sem.release() + lock.release() out.write('EOF\n') if filename: out.close() def load_state(self, input=sys.stdin, ctx_name=None): self.lazy_ctx(ctx_name) - sem = self.ctx_sems[ctx_name] - sem.acquire() + lock = self.ctx_locks[ctx_name] + lock.acquire() ctx_data = self.ctx_data[ctx_name] decoder = self.decoders[ctx_name] # Non-initial load error @@ -329,4 +329,4 @@ class RealtimeTranslator: self.ref_fifo.flush() stop_time = time.time() logging.info('Loaded state with {} sentences in {} seconds'.format(len(ctx_data), stop_time - start_time)) - sem.release() + lock.release() |