mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 10:00:38 +00:00
work_queue: detect exception raise in work_fn
This commit is contained in:
@@ -14,13 +14,19 @@ def do(nthreads: int, inputs: List[T], work_fn: Callable[[T], U]) -> List[U]:
|
||||
def __init__(self, item: V):
|
||||
self._item = item
|
||||
|
||||
work_fn_raised = threading.Event()
|
||||
|
||||
# duplicate the tenant in remote storage
|
||||
def worker(input_queue: queue.Queue[Item[T]], output_queue: queue.Queue[U]):
|
||||
while True:
|
||||
item = input_queue.get()
|
||||
if item is None:
|
||||
return
|
||||
output = work_fn(item._item)
|
||||
try:
|
||||
output = work_fn(item._item)
|
||||
except Exception:
|
||||
work_fn_raised.set()
|
||||
raise
|
||||
output_queue.put(output)
|
||||
|
||||
input_queue: queue.Queue[Optional["Item[T]"]] = queue.Queue()
|
||||
@@ -36,6 +42,9 @@ def do(nthreads: int, inputs: List[T], work_fn: Callable[[T], U]) -> List[U]:
|
||||
for w in workers:
|
||||
w.join()
|
||||
|
||||
if work_fn_raised.is_set():
|
||||
raise Exception("one of the work_fn's raised an exception, don't do that")
|
||||
|
||||
outputs = []
|
||||
while True:
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user