mirror of
https://github.com/lancedb/lancedb.git
synced 2026-07-03 11:00:40 +00:00
fix(python): route sync namespace connections through rust (#3598)
Summary: - Route built-in sync namespace connections through the Rust namespace connector. - Keep custom namespace clients on the existing Python fallback. - Preserve namespace-backed to_lance compatibility with lazy Python client construction and add regressions.
This commit is contained in:
@@ -282,6 +282,23 @@ async def connect(
|
||||
namespace_client_properties: Optional[Dict[str, str]] = None,
|
||||
oauth_config: Optional[Any] = None,
|
||||
) -> Connection: ...
|
||||
def connect_namespace(
|
||||
namespace_client_impl: str,
|
||||
namespace_client_properties: Dict[str, str],
|
||||
read_consistency_interval: Optional[float] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
session: Optional[Session] = None,
|
||||
namespace_client_pushdown_operations: Optional[List[str]] = None,
|
||||
) -> Connection: ...
|
||||
def connect_namespace_client(
|
||||
namespace_client: Any,
|
||||
read_consistency_interval: Optional[float] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
session: Optional[Session] = None,
|
||||
namespace_client_pushdown_operations: Optional[List[str]] = None,
|
||||
namespace_client_impl: Optional[str] = None,
|
||||
namespace_client_properties: Optional[Dict[str, str]] = None,
|
||||
) -> Connection: ...
|
||||
|
||||
class RecordBatchStream:
|
||||
@property
|
||||
|
||||
@@ -38,8 +38,11 @@ from lance_namespace_urllib3_client.models.query_table_request_vector import (
|
||||
QueryTableRequestVector,
|
||||
)
|
||||
from lance_namespace_urllib3_client.models.string_fts_query import StringFtsQuery
|
||||
from lance_namespace.errors import TableNotFoundError
|
||||
from lancedb._lancedb import connect_namespace_client as _connect_namespace_client
|
||||
from lance_namespace.errors import NamespaceNotEmptyError, TableNotFoundError
|
||||
from lancedb._lancedb import (
|
||||
connect_namespace as _connect_namespace,
|
||||
connect_namespace_client as _connect_namespace_client,
|
||||
)
|
||||
from lancedb.background_loop import LOOP
|
||||
from lancedb.db import AsyncConnection, DBConnection
|
||||
from lancedb.namespace_utils import (
|
||||
@@ -386,6 +389,10 @@ def _builds_namespace_natively(
|
||||
return namespace_client_impl == "rest" and bool(namespace_client_properties)
|
||||
|
||||
|
||||
def _supports_native_sync_namespace(namespace_client_impl: str) -> bool:
|
||||
return namespace_client_impl in {"dir", "rest"}
|
||||
|
||||
|
||||
class LanceNamespaceDBConnection(DBConnection):
|
||||
"""
|
||||
A LanceDB connection that uses a namespace for table management.
|
||||
@@ -396,7 +403,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
namespace_client: LanceNamespace,
|
||||
namespace_client: Optional[LanceNamespace] = None,
|
||||
*,
|
||||
read_consistency_interval: Optional[timedelta] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
@@ -404,6 +411,8 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
namespace_client_pushdown_operations: Optional[List[str]] = None,
|
||||
namespace_client_impl: Optional[str] = None,
|
||||
namespace_client_properties: Optional[Dict[str, str]] = None,
|
||||
_inner: Optional[AsyncConnection] = None,
|
||||
_route_pushdown_to_rust: Optional[bool] = None,
|
||||
):
|
||||
"""
|
||||
Initialize a namespace-based LanceDB connection.
|
||||
@@ -449,26 +458,36 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
# ``build_namespace_natively``), the underlying Rust table performs
|
||||
# QueryTable pushdown through the read-freshness context provider, which
|
||||
# the pure-Python ``query_table`` path bypasses.
|
||||
self._route_pushdown_to_rust = _builds_namespace_natively(
|
||||
namespace_client_impl, namespace_client_properties
|
||||
)
|
||||
self._inner = AsyncConnection(
|
||||
_connect_namespace_client(
|
||||
namespace_client,
|
||||
read_consistency_interval=(
|
||||
read_consistency_interval.total_seconds()
|
||||
if read_consistency_interval is not None
|
||||
else None
|
||||
),
|
||||
storage_options=self.storage_options or None,
|
||||
session=session,
|
||||
namespace_client_pushdown_operations=(
|
||||
list(self._namespace_client_pushdown_operations)
|
||||
),
|
||||
namespace_client_impl=namespace_client_impl,
|
||||
namespace_client_properties=namespace_client_properties,
|
||||
self._route_pushdown_to_rust = (
|
||||
_route_pushdown_to_rust
|
||||
if _route_pushdown_to_rust is not None
|
||||
else _builds_namespace_natively(
|
||||
namespace_client_impl, namespace_client_properties
|
||||
)
|
||||
)
|
||||
if _inner is not None:
|
||||
self._inner = _inner
|
||||
else:
|
||||
if namespace_client is None:
|
||||
raise ValueError("namespace_client is required without a native _inner")
|
||||
self._inner = AsyncConnection(
|
||||
_connect_namespace_client(
|
||||
namespace_client,
|
||||
read_consistency_interval=(
|
||||
read_consistency_interval.total_seconds()
|
||||
if read_consistency_interval is not None
|
||||
else None
|
||||
),
|
||||
storage_options=self.storage_options or None,
|
||||
session=session,
|
||||
namespace_client_pushdown_operations=(
|
||||
list(self._namespace_client_pushdown_operations)
|
||||
),
|
||||
namespace_client_impl=namespace_client_impl,
|
||||
namespace_client_properties=namespace_client_properties,
|
||||
)
|
||||
)
|
||||
self._uri = self._inner.uri
|
||||
|
||||
@override
|
||||
def serialize(self) -> str:
|
||||
@@ -514,11 +533,11 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
)
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
request = ListTablesRequest(
|
||||
id=namespace_path, page_token=page_token, limit=limit
|
||||
return LOOP.run(
|
||||
self._inner.table_names(
|
||||
namespace_path=namespace_path, start_after=page_token, limit=limit
|
||||
)
|
||||
)
|
||||
response = self._namespace_client.list_tables(request)
|
||||
return response.tables if response.tables else []
|
||||
|
||||
@override
|
||||
def create_table(
|
||||
@@ -589,8 +608,8 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
index_cache_size=index_cache_size,
|
||||
)
|
||||
)
|
||||
except RuntimeError as e:
|
||||
if "Table not found" in str(e):
|
||||
except (RuntimeError, ValueError) as e:
|
||||
if "Table not found" in str(e) or "was not found" in str(e):
|
||||
table_id = namespace_path + [name]
|
||||
raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}")
|
||||
raise
|
||||
@@ -612,12 +631,9 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
|
||||
@override
|
||||
def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):
|
||||
# Use namespace drop_table directly
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
table_id = namespace_path + [name]
|
||||
request = DropTableRequest(id=table_id)
|
||||
self._namespace_client.drop_table(request)
|
||||
LOOP.run(self._inner.drop_table(name, namespace_path=namespace_path))
|
||||
|
||||
@override
|
||||
def rename_table(
|
||||
@@ -631,14 +647,19 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
cur_namespace_path = []
|
||||
if new_namespace_path is None:
|
||||
new_namespace_path = []
|
||||
cur_table_id = cur_namespace_path + [cur_name]
|
||||
new_namespace_id = new_namespace_path if new_namespace_path else None
|
||||
request = RenameTableRequest(
|
||||
id=cur_table_id,
|
||||
new_table_name=new_name,
|
||||
new_namespace_id=new_namespace_id,
|
||||
)
|
||||
self._namespace_client.rename_table(request)
|
||||
try:
|
||||
LOOP.run(
|
||||
self._inner.rename_table(
|
||||
cur_name,
|
||||
new_name,
|
||||
cur_namespace_path=cur_namespace_path,
|
||||
new_namespace_path=new_namespace_path,
|
||||
)
|
||||
)
|
||||
except RuntimeError as e:
|
||||
if "rename_table not implemented" in str(e):
|
||||
raise NotImplementedError("rename_table not implemented") from e
|
||||
raise
|
||||
|
||||
@override
|
||||
def drop_database(self):
|
||||
@@ -650,8 +671,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
def drop_all_tables(self, namespace_path: Optional[List[str]] = None):
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
for table_name in self.table_names(namespace_path=namespace_path):
|
||||
self.drop_table(table_name, namespace_path=namespace_path)
|
||||
LOOP.run(self._inner.drop_all_tables(namespace_path=namespace_path))
|
||||
|
||||
@override
|
||||
def list_namespaces(
|
||||
@@ -681,13 +701,10 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
"""
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
request = ListNamespacesRequest(
|
||||
id=namespace_path, page_token=page_token, limit=limit
|
||||
)
|
||||
response = self._namespace_client.list_namespaces(request)
|
||||
return ListNamespacesResponse(
|
||||
namespaces=response.namespaces if response.namespaces else [],
|
||||
page_token=response.page_token,
|
||||
return LOOP.run(
|
||||
self._inner.list_namespaces(
|
||||
namespace_path=namespace_path, page_token=page_token, limit=limit
|
||||
)
|
||||
)
|
||||
|
||||
@override
|
||||
@@ -715,14 +732,12 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
CreateNamespaceResponse
|
||||
Response containing the properties of the created namespace.
|
||||
"""
|
||||
request = CreateNamespaceRequest(
|
||||
id=namespace_path,
|
||||
mode=_normalize_create_namespace_mode(mode),
|
||||
properties=properties,
|
||||
)
|
||||
response = self._namespace_client.create_namespace(request)
|
||||
return CreateNamespaceResponse(
|
||||
properties=response.properties if hasattr(response, "properties") else None
|
||||
return LOOP.run(
|
||||
self._inner.create_namespace(
|
||||
namespace_path=namespace_path,
|
||||
mode=mode,
|
||||
properties=properties,
|
||||
)
|
||||
)
|
||||
|
||||
@override
|
||||
@@ -750,20 +765,18 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
DropNamespaceResponse
|
||||
Response containing properties and transaction_id if applicable.
|
||||
"""
|
||||
request = DropNamespaceRequest(
|
||||
id=namespace_path,
|
||||
mode=_normalize_drop_namespace_mode(mode),
|
||||
behavior=_normalize_drop_namespace_behavior(behavior),
|
||||
)
|
||||
response = self._namespace_client.drop_namespace(request)
|
||||
return DropNamespaceResponse(
|
||||
properties=(
|
||||
response.properties if hasattr(response, "properties") else None
|
||||
),
|
||||
transaction_id=(
|
||||
response.transaction_id if hasattr(response, "transaction_id") else None
|
||||
),
|
||||
)
|
||||
try:
|
||||
return LOOP.run(
|
||||
self._inner.drop_namespace(
|
||||
namespace_path=namespace_path,
|
||||
mode=mode,
|
||||
behavior=behavior,
|
||||
)
|
||||
)
|
||||
except RuntimeError as e:
|
||||
if "Namespace not empty" in str(e):
|
||||
raise NamespaceNotEmptyError(str(e)) from e
|
||||
raise
|
||||
|
||||
@override
|
||||
def describe_namespace(
|
||||
@@ -782,11 +795,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
DescribeNamespaceResponse
|
||||
Response containing the namespace properties.
|
||||
"""
|
||||
request = DescribeNamespaceRequest(id=namespace_path)
|
||||
response = self._namespace_client.describe_namespace(request)
|
||||
return DescribeNamespaceResponse(
|
||||
properties=response.properties if hasattr(response, "properties") else None
|
||||
)
|
||||
return LOOP.run(self._inner.describe_namespace(namespace_path))
|
||||
|
||||
@override
|
||||
def list_tables(
|
||||
@@ -816,13 +825,10 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
"""
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
request = ListTablesRequest(
|
||||
id=namespace_path, page_token=page_token, limit=limit
|
||||
)
|
||||
response = self._namespace_client.list_tables(request)
|
||||
return ListTablesResponse(
|
||||
tables=response.tables if response.tables else [],
|
||||
page_token=response.page_token,
|
||||
return LOOP.run(
|
||||
self._inner.list_tables(
|
||||
namespace_path=namespace_path, page_token=page_token, limit=limit
|
||||
)
|
||||
)
|
||||
|
||||
def _lance_table_from_uri(
|
||||
@@ -878,6 +884,18 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
LanceNamespace
|
||||
The namespace client for this connection.
|
||||
"""
|
||||
if self._namespace_client is None:
|
||||
if (
|
||||
self._namespace_client_impl is None
|
||||
or self._namespace_client_properties is None
|
||||
):
|
||||
raise ValueError(
|
||||
"Cannot construct a Python namespace client without "
|
||||
"namespace implementation properties"
|
||||
)
|
||||
self._namespace_client = namespace_connect(
|
||||
self._namespace_client_impl, self._namespace_client_properties
|
||||
)
|
||||
return self._namespace_client
|
||||
|
||||
|
||||
@@ -1342,6 +1360,33 @@ def connect_namespace(
|
||||
LanceNamespaceDBConnection
|
||||
A namespace-based connection to LanceDB
|
||||
"""
|
||||
if _supports_native_sync_namespace(namespace_client_impl):
|
||||
inner = AsyncConnection(
|
||||
_connect_namespace(
|
||||
namespace_client_impl,
|
||||
namespace_client_properties,
|
||||
read_consistency_interval=(
|
||||
read_consistency_interval.total_seconds()
|
||||
if read_consistency_interval is not None
|
||||
else None
|
||||
),
|
||||
storage_options=storage_options,
|
||||
session=session,
|
||||
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
|
||||
)
|
||||
)
|
||||
return LanceNamespaceDBConnection(
|
||||
namespace_client=None,
|
||||
read_consistency_interval=read_consistency_interval,
|
||||
storage_options=storage_options,
|
||||
session=session,
|
||||
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
|
||||
namespace_client_impl=namespace_client_impl,
|
||||
namespace_client_properties=namespace_client_properties,
|
||||
_inner=inner,
|
||||
_route_pushdown_to_rust=True,
|
||||
)
|
||||
|
||||
namespace_client = namespace_connect(
|
||||
namespace_client_impl, namespace_client_properties
|
||||
)
|
||||
|
||||
@@ -2142,12 +2142,19 @@ class LanceTable(Table):
|
||||
|
||||
branch = self.current_branch()
|
||||
version = None if branch is not None else self.version
|
||||
if self._namespace_client is not None:
|
||||
namespace_client = self._namespace_client
|
||||
if namespace_client is None:
|
||||
conn_uri = getattr(self._conn, "uri", "")
|
||||
if get_uri_scheme(conn_uri) == "namespace":
|
||||
namespace_client = self._conn.namespace_client()
|
||||
self._namespace_client = namespace_client
|
||||
|
||||
if namespace_client is not None:
|
||||
table_id = self._namespace_path + [self.name]
|
||||
ds = lance.dataset(
|
||||
version=version,
|
||||
storage_options=self._conn.storage_options,
|
||||
namespace_client=self._namespace_client,
|
||||
namespace_client=namespace_client,
|
||||
table_id=table_id,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
|
||||
import tempfile
|
||||
import shutil
|
||||
import importlib
|
||||
import pytest
|
||||
import pyarrow as pa
|
||||
import lancedb
|
||||
@@ -103,6 +104,40 @@ class TestNamespaceConnection:
|
||||
assert isinstance(db, lancedb.LanceNamespaceDBConnection)
|
||||
assert len(list(db.table_names())) == 0
|
||||
|
||||
def test_sync_builtin_namespace_uses_rust_without_python_client(self, monkeypatch):
|
||||
"""Built-in sync namespace connections should not construct or call the
|
||||
Python namespace client for normal namespace/table management."""
|
||||
namespace_module = importlib.import_module("lancedb.namespace")
|
||||
|
||||
def fail_namespace_connect(*args, **kwargs):
|
||||
raise AssertionError("Python namespace client should not be constructed")
|
||||
|
||||
monkeypatch.setattr(
|
||||
namespace_module, "namespace_connect", fail_namespace_connect
|
||||
)
|
||||
|
||||
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
|
||||
assert isinstance(db, lancedb.LanceNamespaceDBConnection)
|
||||
assert db._namespace_client is None
|
||||
assert db._route_pushdown_to_rust is True
|
||||
|
||||
db.create_namespace(["test_ns"])
|
||||
assert "test_ns" in db.list_namespaces().namespaces
|
||||
|
||||
schema = pa.schema([pa.field("id", pa.int64())])
|
||||
table = db.create_table("test_table", schema=schema, namespace_path=["test_ns"])
|
||||
assert table.namespace == ["test_ns"]
|
||||
assert "test_table" in db.table_names(namespace_path=["test_ns"])
|
||||
assert "test_table" in db.list_tables(namespace_path=["test_ns"]).tables
|
||||
|
||||
opened = db.open_table("test_table", namespace_path=["test_ns"])
|
||||
assert opened.namespace == ["test_ns"]
|
||||
|
||||
db.drop_table("test_table", namespace_path=["test_ns"])
|
||||
assert db.list_tables(namespace_path=["test_ns"]).tables == []
|
||||
db.drop_namespace(["test_ns"])
|
||||
assert "test_ns" not in db.list_namespaces().namespaces
|
||||
|
||||
def test_create_table_through_namespace(self):
|
||||
"""Test creating a table through namespace."""
|
||||
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
|
||||
@@ -818,10 +853,11 @@ class TestPushdownOperations:
|
||||
)
|
||||
assert db._route_pushdown_to_rust is True
|
||||
|
||||
def test_route_pushdown_to_rust_false_for_dir(self):
|
||||
"""A non-native (dir) connection keeps the Python pushdown path."""
|
||||
def test_route_pushdown_to_rust_for_native_dir(self):
|
||||
"""The sync dir connection is natively built and defers QueryTable
|
||||
pushdown to Rust."""
|
||||
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
|
||||
assert db._route_pushdown_to_rust is False
|
||||
assert db._route_pushdown_to_rust is True
|
||||
|
||||
def test_async_route_pushdown_to_rust_for_native_rest(self):
|
||||
"""The async connection must not silently bypass the read-freshness fix:
|
||||
|
||||
@@ -1137,6 +1137,16 @@ def test_namespace_open_table_with_branch_version(tmp_path):
|
||||
assert db.open_table("t", namespace_path=["ns1"], branch="exp").count_rows() == 3
|
||||
|
||||
|
||||
def test_namespace_root_table_to_lance_uses_namespace_client(tmp_path):
|
||||
pytest.importorskip("lance") # "dir" impl is lance.namespace.DirectoryNamespace
|
||||
db = lancedb.connect_namespace("dir", {"root": str(tmp_path)})
|
||||
table = db.create_table("t", [{"i": 0}])
|
||||
|
||||
assert table._namespace_client is None
|
||||
assert table.to_lance().count_rows() == 1
|
||||
assert table._namespace_client is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_namespace_open_table_with_branch_version(tmp_path):
|
||||
pytest.importorskip("lance") # "dir" impl is lance.namespace.DirectoryNamespace
|
||||
|
||||
@@ -655,6 +655,46 @@ pub fn connect_namespace_client(
|
||||
)))
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
#[pyo3(signature = (
|
||||
namespace_client_impl,
|
||||
namespace_client_properties,
|
||||
read_consistency_interval=None,
|
||||
storage_options=None,
|
||||
session=None,
|
||||
namespace_client_pushdown_operations=None,
|
||||
))]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn connect_namespace(
|
||||
namespace_client_impl: String,
|
||||
namespace_client_properties: HashMap<String, String>,
|
||||
read_consistency_interval: Option<f64>,
|
||||
storage_options: Option<HashMap<String, String>>,
|
||||
session: Option<crate::session::Session>,
|
||||
namespace_client_pushdown_operations: Option<Vec<String>>,
|
||||
) -> PyResult<Connection> {
|
||||
let read_consistency_interval = read_consistency_interval.map(Duration::from_secs_f64);
|
||||
let namespace_client_pushdown_operations =
|
||||
parse_namespace_client_pushdown_operations(namespace_client_pushdown_operations)?;
|
||||
|
||||
let mut builder =
|
||||
lancedb::connect_namespace(&namespace_client_impl, namespace_client_properties)
|
||||
.pushdown_operations(namespace_client_pushdown_operations);
|
||||
if let Some(storage_options) = storage_options {
|
||||
builder = builder.storage_options(storage_options);
|
||||
}
|
||||
if let Some(read_consistency_interval) = read_consistency_interval {
|
||||
builder = builder.read_consistency_interval(read_consistency_interval);
|
||||
}
|
||||
if let Some(session) = session {
|
||||
builder = builder.session(session.inner.clone());
|
||||
}
|
||||
|
||||
Ok(Connection::new(
|
||||
crate::runtime::block_on(builder.execute()).infer_error()?,
|
||||
))
|
||||
}
|
||||
|
||||
/// Whether to build the namespace natively (from impl + properties) instead of
|
||||
/// wrapping a pre-built client. Native construction is required for the
|
||||
/// read-freshness provider to be installed
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
use arrow::RecordBatchStream;
|
||||
use connection::{Connection, connect, connect_namespace_client};
|
||||
use connection::{Connection, connect, connect_namespace, connect_namespace_client};
|
||||
use env_logger::Env;
|
||||
use expr::{PyExpr, expr_col, expr_func, expr_lit};
|
||||
use index::IndexConfig;
|
||||
@@ -62,6 +62,7 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
|
||||
m.add_class::<PyPermutationReader>()?;
|
||||
m.add_class::<PyExpr>()?;
|
||||
m.add_function(wrap_pyfunction!(connect, m)?)?;
|
||||
m.add_function(wrap_pyfunction!(connect_namespace, m)?)?;
|
||||
m.add_function(wrap_pyfunction!(connect_namespace_client, m)?)?;
|
||||
m.add_function(wrap_pyfunction!(permutation::async_permutation_builder, m)?)?;
|
||||
m.add_function(wrap_pyfunction!(util::validate_table_name, m)?)?;
|
||||
|
||||
Reference in New Issue
Block a user