mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-17 19:20:41 +00:00
## Summary

`AsyncTable.search()` computes the query embedding with `loop.run_in_executor(None, ...)`, which uses asyncio's **default** `ThreadPoolExecutor`. That pool is shared with all other `run_in_executor(None, ...)` work, so a slow embedding call (a heavy local model or an HTTP request to an embeddings API) ties up those threads and starves unrelated work that relies on the default executor under concurrent load.

This moves the (potentially blocking) embedding call onto a **dedicated executor**, isolating it from the default pool.

Closes #3310.

## Problem

`python/lancedb/table.py`, `AsyncTable.search()`:

```python
return (
    await loop.run_in_executor(
        None,  # asyncio's default executor, shared with other blocking I/O
        embedding.function.compute_query_embeddings_with_retry,
        query,
    )
)[0]
```

Under load, concurrent searches whose embeddings block (or any other code using the default executor) contend for the same small thread pool.

## Change

- Add a dedicated `ThreadPoolExecutor(thread_name_prefix="lancedb-embedding")` in `background_loop.py`, exposed via `embedding_executor()`.
- Use it in `AsyncTable.search()`'s `make_embedding` instead of the default executor.
- Reset the executor in the existing `_reset_after_fork` hook: its worker threads don't survive `fork()`, same as the background event loop. The replacement pool spawns its threads lazily, so this is cheap.

## Design notes

The issue asked whether maintainers preferred a configurable executor, a dedicated internal one, or another approach (no response in the thread). I went with a **dedicated internal executor**: it fixes the starvation with no public API change and stays consistent with the existing `LOOP` singleton. Making the pool size configurable would be an easy follow-up if preferred.

Scope is limited to `search()`. The broader "embedding functions need real async support" work (including `add()`) is tracked separately in #3268.
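The starvation described under Problem, and the isolation the Change provides, can be reproduced in miniature. The following is a minimal sketch, not lancedb code: `slow_embedding`, `quick_io`, `quick_io_delay`, and `measure` are hypothetical names, and the default executor is deliberately shrunk to two workers so the contention shows up with only four concurrent "searches".

```python
import asyncio
import concurrent.futures
import time
from typing import Optional


def slow_embedding(query: str) -> list:
    """Hypothetical stand-in for a blocking embedding call (0.2 s)."""
    time.sleep(0.2)
    return [float(len(query))]


def quick_io() -> float:
    """Unrelated blocking work; reports the moment it actually started."""
    return time.monotonic()


async def quick_io_delay(
    embedding_pool: Optional[concurrent.futures.Executor],
) -> float:
    """Seconds that `quick_io` waits before a default-pool worker picks it up."""
    loop = asyncio.get_running_loop()
    # Assumption for the demo: a tiny default pool makes contention visible.
    loop.set_default_executor(concurrent.futures.ThreadPoolExecutor(max_workers=2))
    start = time.monotonic()
    # Four concurrent "searches"; None means "use the default executor".
    embeddings = [
        loop.run_in_executor(embedding_pool, slow_embedding, f"q{i}")
        for i in range(4)
    ]
    # Unrelated work that always goes to the default executor.
    started_at = await loop.run_in_executor(None, quick_io)
    await asyncio.gather(*embeddings)
    return started_at - start


def measure() -> tuple:
    """Return (delay with shared pool, delay with dedicated pool)."""
    shared = asyncio.run(quick_io_delay(None))
    with concurrent.futures.ThreadPoolExecutor(
        thread_name_prefix="lancedb-embedding"
    ) as pool:
        dedicated = asyncio.run(quick_io_delay(pool))
    return shared, dedicated


if __name__ == "__main__":
    shared, dedicated = measure()
    print(f"shared default pool: quick_io waited {shared:.3f}s")
    print(f"dedicated pool:      quick_io waited {dedicated:.3f}s")
```

With the shared pool, `quick_io` queues behind all four embedding calls; with the dedicated pool it starts almost immediately, because the default workers are idle.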
## Testing

- Added `test_async_search_runs_embedding_on_dedicated_executor`: patches the embedding function to record the executing thread during an async search and asserts it runs on a `lancedb-embedding` thread. Verified it **fails** against the previous `run_in_executor(None, ...)` and passes with the fix.
- `ruff format`, `ruff check`, and `pyright` pass on the changed files.
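The thread-recording technique used by that test can be illustrated without lancedb. This is a hedged sketch of the idea only, not the actual test: `EMBEDDING_POOL`, `RecordingEmbedder`, `search`, and `recorded_thread` are hypothetical stand-ins, and only the `lancedb-embedding` thread name prefix is taken from the change itself.

```python
import asyncio
import concurrent.futures
import threading

# Hypothetical stand-in for the dedicated pool; only the prefix comes from the PR.
EMBEDDING_POOL = concurrent.futures.ThreadPoolExecutor(
    thread_name_prefix="lancedb-embedding"
)


class RecordingEmbedder:
    """Hypothetical embedding function that records the thread it runs on."""

    def __init__(self):
        self.thread_names = []

    def compute(self, query: str) -> list:
        # Capture the worker thread's name at the moment the embedding runs.
        self.thread_names.append(threading.current_thread().name)
        return [0.0, 1.0]


async def search(embedder: RecordingEmbedder, query: str, executor=None) -> list:
    """Toy search path: compute the embedding off the event loop thread."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, embedder.compute, query)


def recorded_thread(executor) -> str:
    """Run one toy search and return the name of the thread that embedded it."""
    embedder = RecordingEmbedder()
    asyncio.run(search(embedder, "hello", executor))
    return embedder.thread_names[0]


if __name__ == "__main__":
    # Dedicated pool: the worker thread name carries the configured prefix.
    assert recorded_thread(EMBEDDING_POOL).startswith("lancedb-embedding")
    # Default executor (None): the name does not carry that prefix.
    assert not recorded_thread(None).startswith("lancedb-embedding")
    print("embedding ran on the expected threads")
```

Asserting on the thread name keeps the check independent of timing, which is presumably why a test of this shape fails deterministically against `run_in_executor(None, ...)`.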
91 lines
3.0 KiB
Python
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors

import asyncio
import concurrent.futures
import os
import threading
import warnings


class BackgroundEventLoop:
    """
    A background event loop that can run futures.

    Used to bridge sync and async code, without messing with users' event loops.
    """

    def __init__(self):
        self._start()

    def _start(self):
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(
            target=self.loop.run_forever,
            name="LanceDBBackgroundEventLoop",
            daemon=True,
        )
        self.thread.start()

    def run(self, future):
        concurrent_future = asyncio.run_coroutine_threadsafe(future, self.loop)
        try:
            return concurrent_future.result()
        except BaseException:
            concurrent_future.cancel()
            raise


LOOP = BackgroundEventLoop()


def _new_embedding_executor() -> concurrent.futures.ThreadPoolExecutor:
    return concurrent.futures.ThreadPoolExecutor(thread_name_prefix="lancedb-embedding")


# Embedding functions can block for a long time -- a heavy local model or an
# HTTP request to a remote embeddings API. Running them on asyncio's default
# executor lets them starve the unrelated blocking I/O that shares that pool,
# so they get a dedicated one. See
# https://github.com/lancedb/lancedb/issues/3310.
_EMBEDDING_EXECUTOR = _new_embedding_executor()


def embedding_executor() -> concurrent.futures.ThreadPoolExecutor:
    """Return the executor dedicated to running blocking embedding calls."""
    return _EMBEDDING_EXECUTOR


_FORK_WARNED = False


def _reset_after_fork():
    # Threads do not survive fork(), so the asyncio loop in LOOP.thread is
    # dead in the child. Re-initialize the singleton in place so existing
    # `from .background_loop import LOOP` references in other modules see
    # the new state. The Rust-side tokio runtime is reset analogously by a
    # pthread_atfork hook installed in the _lancedb extension.
    LOOP._start()
    # The embedding executor's worker threads are dead in the child as well.
    # Replace it with a fresh pool (threads are spawned lazily, so this is
    # cheap); we don't shut down the old one, since joining its dead workers
    # could hang.
    global _EMBEDDING_EXECUTOR
    _EMBEDDING_EXECUTOR = _new_embedding_executor()
    global _FORK_WARNED
    if not _FORK_WARNED:
        _FORK_WARNED = True
        warnings.warn(
            "lancedb fork support is experimental: the internal async "
            "runtime has been reset in the forked child, but a small chance "
            "of deadlock remains if other state was mid-operation at fork "
            "time. The 'forkserver' or 'spawn' multiprocessing start method "
            "is likely a safer alternative.",
            RuntimeWarning,
            stacklevel=2,
        )


if hasattr(os, "register_at_fork"):
    os.register_at_fork(after_in_child=_reset_after_fork)