mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-21 22:10:40 +00:00
## Summary
PyTorch's `DataLoader` uses fork-based multiprocessing by default on
Linux, but only the thread that calls `fork()` survives in the child.
LanceDB's Python bindings drive async work through two threaded layers,
both of which become inert in a forked child:
- `BackgroundEventLoop` runs an asyncio loop on a Python
`threading.Thread`.
- `pyo3-async-runtimes::tokio` holds a global multi-threaded tokio
runtime whose worker threads also die on fork — and its runtime lives in
a `OnceLock` that cannot be replaced after first use.
As a result, any `Permutation` (or other async API) used inside a
fork-based `DataLoader` worker hangs indefinitely. This PR makes both
layers fork-safe so `Permutation` works as a `torch.utils.data.Dataset`
with `num_workers > 0`.
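
The failure mode described above can be reproduced without LanceDB or PyTorch. The following is a minimal sketch using only the standard library, assuming CPython on a platform that provides `os.fork()`; the helper name `thread_state_after_fork` is hypothetical and not part of this PR.

```python
import os
import threading


def thread_state_after_fork():
    """Start a background thread, fork, and report what the child sees.

    Returns (worker_alive_in_parent, report_from_child), where the report
    is "<active thread count>,<worker.is_alive()>" as seen by the child.
    """
    stop = threading.Event()
    worker = threading.Thread(target=stop.wait, daemon=True)
    worker.start()

    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: only the forking thread exists here.
        status = 1
        try:
            os.close(read_fd)
            report = f"{threading.active_count()},{worker.is_alive()}"
            os.write(write_fd, report.encode())
            status = 0
        finally:
            # Skip interpreter cleanup so the child never runs parent-owned handlers.
            os._exit(status)

    # Parent process: collect the child's report, then stop the worker.
    os.close(write_fd)
    with os.fdopen(read_fd, "rb") as reader:
        report = reader.read().decode()
    os.waitpid(pid, 0)
    alive_in_parent = worker.is_alive()
    stop.set()
    worker.join()
    return alive_in_parent, report
```

The worker thread is still running in the parent, while the child sees a single thread and a dead `worker`. Any work queued for such a thread in the child is never picked up, which is why callers waiting on it hang.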
## Approach
### Rust — new `python/src/runtime.rs`
Mirrors the pattern used in [Lance's Python
bindings](456198cd6f/python/src/lib.rs (L139)),
adapted for the async-bridge use case.
- `LanceRuntime` implements `pyo3_async_runtimes::generic::Runtime +
ContextExt`, backed by an `AtomicPtr<tokio::runtime::Runtime>` we own
(sidestepping `pyo3-async-runtimes`'s frozen `OnceLock` global).
- A `pthread_atfork` child handler nulls the pointer; the next
`spawn` rebuilds the runtime in the child. The previous runtime is
intentionally **leaked**: dropping it would try to join now-dead
worker threads and hang.
- `runtime::future_into_py` is a drop-in for
`pyo3_async_runtimes::tokio::future_into_py`. All ~80 call sites in
`arrow.rs` / `connection.rs` / `permutation.rs` / `query.rs` /
`table.rs` are updated to route through it.
- `python/Cargo.toml` adds `libc = "0.2"` and the tokio
`rt-multi-thread` feature.
### Python — `lancedb/background_loop.py`
- Refactors `BackgroundEventLoop.__init__` to a reusable `_start()`
method.
- An `os.register_at_fork(after_in_child=…)` hook calls `LOOP._start()`
to give the singleton a fresh asyncio loop and thread **in place**. This
matters because the rest of the codebase imports `LOOP` via `from
.background_loop import LOOP` — rebinding the module attribute would
leave those references holding the dead loop.
### Python — `lancedb/__init__.py`
Removes the `__warn_on_fork` pre-fork warning (and the now-unused
`import warnings`). Fork is supported.
## Test plan
- [x] New `test_permutation_dataloader_fork_workers` in
`python/tests/test_torch.py`: runs a `Permutation` through
`torch.utils.data.DataLoader(num_workers=2,
multiprocessing_context="fork")` inside a spawn-isolated child with a
30s hang detector. **Pre-fix**: timed out at 36s. **Post-fix**: passes
in ~3.6s.
- [x] New `test_remote_connection_after_fork` in
`python/tests/test_remote_db.py`: forks a child that creates a fresh
`lancedb.connect(...)` against a mock HTTP server and calls
`table_names()`. It passes in <1s and validates that the runtime reset is
sufficient for fresh remote clients.
- [x] All 62 tests in `test_torch.py` + `test_permutation.py` pass.
- [x] All 35 tests in `test_remote_db.py` pass.
- [x] `test_table.py` (87) + `test_db.py` + `test_query.py` (157, minus
one unrelated `sentence_transformers` import skip) — 244 passing.
- [x] `cargo clippy -p lancedb-python --tests` clean.
- [x] `cargo fmt`, `ruff check`, `ruff format` all clean.
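
A hang detector of the kind used in the first test can be approximated as below. This is a sketch under stated assumptions, not the test's actual code: it uses `subprocess` with a timeout as a stand-in for the spawn-isolated child, and `run_isolated` is a hypothetical helper name.

```python
import subprocess
import sys


def run_isolated(code, timeout):
    """Run `code` in a fresh interpreter and classify the outcome.

    Returns "ok" on exit code 0, "failed" on a non-zero exit code, and
    "hang" if the child does not finish within `timeout` seconds.
    """
    try:
        proc = subprocess.run(
            [sys.executable, "-c", code],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child before raising, so nothing lingers.
        return "hang"
    return "ok" if proc.returncode == 0 else "failed"
```

Running the fork-sensitive code in a separate interpreter keeps a deadlocked child from stalling the whole test session; the parent only has to report the timeout as a failure.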
## Known limitation (follow-up)
This PR makes a **freshly-built** `lancedb.connect(...)` work in a
forked child. An **inherited** `Connection` from the parent still
carries an inherited `reqwest::Client` whose hyper connection pool
references socket FDs and TCP/TLS state shared with the parent — using
it from the child after fork is unsafe (especially with HTTP/1.1
keep-alive). The recommended pattern for fork-based `DataLoader` workers
that hit a remote DB is to construct a new connection inside the worker.
Auto-clearing inherited HTTP client pools on fork would require tracking
live `Connection` instances in `lancedb` core and is left for a
follow-up PR.
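
One way to follow the recommended pattern is to build the connection lazily and key it on the process ID, so each forked worker constructs its own. The sketch below is illustrative only: `PerProcessResource` is a hypothetical helper, not a LanceDB API, and the factory passed to it is whatever the caller would normally use (for example a function that calls `lancedb.connect(...)`).

```python
import os


class PerProcessResource:
    """Lazily builds a resource once per process (hypothetical helper).

    A forked child has a different PID than the process that built the
    cached value, so the first `get()` in the child calls the factory
    again instead of reusing the inherited object.
    """

    def __init__(self, factory):
        self._factory = factory
        self._pid = None
        self._value = None

    def get(self):
        pid = os.getpid()
        if self._pid != pid:
            # First use in this process: build a fresh resource.
            self._value = self._factory()
            self._pid = pid
        return self._value
```

A dataset could hold such a wrapper and call `get()` inside `__getitem__`, so no connection created in the parent is ever used by a worker. PyTorch's `DataLoader` also accepts a `worker_init_fn` callback, which is another place to create per-worker connections.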
🤖 Generated with [Claude Code](https://claude.com/claude-code)
---------
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
66 lines
1.7 KiB
Rust
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors

use std::sync::Arc;

use crate::error::PythonErrorExt;
use crate::runtime::future_into_py;
use arrow::{
    datatypes::SchemaRef,
    pyarrow::{IntoPyArrow, ToPyArrow},
};
use futures::stream::StreamExt;
use lancedb::arrow::SendableRecordBatchStream;
use pyo3::{
    Bound, Py, PyAny, PyRef, PyResult, Python, exceptions::PyStopAsyncIteration, pyclass, pymethods,
};

#[pyclass]
pub struct RecordBatchStream {
    schema: SchemaRef,
    inner: Arc<tokio::sync::Mutex<SendableRecordBatchStream>>,
}

impl RecordBatchStream {
    pub fn new(inner: SendableRecordBatchStream) -> Self {
        let schema = inner.schema().clone();
        Self {
            schema,
            inner: Arc::new(tokio::sync::Mutex::new(inner)),
        }
    }
}

#[pymethods]
impl RecordBatchStream {
    #[getter]
    pub fn schema(&self, py: Python) -> PyResult<Py<PyAny>> {
        (*self.schema)
            .clone()
            .into_pyarrow(py)
            .map(|obj| obj.unbind())
    }

    pub fn __aiter__(self_: PyRef<'_, Self>) -> PyRef<'_, Self> {
        self_
    }

    pub fn __anext__(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.inner.clone();
        future_into_py(self_.py(), async move {
            let inner_next = inner
                .lock()
                .await
                .next()
                .await
                .ok_or_else(|| PyStopAsyncIteration::new_err(""))?;
            Python::attach(|py| {
                inner_next
                    .infer_error()?
                    .to_pyarrow(py)
                    .map(|obj| obj.unbind())
            })
        })
    }
}