## Summary
PyTorch's `DataLoader` uses fork-based multiprocessing by default on
Linux, but threads do not survive `fork()`. LanceDB's Python bindings
drive async work through two threaded layers, both of which become inert
in a forked child:
- `BackgroundEventLoop` runs an asyncio loop on a Python
`threading.Thread`.
- `pyo3-async-runtimes::tokio` holds a global multi-threaded tokio
runtime whose worker threads also die on fork — and its runtime lives in
a `OnceLock` that cannot be replaced after first use.
As a result, any `Permutation` (or other async API) used inside a
fork-based `DataLoader` worker hangs indefinitely. This PR makes both
layers fork-safe so `Permutation` works as a `torch.utils.data.Dataset`
with `num_workers > 0`.
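
The underlying failure mode can be demonstrated with the standard library alone (Linux-only sketch; `start_worker` is an illustrative stand-in, not LanceDB code): a background thread is inherited as a `Thread` object by the forked child, but the thread itself is no longer running there.

```python
import os
import threading

def start_worker():
    # A worker thread that would normally service requests forever.
    t = threading.Thread(target=lambda: threading.Event().wait(), daemon=True)
    t.start()
    return t

worker = start_worker()
r, w = os.pipe()
pid = os.fork()
if pid == 0:
    # In the forked child: the Thread object exists, but the OS thread died
    # with the fork, so anything queued to it would hang forever.
    os.write(w, b"1" if worker.is_alive() else b"0")
    os._exit(0)
os.waitpid(pid, 0)
alive_in_child = os.read(r, 1)
print(alive_in_child)  # b'0' -- the inherited worker thread is dead
```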
## Approach
### Rust — new `python/src/runtime.rs`
Mirrors the pattern used in [Lance's Python
bindings](456198cd6f/python/src/lib.rs (L139)),
adapted for the async-bridge use case.
- `LanceRuntime` implements `pyo3_async_runtimes::generic::Runtime +
ContextExt`, backed by an `AtomicPtr<tokio::runtime::Runtime>` we own
(sidestepping `pyo3-async-runtimes`'s frozen `OnceLock` global).
- A `pthread_atfork(after_in_child)` handler nulls the pointer; the next
`spawn` rebuilds the runtime in the child. The previous runtime is
intentionally **leaked** — calling `Drop` would try to join now-dead
worker threads and hang.
- `runtime::future_into_py` is a drop-in for
`pyo3_async_runtimes::tokio::future_into_py`. All ~80 call sites in
`arrow.rs` / `connection.rs` / `permutation.rs` / `query.rs` /
`table.rs` are updated to route through it.
- `python/Cargo.toml` adds `libc = "0.2"` and the tokio
`rt-multi-thread` feature.
### Python — `lancedb/background_loop.py`
- Refactors `BackgroundEventLoop.__init__` to a reusable `_start()`
method.
- An `os.register_at_fork(after_in_child=…)` hook calls `LOOP._start()`
to give the singleton a fresh asyncio loop and thread **in place**. This
matters because the rest of the codebase imports `LOOP` via `from
.background_loop import LOOP` — rebinding the module attribute would
leave those references holding the dead loop.
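
A minimal sketch of the in-place restart (a simplified stand-in for the real class in `lancedb/background_loop.py`; method and attribute names here are illustrative):

```python
import asyncio
import os
import threading

class BackgroundEventLoop:
    def __init__(self):
        self._start()

    def _start(self):
        # (Re)create the loop and its thread; also invoked by the at-fork hook.
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self.thread.start()

    def run(self, coro):
        # Submit a coroutine from any thread and block for its result.
        return asyncio.run_coroutine_threadsafe(coro, self.loop).result()

LOOP = BackgroundEventLoop()

# Restart the singleton *in place*: existing `from .background_loop import LOOP`
# references in forked children keep pointing at a live loop.
os.register_at_fork(after_in_child=LOOP._start)
```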
### Python — `lancedb/__init__.py`
Removes the `__warn_on_fork` pre-fork warning (and the now-unused
`import warnings`), since fork is now supported.
## Test plan
- [x] New `test_permutation_dataloader_fork_workers` in
`python/tests/test_torch.py`: runs a `Permutation` through
`torch.utils.data.DataLoader(num_workers=2,
multiprocessing_context="fork")` inside a spawn-isolated child with a
30s hang detector. **Pre-fix**: timed out at 36s. **Post-fix**: passes
in ~3.6s.
- [x] New `test_remote_connection_after_fork` in
`python/tests/test_remote_db.py`: forks a child that creates a fresh
`lancedb.connect(...)` against a mock HTTP server and calls
`table_names()`; passes in <1s, validates the runtime reset is
sufficient for fresh remote clients.
- [x] All 62 tests in `test_torch.py` + `test_permutation.py` pass.
- [x] All 35 tests in `test_remote_db.py` pass.
- [x] `test_table.py` (87) + `test_db.py` + `test_query.py` (157, minus
one unrelated `sentence_transformers` import skip) — 244 passing.
- [x] `cargo clippy -p lancedb-python --tests` clean.
- [x] `cargo fmt`, `ruff check`, `ruff format` all clean.
## Known limitation (follow-up)
This PR makes a **freshly-built** `lancedb.connect(...)` work in a
forked child. An **inherited** `Connection` from the parent still
carries an inherited `reqwest::Client` whose hyper connection pool
references socket FDs and TCP/TLS state shared with the parent — using
it from the child after fork is unsafe (especially with HTTP/1.1
keep-alive). The recommended pattern for fork-based `DataLoader` workers
that hit a remote DB is to construct a new connection inside the worker.
Auto-clearing inherited HTTP client pools on fork would require tracking
live `Connection` instances in `lancedb` core and is left for a
follow-up PR.
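
One way to follow that recommendation is a per-process lazy connection, so each fork-based worker builds its own client on first use (sketch only; `connect` below is a stand-in for `lancedb.connect`, and the caching helper is hypothetical):

```python
import os

def connect(uri):
    # Stand-in for lancedb.connect(uri).
    return ("connection", uri, os.getpid())

_conn = None
_conn_pid = None

def get_connection(uri):
    # Rebuild the connection whenever we find ourselves in a new process,
    # so forked DataLoader workers never reuse the parent's HTTP client.
    global _conn, _conn_pid
    if _conn is None or _conn_pid != os.getpid():
        _conn = connect(uri)
        _conn_pid = os.getpid()
    return _conn
```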
🤖 Generated with [Claude Code](https://claude.com/claude-code)
---------
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
721 lines · 26 KiB · Rust
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors

use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
    time::Duration,
};

use crate::{
    error::PythonErrorExt,
    namespace::{create_namespace_storage_options_provider, extract_namespace_arc},
    runtime::future_into_py,
    table::Table,
};
use arrow::{datatypes::Schema, ffi_stream::ArrowArrayStreamReader, pyarrow::FromPyArrow};
use lancedb::{
    connection::Connection as LanceConnection,
    connection::NamespaceClientPushdownOperation,
    database::namespace::LanceNamespaceDatabase,
    database::{CreateTableMode, Database, ReadConsistency},
};
use pyo3::{
    Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python,
    exceptions::{PyRuntimeError, PyValueError},
    pyclass, pyfunction, pymethods,
    types::{PyDict, PyDictMethods},
};

#[pyclass]
pub struct Connection {
    inner: Option<LanceConnection>,
}

impl Connection {
    pub(crate) fn new(inner: LanceConnection) -> Self {
        Self { inner: Some(inner) }
    }

    pub(crate) fn get_inner(&self) -> PyResult<&LanceConnection> {
        self.inner
            .as_ref()
            .ok_or_else(|| PyRuntimeError::new_err("Connection is closed"))
    }
}

fn parse_namespace_client_pushdown_operations(
    operations: Option<Vec<String>>,
) -> PyResult<HashSet<NamespaceClientPushdownOperation>> {
    let mut parsed = HashSet::new();
    for operation in operations.unwrap_or_default() {
        match operation.as_str() {
            "QueryTable" => {
                parsed.insert(NamespaceClientPushdownOperation::QueryTable);
            }
            "CreateTable" => {
                parsed.insert(NamespaceClientPushdownOperation::CreateTable);
            }
            _ => {
                return Err(PyValueError::new_err(format!(
                    "Invalid pushdown operation: {}",
                    operation
                )));
            }
        }
    }
    Ok(parsed)
}

impl Connection {
    fn parse_create_mode_str(mode: &str) -> PyResult<CreateTableMode> {
        match mode {
            "create" => Ok(CreateTableMode::Create),
            "overwrite" => Ok(CreateTableMode::Overwrite),
            "exist_ok" => Ok(CreateTableMode::exist_ok(|builder| builder)),
            _ => Err(PyValueError::new_err(format!("Invalid mode {}", mode))),
        }
    }

    pub fn database(&self) -> PyResult<Arc<dyn Database>> {
        Ok(self.get_inner()?.database().clone())
    }
}

#[pymethods]
impl Connection {
    fn __repr__(&self) -> String {
        match &self.inner {
            Some(inner) => inner.to_string(),
            None => "ClosedConnection".to_string(),
        }
    }

    fn is_open(&self) -> bool {
        self.inner.is_some()
    }

    fn close(&mut self) {
        self.inner.take();
    }

    #[getter]
    pub fn uri(&self) -> PyResult<String> {
        self.get_inner().map(|inner| inner.uri().to_string())
    }

    #[pyo3(signature = ())]
    pub fn get_read_consistency_interval(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.get_inner()?.clone();
        future_into_py(self_.py(), async move {
            Ok(match inner.read_consistency().await.infer_error()? {
                ReadConsistency::Manual => None,
                ReadConsistency::Eventual(duration) => Some(duration.as_secs_f64()),
                ReadConsistency::Strong => Some(0.0_f64),
            })
        })
    }

    #[pyo3(signature = (namespace_path=None, start_after=None, limit=None))]
    pub fn table_names(
        self_: PyRef<'_, Self>,
        namespace_path: Option<Vec<String>>,
        start_after: Option<String>,
        limit: Option<u32>,
    ) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.get_inner()?.clone();
        let mut op = inner.table_names();
        op = op.namespace(namespace_path.unwrap_or_default());
        if let Some(start_after) = start_after {
            op = op.start_after(start_after);
        }
        if let Some(limit) = limit {
            op = op.limit(limit);
        }
        future_into_py(self_.py(), async move { op.execute().await.infer_error() })
    }

    #[allow(clippy::too_many_arguments)]
    #[pyo3(signature = (name, mode, data, namespace_path=None, storage_options=None, location=None, namespace_client=None))]
    pub fn create_table<'a>(
        self_: PyRef<'a, Self>,
        name: String,
        mode: &str,
        data: Bound<'_, PyAny>,
        namespace_path: Option<Vec<String>>,
        storage_options: Option<HashMap<String, String>>,
        location: Option<String>,
        namespace_client: Option<Py<PyAny>>,
    ) -> PyResult<Bound<'a, PyAny>> {
        let inner = self_.get_inner()?.clone();
        let py = self_.py();

        let mode = Self::parse_create_mode_str(mode)?;

        let batches: Box<dyn arrow::array::RecordBatchReader + Send> =
            Box::new(ArrowArrayStreamReader::from_pyarrow_bound(&data)?);

        let ns_path = namespace_path.clone().unwrap_or_default();
        let mut builder = inner.create_table(name.clone(), batches).mode(mode);

        builder = builder.namespace(ns_path.clone());
        if let Some(storage_options) = storage_options {
            builder = builder.storage_options(storage_options);
        }

        // Auto-create storage options provider from namespace_client
        if let Some(ns_obj) = namespace_client {
            let ns_client = extract_namespace_arc(py, ns_obj)?;
            // Create table_id by combining namespace_path with table name
            let mut table_id = ns_path;
            table_id.push(name);
            let provider = create_namespace_storage_options_provider(ns_client, table_id);
            builder = builder.storage_options_provider(provider);
        }

        if let Some(location) = location {
            builder = builder.location(location);
        }

        future_into_py(self_.py(), async move {
            let table = builder.execute().await.infer_error()?;
            Ok(Table::new(table))
        })
    }

    #[allow(clippy::too_many_arguments)]
    #[pyo3(signature = (name, mode, schema, namespace_path=None, storage_options=None, location=None, namespace_client=None))]
    pub fn create_empty_table<'a>(
        self_: PyRef<'a, Self>,
        name: String,
        mode: &str,
        schema: Bound<'_, PyAny>,
        namespace_path: Option<Vec<String>>,
        storage_options: Option<HashMap<String, String>>,
        location: Option<String>,
        namespace_client: Option<Py<PyAny>>,
    ) -> PyResult<Bound<'a, PyAny>> {
        let inner = self_.get_inner()?.clone();
        let py = self_.py();

        let mode = Self::parse_create_mode_str(mode)?;

        let schema = Schema::from_pyarrow_bound(&schema)?;

        let ns_path = namespace_path.clone().unwrap_or_default();
        let mut builder = inner
            .create_empty_table(name.clone(), Arc::new(schema))
            .mode(mode);

        builder = builder.namespace(ns_path.clone());
        if let Some(storage_options) = storage_options {
            builder = builder.storage_options(storage_options);
        }

        // Auto-create storage options provider from namespace_client
        if let Some(ns_obj) = namespace_client {
            let ns_client = extract_namespace_arc(py, ns_obj)?;
            // Create table_id by combining namespace_path with table name
            let mut table_id = ns_path;
            table_id.push(name);
            let provider = create_namespace_storage_options_provider(ns_client, table_id);
            builder = builder.storage_options_provider(provider);
        }

        if let Some(location) = location {
            builder = builder.location(location);
        }

        future_into_py(self_.py(), async move {
            let table = builder.execute().await.infer_error()?;
            Ok(Table::new(table))
        })
    }

    #[allow(clippy::too_many_arguments)]
    #[pyo3(signature = (name, namespace_path=None, storage_options=None, index_cache_size=None, location=None, namespace_client=None, managed_versioning=None))]
    pub fn open_table(
        self_: PyRef<'_, Self>,
        name: String,
        namespace_path: Option<Vec<String>>,
        storage_options: Option<HashMap<String, String>>,
        index_cache_size: Option<u32>,
        location: Option<String>,
        namespace_client: Option<Py<PyAny>>,
        managed_versioning: Option<bool>,
    ) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.get_inner()?.clone();
        let py = self_.py();

        let ns_path = namespace_path.clone().unwrap_or_default();
        let mut builder = inner.open_table(name.clone());
        builder = builder.namespace(ns_path.clone());
        if let Some(storage_options) = storage_options {
            builder = builder.storage_options(storage_options);
        }

        // Auto-create storage options provider from namespace_client
        if let Some(ns_obj) = namespace_client {
            let ns_client = extract_namespace_arc(py, ns_obj)?;
            // Create table_id by combining namespace_path with table name
            let mut table_id = ns_path;
            table_id.push(name);
            let provider = create_namespace_storage_options_provider(ns_client.clone(), table_id);
            builder = builder.storage_options_provider(provider);
            builder = builder.namespace_client(ns_client);
        }

        if let Some(index_cache_size) = index_cache_size {
            builder = builder.index_cache_size(index_cache_size);
        }
        if let Some(location) = location {
            builder = builder.location(location);
        }
        // Pass managed_versioning if provided to avoid redundant describe_table call
        if let Some(enabled) = managed_versioning {
            builder = builder.managed_versioning(enabled);
        }

        future_into_py(self_.py(), async move {
            let table = builder.execute().await.infer_error()?;
            Ok(Table::new(table))
        })
    }

    #[pyo3(signature = (target_table_name, source_uri, target_namespace_path=None, source_version=None, source_tag=None, is_shallow=true))]
    pub fn clone_table(
        self_: PyRef<'_, Self>,
        target_table_name: String,
        source_uri: String,
        target_namespace_path: Option<Vec<String>>,
        source_version: Option<u64>,
        source_tag: Option<String>,
        is_shallow: bool,
    ) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.get_inner()?.clone();

        let mut builder = inner.clone_table(target_table_name, source_uri);
        builder = builder.target_namespace(target_namespace_path.unwrap_or_default());
        if let Some(version) = source_version {
            builder = builder.source_version(version);
        }
        if let Some(tag) = source_tag {
            builder = builder.source_tag(tag);
        }
        builder = builder.is_shallow(is_shallow);

        future_into_py(self_.py(), async move {
            let table = builder.execute().await.infer_error()?;
            Ok(Table::new(table))
        })
    }

    #[pyo3(signature = (cur_name, new_name, cur_namespace_path=None, new_namespace_path=None))]
    pub fn rename_table(
        self_: PyRef<'_, Self>,
        cur_name: String,
        new_name: String,
        cur_namespace_path: Option<Vec<String>>,
        new_namespace_path: Option<Vec<String>>,
    ) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.get_inner()?.clone();
        let cur_ns_path = cur_namespace_path.unwrap_or_default();
        let new_ns_path = new_namespace_path.unwrap_or_default();
        future_into_py(self_.py(), async move {
            inner
                .rename_table(cur_name, new_name, &cur_ns_path, &new_ns_path)
                .await
                .infer_error()
        })
    }

    #[pyo3(signature = (name, namespace_path=None))]
    pub fn drop_table(
        self_: PyRef<'_, Self>,
        name: String,
        namespace_path: Option<Vec<String>>,
    ) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.get_inner()?.clone();
        let ns_path = namespace_path.unwrap_or_default();
        future_into_py(self_.py(), async move {
            inner.drop_table(name, &ns_path).await.infer_error()
        })
    }

    #[pyo3(signature = (namespace_path=None,))]
    pub fn drop_all_tables(
        self_: PyRef<'_, Self>,
        namespace_path: Option<Vec<String>>,
    ) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.get_inner()?.clone();
        let ns_path = namespace_path.unwrap_or_default();
        future_into_py(self_.py(), async move {
            inner.drop_all_tables(&ns_path).await.infer_error()
        })
    }

    // Namespace management methods

    #[pyo3(signature = (namespace_path=None, page_token=None, limit=None))]
    pub fn list_namespaces(
        self_: PyRef<'_, Self>,
        namespace_path: Option<Vec<String>>,
        page_token: Option<String>,
        limit: Option<u32>,
    ) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.get_inner()?.clone();
        let py = self_.py();
        future_into_py(py, async move {
            use lance_namespace::models::ListNamespacesRequest;
            let request = ListNamespacesRequest {
                id: namespace_path,
                page_token,
                limit: limit.map(|l| l as i32),
                ..Default::default()
            };
            let response = inner.list_namespaces(request).await.infer_error()?;
            Python::attach(|py| -> PyResult<Py<PyDict>> {
                let dict = PyDict::new(py);
                dict.set_item("namespaces", response.namespaces)?;
                dict.set_item("page_token", response.page_token)?;
                Ok(dict.unbind())
            })
        })
    }

    #[pyo3(signature = (namespace_path, mode=None, properties=None))]
    pub fn create_namespace(
        self_: PyRef<'_, Self>,
        namespace_path: Vec<String>,
        mode: Option<String>,
        properties: Option<std::collections::HashMap<String, String>>,
    ) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.get_inner()?.clone();
        let py = self_.py();
        future_into_py(py, async move {
            use lance_namespace::models::CreateNamespaceRequest;
            // Mode is now a string field
            let mode_str = mode.and_then(|m| match m.to_lowercase().as_str() {
                "create" => Some("Create".to_string()),
                "exist_ok" => Some("ExistOk".to_string()),
                "overwrite" => Some("Overwrite".to_string()),
                _ => None,
            });
            let request = CreateNamespaceRequest {
                id: Some(namespace_path),
                mode: mode_str,
                properties,
                ..Default::default()
            };
            let response = inner.create_namespace(request).await.infer_error()?;
            Python::attach(|py| -> PyResult<Py<PyDict>> {
                let dict = PyDict::new(py);
                dict.set_item("properties", response.properties)?;
                Ok(dict.unbind())
            })
        })
    }

    #[pyo3(signature = (namespace_path, mode=None, behavior=None))]
    pub fn drop_namespace(
        self_: PyRef<'_, Self>,
        namespace_path: Vec<String>,
        mode: Option<String>,
        behavior: Option<String>,
    ) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.get_inner()?.clone();
        let py = self_.py();
        future_into_py(py, async move {
            use lance_namespace::models::DropNamespaceRequest;
            // Mode and Behavior are now string fields
            let mode_str = mode.and_then(|m| match m.to_uppercase().as_str() {
                "SKIP" => Some("Skip".to_string()),
                "FAIL" => Some("Fail".to_string()),
                _ => None,
            });
            let behavior_str = behavior.and_then(|b| match b.to_uppercase().as_str() {
                "RESTRICT" => Some("Restrict".to_string()),
                "CASCADE" => Some("Cascade".to_string()),
                _ => None,
            });
            let request = DropNamespaceRequest {
                id: Some(namespace_path),
                mode: mode_str,
                behavior: behavior_str,
                ..Default::default()
            };
            let response = inner.drop_namespace(request).await.infer_error()?;
            Python::attach(|py| -> PyResult<Py<PyDict>> {
                let dict = PyDict::new(py);
                dict.set_item("properties", response.properties)?;
                dict.set_item("transaction_id", response.transaction_id)?;
                Ok(dict.unbind())
            })
        })
    }

    #[pyo3(signature = (namespace_path,))]
    pub fn describe_namespace(
        self_: PyRef<'_, Self>,
        namespace_path: Vec<String>,
    ) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.get_inner()?.clone();
        let py = self_.py();
        future_into_py(py, async move {
            use lance_namespace::models::DescribeNamespaceRequest;
            let request = DescribeNamespaceRequest {
                id: Some(namespace_path),
                ..Default::default()
            };
            let response = inner.describe_namespace(request).await.infer_error()?;
            Python::attach(|py| -> PyResult<Py<PyDict>> {
                let dict = PyDict::new(py);
                dict.set_item("properties", response.properties)?;
                Ok(dict.unbind())
            })
        })
    }

    #[pyo3(signature = (namespace_path=None, page_token=None, limit=None))]
    pub fn list_tables(
        self_: PyRef<'_, Self>,
        namespace_path: Option<Vec<String>>,
        page_token: Option<String>,
        limit: Option<u32>,
    ) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.get_inner()?.clone();
        let py = self_.py();
        future_into_py(py, async move {
            use lance_namespace::models::ListTablesRequest;
            let request = ListTablesRequest {
                id: namespace_path,
                page_token,
                limit: limit.map(|l| l as i32),
                ..Default::default()
            };
            let response = inner.list_tables(request).await.infer_error()?;
            Python::attach(|py| -> PyResult<Py<PyDict>> {
                let dict = PyDict::new(py);
                dict.set_item("tables", response.tables)?;
                dict.set_item("page_token", response.page_token)?;
                Ok(dict.unbind())
            })
        })
    }

    /// Get the configuration for constructing an equivalent namespace client.
    /// Returns a dict with:
    /// - "impl": "dir" for DirectoryNamespace, "rest" for RestNamespace
    /// - "properties": configuration properties for the namespace
    #[pyo3(signature = ())]
    pub fn namespace_client_config(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.get_inner()?.clone();
        let py = self_.py();
        future_into_py(py, async move {
            let (impl_type, properties) = inner.namespace_client_config().await.infer_error()?;
            Python::attach(|py| -> PyResult<Py<PyDict>> {
                let dict = PyDict::new(py);
                dict.set_item("impl", impl_type)?;
                dict.set_item("properties", properties)?;
                Ok(dict.unbind())
            })
        })
    }
}

#[pyfunction]
#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None, manifest_enabled=false, namespace_client_properties=None))]
#[allow(clippy::too_many_arguments)]
pub fn connect(
    py: Python<'_>,
    uri: String,
    api_key: Option<String>,
    region: Option<String>,
    host_override: Option<String>,
    read_consistency_interval: Option<f64>,
    client_config: Option<PyClientConfig>,
    storage_options: Option<HashMap<String, String>>,
    session: Option<crate::session::Session>,
    manifest_enabled: bool,
    namespace_client_properties: Option<HashMap<String, String>>,
) -> PyResult<Bound<'_, PyAny>> {
    future_into_py(py, async move {
        let mut builder = lancedb::connect(&uri);
        if let Some(api_key) = api_key {
            builder = builder.api_key(&api_key);
        }
        if let Some(region) = region {
            builder = builder.region(&region);
        }
        if let Some(host_override) = host_override {
            builder = builder.host_override(&host_override);
        }
        if let Some(read_consistency_interval) = read_consistency_interval {
            let read_consistency_interval = Duration::from_secs_f64(read_consistency_interval);
            builder = builder.read_consistency_interval(read_consistency_interval);
        }
        if let Some(storage_options) = storage_options {
            builder = builder.storage_options(storage_options);
        }
        if manifest_enabled {
            builder = builder.manifest_enabled(true);
        }
        if let Some(namespace_client_properties) = namespace_client_properties {
            builder = builder.namespace_client_properties(namespace_client_properties);
        }
        #[cfg(feature = "remote")]
        if let Some(client_config) = client_config {
            builder = builder.client_config(client_config.into());
        }
        if let Some(session) = session {
            builder = builder.session(session.inner.clone());
        }
        Ok(Connection::new(builder.execute().await.infer_error()?))
    })
}

#[pyfunction]
#[pyo3(signature = (
    namespace_client,
    read_consistency_interval=None,
    storage_options=None,
    session=None,
    namespace_client_pushdown_operations=None,
    namespace_client_impl=None,
    namespace_client_properties=None,
))]
#[allow(clippy::too_many_arguments)]
pub fn connect_namespace_client(
    py: Python<'_>,
    namespace_client: Py<PyAny>,
    read_consistency_interval: Option<f64>,
    storage_options: Option<HashMap<String, String>>,
    session: Option<crate::session::Session>,
    namespace_client_pushdown_operations: Option<Vec<String>>,
    namespace_client_impl: Option<String>,
    namespace_client_properties: Option<HashMap<String, String>>,
) -> PyResult<Connection> {
    let namespace_client = extract_namespace_arc(py, namespace_client)?;
    let read_consistency_interval = read_consistency_interval.map(Duration::from_secs_f64);
    let namespace_client_pushdown_operations =
        parse_namespace_client_pushdown_operations(namespace_client_pushdown_operations)?;
    let ns_impl = namespace_client_impl.unwrap_or_else(|| "python".to_string());
    let ns_properties = namespace_client_properties.unwrap_or_default();
    let storage_options = storage_options.unwrap_or_default();
    let session = session.map(|s| s.inner.clone());

    let database = LanceNamespaceDatabase::from_namespace_client(
        namespace_client,
        ns_impl,
        ns_properties,
        storage_options,
        read_consistency_interval,
        session,
        namespace_client_pushdown_operations,
    );

    Ok(Connection::new(LanceConnection::new(
        Arc::new(database),
        Arc::new(lancedb::embeddings::MemoryRegistry::new()),
    )))
}

#[derive(FromPyObject)]
pub struct PyClientConfig {
    user_agent: String,
    retry_config: Option<PyClientRetryConfig>,
    timeout_config: Option<PyClientTimeoutConfig>,
    extra_headers: Option<HashMap<String, String>>,
    id_delimiter: Option<String>,
    tls_config: Option<PyClientTlsConfig>,
    header_provider: Option<Py<PyAny>>,
    user_id: Option<String>,
}

#[derive(FromPyObject)]
pub struct PyClientRetryConfig {
    retries: Option<u8>,
    connect_retries: Option<u8>,
    read_retries: Option<u8>,
    backoff_factor: Option<f32>,
    backoff_jitter: Option<f32>,
    statuses: Option<Vec<u16>>,
}

#[derive(FromPyObject)]
pub struct PyClientTimeoutConfig {
    timeout: Option<Duration>,
    connect_timeout: Option<Duration>,
    read_timeout: Option<Duration>,
    pool_idle_timeout: Option<Duration>,
}

#[derive(FromPyObject)]
pub struct PyClientTlsConfig {
    cert_file: Option<String>,
    key_file: Option<String>,
    ssl_ca_cert: Option<String>,
    assert_hostname: bool,
}

#[cfg(feature = "remote")]
impl From<PyClientRetryConfig> for lancedb::remote::RetryConfig {
    fn from(value: PyClientRetryConfig) -> Self {
        Self {
            retries: value.retries,
            connect_retries: value.connect_retries,
            read_retries: value.read_retries,
            backoff_factor: value.backoff_factor,
            backoff_jitter: value.backoff_jitter,
            statuses: value.statuses,
        }
    }
}

#[cfg(feature = "remote")]
impl From<PyClientTimeoutConfig> for lancedb::remote::TimeoutConfig {
    fn from(value: PyClientTimeoutConfig) -> Self {
        Self {
            timeout: value.timeout,
            connect_timeout: value.connect_timeout,
            read_timeout: value.read_timeout,
            pool_idle_timeout: value.pool_idle_timeout,
        }
    }
}

#[cfg(feature = "remote")]
impl From<PyClientTlsConfig> for lancedb::remote::TlsConfig {
    fn from(value: PyClientTlsConfig) -> Self {
        Self {
            cert_file: value.cert_file,
            key_file: value.key_file,
            ssl_ca_cert: value.ssl_ca_cert,
            assert_hostname: value.assert_hostname,
        }
    }
}

#[cfg(feature = "remote")]
impl From<PyClientConfig> for lancedb::remote::ClientConfig {
    fn from(value: PyClientConfig) -> Self {
        use crate::header::PyHeaderProvider;

        let header_provider = value.header_provider.map(|provider| {
            let py_provider = PyHeaderProvider::new(provider);
            Arc::new(py_provider) as Arc<dyn lancedb::remote::HeaderProvider>
        });

        Self {
            user_agent: value.user_agent,
            retry_config: value.retry_config.map(Into::into).unwrap_or_default(),
            timeout_config: value.timeout_config.map(Into::into).unwrap_or_default(),
            extra_headers: value.extra_headers.unwrap_or_default(),
            id_delimiter: value.id_delimiter,
            tls_config: value.tls_config.map(Into::into),
            header_provider,
            user_id: value.user_id,
        }
    }
}