diff --git a/python/python/lancedb/_lancedb.pyi b/python/python/lancedb/_lancedb.pyi index 3f3c986dd..bb10e2c17 100644 --- a/python/python/lancedb/_lancedb.pyi +++ b/python/python/lancedb/_lancedb.pyi @@ -282,6 +282,23 @@ async def connect( namespace_client_properties: Optional[Dict[str, str]] = None, oauth_config: Optional[Any] = None, ) -> Connection: ... +def connect_namespace( + namespace_client_impl: str, + namespace_client_properties: Dict[str, str], + read_consistency_interval: Optional[float] = None, + storage_options: Optional[Dict[str, str]] = None, + session: Optional[Session] = None, + namespace_client_pushdown_operations: Optional[List[str]] = None, +) -> Connection: ... +def connect_namespace_client( + namespace_client: Any, + read_consistency_interval: Optional[float] = None, + storage_options: Optional[Dict[str, str]] = None, + session: Optional[Session] = None, + namespace_client_pushdown_operations: Optional[List[str]] = None, + namespace_client_impl: Optional[str] = None, + namespace_client_properties: Optional[Dict[str, str]] = None, +) -> Connection: ... class RecordBatchStream: @property diff --git a/python/python/lancedb/namespace.py b/python/python/lancedb/namespace.py index 23d4f7885..6e0c94fb5 100644 --- a/python/python/lancedb/namespace.py +++ b/python/python/lancedb/namespace.py @@ -38,8 +38,11 @@ from lance_namespace_urllib3_client.models.query_table_request_vector import ( QueryTableRequestVector, ) from lance_namespace_urllib3_client.models.string_fts_query import StringFtsQuery -from lance_namespace.errors import TableNotFoundError -from lancedb._lancedb import connect_namespace_client as _connect_namespace_client +from lance_namespace.errors import NamespaceNotEmptyError, TableNotFoundError +from lancedb._lancedb import ( + connect_namespace as _connect_namespace, + connect_namespace_client as _connect_namespace_client, +) from lancedb.background_loop import LOOP from lancedb.db import AsyncConnection, DBConnection from lancedb.namespace_utils import ( @@ -386,6 +389,10 @@ def _builds_namespace_natively( return namespace_client_impl == "rest" and bool(namespace_client_properties) +def _supports_native_sync_namespace(namespace_client_impl: str) -> bool: + return namespace_client_impl in {"dir", "rest"} + + class LanceNamespaceDBConnection(DBConnection): """ A LanceDB connection that uses a namespace for table management. @@ -396,7 +403,7 @@ class LanceNamespaceDBConnection(DBConnection): def __init__( self, - namespace_client: LanceNamespace, + namespace_client: Optional[LanceNamespace] = None, *, read_consistency_interval: Optional[timedelta] = None, storage_options: Optional[Dict[str, str]] = None, @@ -404,6 +411,8 @@ class LanceNamespaceDBConnection(DBConnection): namespace_client_pushdown_operations: Optional[List[str]] = None, namespace_client_impl: Optional[str] = None, namespace_client_properties: Optional[Dict[str, str]] = None, + _inner: Optional[AsyncConnection] = None, + _route_pushdown_to_rust: Optional[bool] = None, ): """ Initialize a namespace-based LanceDB connection. @@ -449,26 +458,36 @@ class LanceNamespaceDBConnection(DBConnection): # ``build_namespace_natively``), the underlying Rust table performs # QueryTable pushdown through the read-freshness context provider, which # the pure-Python ``query_table`` path bypasses. - self._route_pushdown_to_rust = _builds_namespace_natively( - namespace_client_impl, namespace_client_properties - ) - self._inner = AsyncConnection( - _connect_namespace_client( - namespace_client, - read_consistency_interval=( - read_consistency_interval.total_seconds() - if read_consistency_interval is not None - else None - ), - storage_options=self.storage_options or None, - session=session, - namespace_client_pushdown_operations=( - list(self._namespace_client_pushdown_operations) - ), - namespace_client_impl=namespace_client_impl, - namespace_client_properties=namespace_client_properties, + self._route_pushdown_to_rust = ( + _route_pushdown_to_rust + if _route_pushdown_to_rust is not None + else _builds_namespace_natively( + namespace_client_impl, namespace_client_properties ) ) + if _inner is not None: + self._inner = _inner + else: + if namespace_client is None: + raise ValueError("namespace_client is required without a native _inner") + self._inner = AsyncConnection( + _connect_namespace_client( + namespace_client, + read_consistency_interval=( + read_consistency_interval.total_seconds() + if read_consistency_interval is not None + else None + ), + storage_options=self.storage_options or None, + session=session, + namespace_client_pushdown_operations=( + list(self._namespace_client_pushdown_operations) + ), + namespace_client_impl=namespace_client_impl, + namespace_client_properties=namespace_client_properties, + ) + ) + self._uri = self._inner.uri @override def serialize(self) -> str: @@ -514,11 +533,11 @@ class LanceNamespaceDBConnection(DBConnection): ) if namespace_path is None: namespace_path = [] - request = ListTablesRequest( - id=namespace_path, page_token=page_token, limit=limit + return LOOP.run( + self._inner.table_names( + namespace_path=namespace_path, start_after=page_token, limit=limit + ) ) - response = self._namespace_client.list_tables(request) - return response.tables if response.tables else [] @override def create_table( @@ -589,8 +608,8 @@ class LanceNamespaceDBConnection(DBConnection): index_cache_size=index_cache_size, ) ) - except RuntimeError as e: - if "Table not found" in str(e): + except (RuntimeError, ValueError) as e: + if "Table not found" in str(e) or "was not found" in str(e): table_id = namespace_path + [name] raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}") raise @@ -612,12 +631,9 @@ class LanceNamespaceDBConnection(DBConnection): @override def drop_table(self, name: str, namespace_path: Optional[List[str]] = None): - # Use namespace drop_table directly if namespace_path is None: namespace_path = [] - table_id = namespace_path + [name] - request = DropTableRequest(id=table_id) - self._namespace_client.drop_table(request) + LOOP.run(self._inner.drop_table(name, namespace_path=namespace_path)) @override def rename_table( @@ -631,14 +647,19 @@ class LanceNamespaceDBConnection(DBConnection): cur_namespace_path = [] if new_namespace_path is None: new_namespace_path = [] - cur_table_id = cur_namespace_path + [cur_name] - new_namespace_id = new_namespace_path if new_namespace_path else None - request = RenameTableRequest( - id=cur_table_id, - new_table_name=new_name, - new_namespace_id=new_namespace_id, - ) - self._namespace_client.rename_table(request) + try: + LOOP.run( + self._inner.rename_table( + cur_name, + new_name, + cur_namespace_path=cur_namespace_path, + new_namespace_path=new_namespace_path, + ) + ) + except RuntimeError as e: + if "rename_table not implemented" in str(e): + raise NotImplementedError("rename_table not implemented") from e + raise @override def drop_database(self): @@ -650,8 +671,7 @@ class LanceNamespaceDBConnection(DBConnection): def drop_all_tables(self, namespace_path: Optional[List[str]] = None): if namespace_path is None: namespace_path = [] - for table_name in self.table_names(namespace_path=namespace_path): - self.drop_table(table_name, namespace_path=namespace_path) + LOOP.run(self._inner.drop_all_tables(namespace_path=namespace_path)) @override def list_namespaces( @@ -681,13 +701,10 @@ class LanceNamespaceDBConnection(DBConnection): """ if namespace_path is None: namespace_path = [] - request = ListNamespacesRequest( - id=namespace_path, page_token=page_token, limit=limit - ) - response = self._namespace_client.list_namespaces(request) - return ListNamespacesResponse( - namespaces=response.namespaces if response.namespaces else [], - page_token=response.page_token, + return LOOP.run( + self._inner.list_namespaces( + namespace_path=namespace_path, page_token=page_token, limit=limit + ) ) @override @@ -715,14 +732,12 @@ class LanceNamespaceDBConnection(DBConnection): CreateNamespaceResponse Response containing the properties of the created namespace. """ - request = CreateNamespaceRequest( - id=namespace_path, - mode=_normalize_create_namespace_mode(mode), - properties=properties, - ) - response = self._namespace_client.create_namespace(request) - return CreateNamespaceResponse( - properties=response.properties if hasattr(response, "properties") else None + return LOOP.run( + self._inner.create_namespace( + namespace_path=namespace_path, + mode=mode, + properties=properties, + ) ) @override @@ -750,20 +765,18 @@ class LanceNamespaceDBConnection(DBConnection): DropNamespaceResponse Response containing properties and transaction_id if applicable. """ - request = DropNamespaceRequest( - id=namespace_path, - mode=_normalize_drop_namespace_mode(mode), - behavior=_normalize_drop_namespace_behavior(behavior), - ) - response = self._namespace_client.drop_namespace(request) - return DropNamespaceResponse( - properties=( - response.properties if hasattr(response, "properties") else None - ), - transaction_id=( - response.transaction_id if hasattr(response, "transaction_id") else None - ), - ) + try: + return LOOP.run( + self._inner.drop_namespace( + namespace_path=namespace_path, + mode=mode, + behavior=behavior, + ) + ) + except RuntimeError as e: + if "Namespace not empty" in str(e): + raise NamespaceNotEmptyError(str(e)) from e + raise @override def describe_namespace( @@ -782,11 +795,7 @@ class LanceNamespaceDBConnection(DBConnection): DescribeNamespaceResponse Response containing the namespace properties. """ - request = DescribeNamespaceRequest(id=namespace_path) - response = self._namespace_client.describe_namespace(request) - return DescribeNamespaceResponse( - properties=response.properties if hasattr(response, "properties") else None - ) + return LOOP.run(self._inner.describe_namespace(namespace_path)) @override def list_tables( @@ -816,13 +825,10 @@ class LanceNamespaceDBConnection(DBConnection): """ if namespace_path is None: namespace_path = [] - request = ListTablesRequest( - id=namespace_path, page_token=page_token, limit=limit - ) - response = self._namespace_client.list_tables(request) - return ListTablesResponse( - tables=response.tables if response.tables else [], - page_token=response.page_token, + return LOOP.run( + self._inner.list_tables( + namespace_path=namespace_path, page_token=page_token, limit=limit + ) ) def _lance_table_from_uri( @@ -878,6 +884,18 @@ class LanceNamespaceDBConnection(DBConnection): LanceNamespace The namespace client for this connection. """ + if self._namespace_client is None: + if ( + self._namespace_client_impl is None + or self._namespace_client_properties is None + ): + raise ValueError( + "Cannot construct a Python namespace client without " + "namespace implementation properties" + ) + self._namespace_client = namespace_connect( + self._namespace_client_impl, self._namespace_client_properties + ) return self._namespace_client @@ -1342,6 +1360,33 @@ def connect_namespace( LanceNamespaceDBConnection A namespace-based connection to LanceDB """ + if _supports_native_sync_namespace(namespace_client_impl): + inner = AsyncConnection( + _connect_namespace( + namespace_client_impl, + namespace_client_properties, + read_consistency_interval=( + read_consistency_interval.total_seconds() + if read_consistency_interval is not None + else None + ), + storage_options=storage_options, + session=session, + namespace_client_pushdown_operations=namespace_client_pushdown_operations, + ) + ) + return LanceNamespaceDBConnection( + namespace_client=None, + read_consistency_interval=read_consistency_interval, + storage_options=storage_options, + session=session, + namespace_client_pushdown_operations=namespace_client_pushdown_operations, + namespace_client_impl=namespace_client_impl, + namespace_client_properties=namespace_client_properties, + _inner=inner, + _route_pushdown_to_rust=True, + ) + namespace_client = namespace_connect( namespace_client_impl, namespace_client_properties ) diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index 25df6b554..28ee37fbf 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -2142,12 +2142,19 @@ class LanceTable(Table): branch = self.current_branch() version = None if branch is not None else self.version - if self._namespace_client is not None: + namespace_client = self._namespace_client + if namespace_client is None: + conn_uri = getattr(self._conn, "uri", "") + if get_uri_scheme(conn_uri) == "namespace": + namespace_client = self._conn.namespace_client() + self._namespace_client = namespace_client + + if namespace_client is not None: table_id = self._namespace_path + [self.name] ds = lance.dataset( version=version, storage_options=self._conn.storage_options, - namespace_client=self._namespace_client, + namespace_client=namespace_client, table_id=table_id, **kwargs, ) diff --git a/python/python/tests/test_namespace.py b/python/python/tests/test_namespace.py index 7f20bf459..d0219534a 100644 --- a/python/python/tests/test_namespace.py +++ b/python/python/tests/test_namespace.py @@ -5,6 +5,7 @@ import tempfile import shutil +import importlib import pytest import pyarrow as pa import lancedb @@ -103,6 +104,40 @@ class TestNamespaceConnection: assert isinstance(db, lancedb.LanceNamespaceDBConnection) assert len(list(db.table_names())) == 0 + def test_sync_builtin_namespace_uses_rust_without_python_client(self, monkeypatch): + """Built-in sync namespace connections should not construct or call the + Python namespace client for normal namespace/table management.""" + namespace_module = importlib.import_module("lancedb.namespace") + + def fail_namespace_connect(*args, **kwargs): + raise AssertionError("Python namespace client should not be constructed") + + monkeypatch.setattr( + namespace_module, "namespace_connect", fail_namespace_connect + ) + + db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) + assert isinstance(db, lancedb.LanceNamespaceDBConnection) + assert db._namespace_client is None + assert db._route_pushdown_to_rust is True + + db.create_namespace(["test_ns"]) + assert "test_ns" in db.list_namespaces().namespaces + + schema = pa.schema([pa.field("id", pa.int64())]) + table = db.create_table("test_table", schema=schema, namespace_path=["test_ns"]) + assert table.namespace == ["test_ns"] + assert "test_table" in db.table_names(namespace_path=["test_ns"]) + assert "test_table" in db.list_tables(namespace_path=["test_ns"]).tables + + opened = db.open_table("test_table", namespace_path=["test_ns"]) + assert opened.namespace == ["test_ns"] + + db.drop_table("test_table", namespace_path=["test_ns"]) + assert db.list_tables(namespace_path=["test_ns"]).tables == [] + db.drop_namespace(["test_ns"]) + assert "test_ns" not in db.list_namespaces().namespaces + def test_create_table_through_namespace(self): """Test creating a table through namespace.""" db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) @@ -818,10 +853,11 @@ class TestPushdownOperations: ) assert db._route_pushdown_to_rust is True - def test_route_pushdown_to_rust_false_for_dir(self): - """A non-native (dir) connection keeps the Python pushdown path.""" + def test_route_pushdown_to_rust_for_native_dir(self): + """The sync dir connection is natively built and defers QueryTable + pushdown to Rust.""" db = lancedb.connect_namespace("dir", {"root": self.temp_dir}) - assert db._route_pushdown_to_rust is False + assert db._route_pushdown_to_rust is True def test_async_route_pushdown_to_rust_for_native_rest(self): """The async connection must not silently bypass the read-freshness fix: diff --git a/python/python/tests/test_table.py b/python/python/tests/test_table.py index ff363932e..dae9d5b2e 100644 --- a/python/python/tests/test_table.py +++ b/python/python/tests/test_table.py @@ -1137,6 +1137,16 @@ def test_namespace_open_table_with_branch_version(tmp_path): assert db.open_table("t", namespace_path=["ns1"], branch="exp").count_rows() == 3 +def test_namespace_root_table_to_lance_uses_namespace_client(tmp_path): + pytest.importorskip("lance") # "dir" impl is lance.namespace.DirectoryNamespace + db = lancedb.connect_namespace("dir", {"root": str(tmp_path)}) + table = db.create_table("t", [{"i": 0}]) + + assert table._namespace_client is None + assert table.to_lance().count_rows() == 1 + assert table._namespace_client is not None + + @pytest.mark.asyncio async def test_async_namespace_open_table_with_branch_version(tmp_path): pytest.importorskip("lance") # "dir" impl is lance.namespace.DirectoryNamespace diff --git a/python/src/connection.rs b/python/src/connection.rs index 1bba3cefc..16bb71f80 100644 --- a/python/src/connection.rs +++ b/python/src/connection.rs @@ -655,6 +655,46 @@ pub fn connect_namespace_client( ))) } +#[pyfunction] +#[pyo3(signature = ( + namespace_client_impl, + namespace_client_properties, + read_consistency_interval=None, + storage_options=None, + session=None, + namespace_client_pushdown_operations=None, +))] +#[allow(clippy::too_many_arguments)] +pub fn connect_namespace( + namespace_client_impl: String, + namespace_client_properties: HashMap, + read_consistency_interval: Option, + storage_options: Option>, + session: Option, + namespace_client_pushdown_operations: Option>, +) -> PyResult { + let read_consistency_interval = read_consistency_interval.map(Duration::from_secs_f64); + let namespace_client_pushdown_operations = + parse_namespace_client_pushdown_operations(namespace_client_pushdown_operations)?; + + let mut builder = + lancedb::connect_namespace(&namespace_client_impl, namespace_client_properties) + .pushdown_operations(namespace_client_pushdown_operations); + if let Some(storage_options) = storage_options { + builder = builder.storage_options(storage_options); + } + if let Some(read_consistency_interval) = read_consistency_interval { + builder = builder.read_consistency_interval(read_consistency_interval); + } + if let Some(session) = session { + builder = builder.session(session.inner.clone()); + } + + Ok(Connection::new( + crate::runtime::block_on(builder.execute()).infer_error()?, + )) +} + /// Whether to build the namespace natively (from impl + properties) instead of /// wrapping a pre-built client. Native construction is required for the /// read-freshness provider to be installed diff --git a/python/src/lib.rs b/python/src/lib.rs index 72043c484..5f3039e25 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -2,7 +2,7 @@ // SPDX-FileCopyrightText: Copyright The LanceDB Authors use arrow::RecordBatchStream; -use connection::{Connection, connect, connect_namespace_client}; +use connection::{Connection, connect, connect_namespace, connect_namespace_client}; use env_logger::Env; use expr::{PyExpr, expr_col, expr_func, expr_lit}; use index::IndexConfig; @@ -62,6 +62,7 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_function(wrap_pyfunction!(connect, m)?)?; + m.add_function(wrap_pyfunction!(connect_namespace, m)?)?; m.add_function(wrap_pyfunction!(connect_namespace_client, m)?)?; m.add_function(wrap_pyfunction!(permutation::async_permutation_builder, m)?)?; m.add_function(wrap_pyfunction!(util::validate_table_name, m)?)?;