diff --git a/python/python/lancedb/namespace.py b/python/python/lancedb/namespace.py index 6e0c94fb5..2759f6e43 100644 --- a/python/python/lancedb/namespace.py +++ b/python/python/lancedb/namespace.py @@ -45,11 +45,6 @@ from lancedb._lancedb import ( ) from lancedb.background_loop import LOOP from lancedb.db import AsyncConnection, DBConnection -from lancedb.namespace_utils import ( - _normalize_create_namespace_mode, - _normalize_drop_namespace_mode, - _normalize_drop_namespace_behavior, -) from lance_namespace import ( LanceNamespace, connect as namespace_connect, @@ -58,13 +53,6 @@ from lance_namespace import ( DropNamespaceResponse, ListNamespacesResponse, ListTablesResponse, - ListTablesRequest, - DescribeNamespaceRequest, - DropTableRequest, - RenameTableRequest, - ListNamespacesRequest, - CreateNamespaceRequest, - DropNamespaceRequest, ) from lancedb.table import AsyncTable, LanceTable, Table from lancedb.util import validate_table_name @@ -389,7 +377,7 @@ def _builds_namespace_natively( return namespace_client_impl == "rest" and bool(namespace_client_properties) -def _supports_native_sync_namespace(namespace_client_impl: str) -> bool: +def _supports_native_namespace(namespace_client_impl: str) -> bool: return namespace_client_impl in {"dir", "rest"} @@ -412,7 +400,6 @@ class LanceNamespaceDBConnection(DBConnection): namespace_client_impl: Optional[str] = None, namespace_client_properties: Optional[Dict[str, str]] = None, _inner: Optional[AsyncConnection] = None, - _route_pushdown_to_rust: Optional[bool] = None, ): """ Initialize a namespace-based LanceDB connection. @@ -454,16 +441,12 @@ class LanceNamespaceDBConnection(DBConnection): ) self._namespace_client_impl = namespace_client_impl self._namespace_client_properties = namespace_client_properties - # When the namespace client is built natively (see Rust - # ``build_namespace_natively``), the underlying Rust table performs - # QueryTable pushdown through the read-freshness context provider, which - # the pure-Python ``query_table`` path bypasses. - self._route_pushdown_to_rust = ( - _route_pushdown_to_rust - if _route_pushdown_to_rust is not None - else _builds_namespace_natively( - namespace_client_impl, namespace_client_properties - ) + # When the namespace connection or client is built natively in Rust, the + # underlying Rust table performs QueryTable pushdown through the + # read-freshness context provider, which the pure-Python ``query_table`` + # path bypasses. + self._route_pushdown_to_rust = _inner is not None or _builds_namespace_natively( + namespace_client_impl, namespace_client_properties ) if _inner is not None: self._inner = _inner @@ -909,7 +892,7 @@ class AsyncLanceNamespaceDBConnection: def __init__( self, - namespace_client: LanceNamespace, + namespace_client: Optional[LanceNamespace] = None, *, read_consistency_interval: Optional[timedelta] = None, storage_options: Optional[Dict[str, str]] = None, @@ -917,6 +900,7 @@ class AsyncLanceNamespaceDBConnection: namespace_client_pushdown_operations: Optional[List[str]] = None, namespace_client_impl: Optional[str] = None, namespace_client_properties: Optional[Dict[str, str]] = None, + _inner: Optional[AsyncConnection] = None, ): """ Initialize an async namespace-based LanceDB connection. @@ -958,29 +942,35 @@ class AsyncLanceNamespaceDBConnection: ) self._namespace_client_impl = namespace_client_impl self._namespace_client_properties = namespace_client_properties - # See LanceNamespaceDBConnection: when built natively the Rust table runs - # QueryTable pushdown through the read-freshness provider, so defer to it - # rather than the urllib3 client (which omits x-lancedb-min-timestamp). - self._route_pushdown_to_rust = _builds_namespace_natively( + # See LanceNamespaceDBConnection: when Rust owns the namespace + # connection/client, its table performs QueryTable pushdown through the + # read-freshness provider, so defer to it rather than the urllib3 client + # path (which omits x-lancedb-min-timestamp). + self._route_pushdown_to_rust = _inner is not None or _builds_namespace_natively( namespace_client_impl, namespace_client_properties ) - self._inner = AsyncConnection( - _connect_namespace_client( - namespace_client, - read_consistency_interval=( - read_consistency_interval.total_seconds() - if read_consistency_interval is not None - else None - ), - storage_options=self.storage_options or None, - session=session, - namespace_client_pushdown_operations=( - list(self._namespace_client_pushdown_operations) - ), - namespace_client_impl=namespace_client_impl, - namespace_client_properties=namespace_client_properties, + if _inner is not None: + self._inner = _inner + else: + if namespace_client is None: + raise ValueError("namespace_client is required without a native _inner") + self._inner = AsyncConnection( + _connect_namespace_client( + namespace_client, + read_consistency_interval=( + read_consistency_interval.total_seconds() + if read_consistency_interval is not None + else None + ), + storage_options=self.storage_options or None, + session=session, + namespace_client_pushdown_operations=( + list(self._namespace_client_pushdown_operations) + ), + namespace_client_impl=namespace_client_impl, + namespace_client_properties=namespace_client_properties, + ) ) - ) async def table_names( self, @@ -1004,11 +994,9 @@ class AsyncLanceNamespaceDBConnection: ) if namespace_path is None: namespace_path = [] - request = ListTablesRequest( - id=namespace_path, page_token=page_token, limit=limit + return await self._inner.table_names( + namespace_path=namespace_path, start_after=page_token, limit=limit ) - response = self._namespace_client.list_tables(request) - return response.tables if response.tables else [] async def create_table( self, @@ -1071,8 +1059,8 @@ class AsyncLanceNamespaceDBConnection: storage_options=storage_options, index_cache_size=index_cache_size, ) - except RuntimeError as e: - if "Table not found" in str(e): + except (RuntimeError, ValueError) as e: + if "Table not found" in str(e) or "was not found" in str(e): table_id = namespace_path + [name] raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}") raise @@ -1093,9 +1081,7 @@ class AsyncLanceNamespaceDBConnection: """Drop a table from the namespace.""" if namespace_path is None: namespace_path = [] - table_id = namespace_path + [name] - request = DropTableRequest(id=table_id) - self._namespace_client.drop_table(request) + await self._inner.drop_table(name, namespace_path=namespace_path) async def rename_table( self, @@ -1109,14 +1095,17 @@ class AsyncLanceNamespaceDBConnection: cur_namespace_path = [] if new_namespace_path is None: new_namespace_path = [] - cur_table_id = cur_namespace_path + [cur_name] - new_namespace_id = new_namespace_path if new_namespace_path else None - request = RenameTableRequest( - id=cur_table_id, - new_table_name=new_name, - new_namespace_id=new_namespace_id, - ) - self._namespace_client.rename_table(request) + try: + await self._inner.rename_table( + cur_name, + new_name, + cur_namespace_path=cur_namespace_path, + new_namespace_path=new_namespace_path, + ) + except RuntimeError as e: + if "rename_table not implemented" in str(e): + raise NotImplementedError("rename_table not implemented") from e + raise async def drop_database(self): """Deprecated method.""" @@ -1128,9 +1117,7 @@ class AsyncLanceNamespaceDBConnection: """Drop all tables in the namespace.""" if namespace_path is None: namespace_path = [] - table_names = await self.table_names(namespace_path=namespace_path) - for table_name in table_names: - await self.drop_table(table_name, namespace_path=namespace_path) + await self._inner.drop_all_tables(namespace_path=namespace_path) async def list_namespaces( self, @@ -1159,13 +1146,8 @@ class AsyncLanceNamespaceDBConnection: """ if namespace_path is None: namespace_path = [] - request = ListNamespacesRequest( - id=namespace_path, page_token=page_token, limit=limit - ) - response = self._namespace_client.list_namespaces(request) - return ListNamespacesResponse( - namespaces=response.namespaces if response.namespaces else [], - page_token=response.page_token, + return await self._inner.list_namespaces( + namespace_path=namespace_path, page_token=page_token, limit=limit ) async def create_namespace( @@ -1192,15 +1174,11 @@ class AsyncLanceNamespaceDBConnection: CreateNamespaceResponse Response containing the properties of the created namespace. """ - request = CreateNamespaceRequest( - id=namespace_path, - mode=_normalize_create_namespace_mode(mode), + return await self._inner.create_namespace( + namespace_path=namespace_path, + mode=mode, properties=properties, ) - response = self._namespace_client.create_namespace(request) - return CreateNamespaceResponse( - properties=response.properties if hasattr(response, "properties") else None - ) async def drop_namespace( self, @@ -1226,20 +1204,16 @@ class AsyncLanceNamespaceDBConnection: DropNamespaceResponse Response containing properties and transaction_id if applicable. """ - request = DropNamespaceRequest( - id=namespace_path, - mode=_normalize_drop_namespace_mode(mode), - behavior=_normalize_drop_namespace_behavior(behavior), - ) - response = self._namespace_client.drop_namespace(request) - return DropNamespaceResponse( - properties=( - response.properties if hasattr(response, "properties") else None - ), - transaction_id=( - response.transaction_id if hasattr(response, "transaction_id") else None - ), - ) + try: + return await self._inner.drop_namespace( + namespace_path=namespace_path, + mode=mode, + behavior=behavior, + ) + except RuntimeError as e: + if "Namespace not empty" in str(e): + raise NamespaceNotEmptyError(str(e)) from e + raise async def describe_namespace( self, namespace_path: List[str] @@ -1257,11 +1231,7 @@ class AsyncLanceNamespaceDBConnection: DescribeNamespaceResponse Response containing the namespace properties. """ - request = DescribeNamespaceRequest(id=namespace_path) - response = self._namespace_client.describe_namespace(request) - return DescribeNamespaceResponse( - properties=response.properties if hasattr(response, "properties") else None - ) + return await self._inner.describe_namespace(namespace_path) async def list_tables( self, @@ -1290,13 +1260,8 @@ class AsyncLanceNamespaceDBConnection: """ if namespace_path is None: namespace_path = [] - request = ListTablesRequest( - id=namespace_path, page_token=page_token, limit=limit - ) - response = self._namespace_client.list_tables(request) - return ListTablesResponse( - tables=response.tables if response.tables else [], - page_token=response.page_token, + return await self._inner.list_tables( + namespace_path=namespace_path, page_token=page_token, limit=limit ) async def namespace_client(self) -> LanceNamespace: @@ -1310,6 +1275,18 @@ class AsyncLanceNamespaceDBConnection: LanceNamespace The namespace client for this connection. """ + if self._namespace_client is None: + if ( + self._namespace_client_impl is None + or self._namespace_client_properties is None + ): + raise ValueError( + "Cannot construct a Python namespace client without " + "namespace implementation properties" + ) + self._namespace_client = namespace_connect( + self._namespace_client_impl, self._namespace_client_properties + ) return self._namespace_client @@ -1360,7 +1337,7 @@ def connect_namespace( LanceNamespaceDBConnection A namespace-based connection to LanceDB """ - if _supports_native_sync_namespace(namespace_client_impl): + if _supports_native_namespace(namespace_client_impl): inner = AsyncConnection( _connect_namespace( namespace_client_impl, @@ -1384,7 +1361,6 @@ def connect_namespace( namespace_client_impl=namespace_client_impl, namespace_client_properties=namespace_client_properties, _inner=inner, - _route_pushdown_to_rust=True, ) namespace_client = namespace_connect( @@ -1462,6 +1438,32 @@ def connect_namespace_async( ... tables = await db.table_names() ... table = await db.create_table("my_table", schema=schema) """ + if _supports_native_namespace(namespace_client_impl): + inner = AsyncConnection( + _connect_namespace( + namespace_client_impl, + namespace_client_properties, + read_consistency_interval=( + read_consistency_interval.total_seconds() + if read_consistency_interval is not None + else None + ), + storage_options=storage_options, + session=session, + namespace_client_pushdown_operations=namespace_client_pushdown_operations, + ) + ) + return AsyncLanceNamespaceDBConnection( + namespace_client=None, + read_consistency_interval=read_consistency_interval, + storage_options=storage_options, + session=session, + namespace_client_pushdown_operations=namespace_client_pushdown_operations, + namespace_client_impl=namespace_client_impl, + namespace_client_properties=namespace_client_properties, + _inner=inner, + ) + namespace_client = namespace_connect( namespace_client_impl, namespace_client_properties ) diff --git a/python/python/tests/test_namespace.py b/python/python/tests/test_namespace.py index d0219534a..f8cbfe92c 100644 --- a/python/python/tests/test_namespace.py +++ b/python/python/tests/test_namespace.py @@ -599,6 +599,61 @@ class TestAsyncNamespaceConnection: table_names = await db.table_names() assert len(list(table_names)) == 0 + async def test_async_builtin_namespace_uses_rust_without_python_client( + self, monkeypatch + ): + """Built-in async namespace connections should not construct or call the + Python namespace client for normal namespace/table management.""" + namespace_module = importlib.import_module("lancedb.namespace") + + def fail_namespace_connect(*args, **kwargs): + raise AssertionError("Python namespace client should not be constructed") + + monkeypatch.setattr( + namespace_module, "namespace_connect", fail_namespace_connect + ) + + db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir}) + assert isinstance(db, lancedb.AsyncLanceNamespaceDBConnection) + assert db._namespace_client is None + assert db._route_pushdown_to_rust is True + + await db.create_namespace(["test_ns"]) + assert "test_ns" in (await db.list_namespaces()).namespaces + + schema = pa.schema([pa.field("id", pa.int64())]) + table = await db.create_table( + "test_table", schema=schema, namespace_path=["test_ns"] + ) + assert table._namespace_path == ["test_ns"] + assert table._namespace_client is None + assert table._route_pushdown_to_rust is True + assert "test_table" in await db.table_names(namespace_path=["test_ns"]) + assert "test_table" in (await db.list_tables(namespace_path=["test_ns"])).tables + + opened = await db.open_table("test_table", namespace_path=["test_ns"]) + assert opened._namespace_path == ["test_ns"] + + await db.drop_table("test_table", namespace_path=["test_ns"]) + assert (await db.list_tables(namespace_path=["test_ns"])).tables == [] + await db.drop_namespace(["test_ns"]) + assert "test_ns" not in (await db.list_namespaces()).namespaces + + async def test_async_namespace_client_is_lazy(self): + """namespace_client() should still return the backing client on demand.""" + pytest.importorskip("lance") + from lance.namespace import DirectoryNamespace + + db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir}) + assert db._namespace_client is None + + ns_client = await db.namespace_client() + + assert isinstance(ns_client, DirectoryNamespace) + namespace_id = ns_client.namespace_id().replace("\\\\", "\\") + assert str(self.temp_dir) in namespace_id + assert db._namespace_client is ns_client + # Async connect via namespace helper is not enabled yet. async def test_create_table_async(self): @@ -870,10 +925,11 @@ class TestPushdownOperations: ) assert db._route_pushdown_to_rust is True - def test_async_route_pushdown_to_rust_false_for_dir(self): - """The async non-native (dir) connection keeps the Python pushdown path.""" + def test_async_route_pushdown_to_rust_for_native_dir(self): + """The async dir connection is natively built and defers QueryTable + pushdown to Rust.""" db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir}) - assert db._route_pushdown_to_rust is False + assert db._route_pushdown_to_rust is True def test_lance_table_to_arrow_uses_query_pushdown(self): namespace_client = _NamespaceClient()