From 0d0db26fff5cec36397d81a4f1d15e4efea29f73 Mon Sep 17 00:00:00 2001 From: Michael Denkowski Date: Wed, 25 Sep 2013 18:45:28 -0700 Subject: Threading tests --- realtime/realtime.py | 79 +++++++++++++++++++++++++++++++++++----------------- realtime/rt/rt.py | 4 +++ 2 files changed, 58 insertions(+), 25 deletions(-) diff --git a/realtime/realtime.py b/realtime/realtime.py index 282d3311..bbec288b 100755 --- a/realtime/realtime.py +++ b/realtime/realtime.py @@ -2,9 +2,9 @@ import argparse import logging -import signal import sys import threading +import time import rt @@ -15,6 +15,41 @@ class Parser(argparse.ArgumentParser): sys.stderr.write('\n{}\n'.format(message)) sys.exit(2) +def handle_line(translator, line, output, ctx_name): + if '|||' in line: + translator.command_line(line, ctx_name) + else: + hyp = translator.decode(line, ctx_name) + output.write('{}\n'.format(hyp)) + output.flush() + +def test1(translator, input, output, ctx_name): + inp = open(input) + out = open(output, 'w') + for line in inp: + handle_line(translator, line.strip(), out, ctx_name) + out.close() + +def debug(translator, input): + # Test 1: identical output + threads = [] + for i in range(4): + t = threading.Thread(target=test1, args=(translator, input, '{}.out.{}'.format(input, i), str(i))) + threads.append(t) + t.start() + time.sleep(30) + for t in threads: + t.join() + # Test 2: flood (same number of lines) + threads = [] + out = open('{}.out.flood'.format(input), 'w') + for line in open(input): + t = threading.Thread(target=handle_line, args=(translator, line.strip(), out, None)) + threads.append(t) + t.start() + for t in threads: + t.join() + def main(): parser = Parser(description='Real-time adaptive translation with cdec. (See README.md)') @@ -24,7 +59,7 @@ def main(): parser.add_argument('-T', '--temp', help='Temp directory (default /tmp)', default='/tmp') parser.add_argument('-a', '--cache', help='Grammar cache size (default 5)', default='5') parser.add_argument('-v', '--verbose', help='Info to stderr', action='store_true') - parser.add_argument('-D', '--debug-test', help='Test thread safety (debug use only)', action='store_true') + parser.add_argument('-D', '--debug-test', help='Run debug tests on input file') args = parser.parse_args() if args.verbose: @@ -32,29 +67,23 @@ def main(): with rt.RealtimeTranslator(args.config, tmpdir=args.temp, cache_size=int(args.cache), norm=args.normalize) as translator: - # Load state if given - if args.state: - with open(args.state) as input: - rtd.load_state(input) - if not args.debug_test: - run(translator) + # Debugging + if args.debug_test: + debug(translator, args.debug_test) + return + + # Read lines and commands + while True: + line = sys.stdin.readline() + if not line: + break + line = line.strip() + if '|||' in line: + translator.command_line(line) else: - # TODO: write test - run(translator) - -def run(translator, input=sys.stdin, output=sys.stdout, ctx_name=None): - # Read lines and commands - while True: - line = input.readline() - if not line: - break - line = line.strip() - if '|||' in line: - translator.command_line(line, ctx_name) - else: - hyp = translator.decode(line, ctx_name) - output.write('{}\n'.format(hyp)) - output.flush() - + hyp = translator.decode(line) + sys.stdout.write('{}\n'.format(hyp)) + sys.stdout.flush() + if __name__ == '__main__': main() diff --git a/realtime/rt/rt.py b/realtime/rt/rt.py index 6f1fb70f..f8126283 100644 --- a/realtime/rt/rt.py +++ b/realtime/rt/rt.py @@ -101,6 +101,8 @@ class RealtimeTranslator: # ctx -> list of (source, target, alignment) self.ctx_data = {} + # Grammar extractor is not threadsafe + self.extractor_sem = threading.Semaphore() # ctx -> deque of file self.grammar_files = {} # ctx -> dict of {sentence: file} @@ -181,8 +183,10 @@ class RealtimeTranslator: (fid, grammar_file) = tempfile.mkstemp(dir=self.decoders[ctx_name].tmp, prefix='grammar.') os.close(fid) with open(grammar_file, 'w') as output: + self.extractor_sem.acquire() for rule in self.extractor.grammar(sentence, ctx_name): output.write('{}\n'.format(str(rule))) + self.extractor_sem.release() grammar_files = self.grammar_files[ctx_name] if len(grammar_files) == self.cache_size: rm_sent = grammar_files.popleft() -- cgit v1.2.3