From 7721f1b9c607f7330f477f5cd3bf4d5bf1e25306 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 9 Jan 2024 15:04:46 +0000 Subject: [PATCH] work_queue: detect exception raise in work_fn --- test_runner/fixtures/work_queue.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/test_runner/fixtures/work_queue.py b/test_runner/fixtures/work_queue.py index 4e844436d8..05b8b06768 100644 --- a/test_runner/fixtures/work_queue.py +++ b/test_runner/fixtures/work_queue.py @@ -14,13 +14,19 @@ def do(nthreads: int, inputs: List[T], work_fn: Callable[[T], U]) -> List[U]: def __init__(self, item: V): self._item = item + work_fn_raised = threading.Event() + # duplicate the tenant in remote storage def worker(input_queue: queue.Queue[Item[T]], output_queue: queue.Queue[U]): while True: item = input_queue.get() if item is None: return - output = work_fn(item._item) + try: + output = work_fn(item._item) + except Exception: + work_fn_raised.set() + raise output_queue.put(output) input_queue: queue.Queue[Optional["Item[T]"]] = queue.Queue() @@ -36,6 +42,9 @@ def do(nthreads: int, inputs: List[T], work_fn: Callable[[T], U]) -> List[U]: for w in workers: w.join() + if work_fn_raised.is_set(): + raise Exception("one of the work_fn's raised an exception, don't do that") + outputs = [] while True: try: