From a0defd448fa9c8303d197526898172186f30cc4f Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Fri, 17 Apr 2026 15:39:43 -0700 Subject: [PATCH] fix(python): handle namespace tables via resolved locations --- python/python/lancedb/namespace.py | 152 ++++++++++++++--------------- 1 file changed, 72 insertions(+), 80 deletions(-) diff --git a/python/python/lancedb/namespace.py b/python/python/lancedb/namespace.py index 1eb5fd99d..a26e13071 100644 --- a/python/python/lancedb/namespace.py +++ b/python/python/lancedb/namespace.py @@ -25,6 +25,7 @@ if TYPE_CHECKING: from datetime import timedelta import pyarrow as pa +from lancedb.background_loop import LOOP from lancedb.db import DBConnection, LanceDBConnection from lancedb.namespace_utils import ( _normalize_create_namespace_mode, @@ -582,31 +583,33 @@ class LanceNamespaceDBConnection(DBConnection): _is_temporary_location_connection=True, ) - # Note: storage_options_provider is auto-created in Rust from namespace_client - tbl = LanceTable.create( + async_table = LOOP.run( + temp_conn._conn.create_table( + name, + data, + schema=schema, + mode=mode, + exist_ok=exist_ok, + on_bad_vectors=on_bad_vectors, + fill_value=fill_value, + embedding_functions=embedding_functions, + namespace_path=[], + storage_options=merged_storage_options, + location=location, + namespace_client=self._namespace_client, + ) + ) + + return LanceTable( temp_conn, name, - data, - schema, - mode=mode, - exist_ok=exist_ok, - on_bad_vectors=on_bad_vectors, - fill_value=fill_value, - embedding_functions=embedding_functions, - namespace_path=[], - storage_options=merged_storage_options, + namespace_path=namespace_path, location=location, namespace_client=self._namespace_client, pushdown_operations=self._namespace_client_pushdown_operations, + _async=async_table, ) - tbl._namespace_path = namespace_path - tbl._location = location - tbl._namespace_client = self._namespace_client - tbl._pushdown_operations = self._namespace_client_pushdown_operations - - return tbl - def _create_table_server_side( self, name: str, @@ -939,28 +942,28 @@ class LanceNamespaceDBConnection(DBConnection): _is_temporary_location_connection=True, ) - # Open the table using the temporary connection with the location parameter - # Pass namespace_client to enable managed versioning support and auto-create - # storage options provider - # Pass managed_versioning to avoid redundant describe_table call - # Pass pushdown_operations if configured on this connection - table = LanceTable.open( - temp_conn, - name, - namespace_path=[], - storage_options=storage_options, - index_cache_size=index_cache_size, - location=table_uri, - namespace_client=None, - managed_versioning=False, - pushdown_operations=self._namespace_client_pushdown_operations, + async_table = LOOP.run( + temp_conn._conn.open_table( + name, + namespace_path=[], + storage_options=storage_options, + index_cache_size=index_cache_size, + location=table_uri, + namespace_client=None, + managed_versioning=False, + ) ) - table._namespace_path = namespace_path - table._location = table_uri - table._namespace_client = namespace_client - table._pushdown_operations = self._namespace_client_pushdown_operations - return table + return LanceTable( + temp_conn, + name, + namespace_path=namespace_path, + location=table_uri, + namespace_client=namespace_client, + managed_versioning=managed_versioning, + pushdown_operations=self._namespace_client_pushdown_operations, + _async=async_table, + ) @override def namespace_client(self) -> LanceNamespace: @@ -1135,37 +1138,35 @@ class AsyncLanceNamespaceDBConnection: if namespace_storage_options: merged_storage_options.update(namespace_storage_options) - # Step 2: Create table using LanceTable.create with the location - # Run the sync operation in a thread + # Step 2: Create the dataset through the temporary connection from a + # thread, without re-entering LanceTable.create's shared path. def _create_table(): temp_conn = LanceDBConnection( location, read_consistency_interval=self.read_consistency_interval, storage_options=merged_storage_options, session=self.session, + _is_temporary_location_connection=True, ) - # storage_options_provider is auto-created in Rust from namespace_client - return LanceTable.create( - temp_conn, - name, - data, - schema, - mode=mode, - exist_ok=exist_ok, - on_bad_vectors=on_bad_vectors, - fill_value=fill_value, - embedding_functions=embedding_functions, - namespace_path=[], - storage_options=merged_storage_options, - location=location, - namespace_client=self._namespace_client, - pushdown_operations=self._namespace_client_pushdown_operations, + return LOOP.run( + temp_conn._conn.create_table( + name, + data, + schema=schema, + mode=mode, + exist_ok=exist_ok, + on_bad_vectors=on_bad_vectors, + fill_value=fill_value, + embedding_functions=embedding_functions, + namespace_path=[], + storage_options=merged_storage_options, + location=location, + namespace_client=self._namespace_client, + ) ) - lance_table = await asyncio.to_thread(_create_table) - # Get the underlying async table from LanceTable - return lance_table._table + return await asyncio.to_thread(_create_table) async def _create_table_server_side( self, @@ -1249,9 +1250,7 @@ class AsyncLanceNamespaceDBConnection: managed_versioning = response.managed_versioning is True # Open the table from the namespace-resolved physical location in a - # thread. As in the sync helper above, use a temporary location-rooted - # connection, avoid reapplying namespace_path during the low-level open, - # and restore the logical namespace metadata on the Python wrapper. + # thread, without re-entering LanceTable.open's shared path. def _open_table(): temp_conn = LanceDBConnection( response.location, @@ -1261,26 +1260,19 @@ class AsyncLanceNamespaceDBConnection: _is_temporary_location_connection=True, ) - table = LanceTable.open( - temp_conn, - name, - namespace_path=[], - storage_options=merged_storage_options, - index_cache_size=index_cache_size, - location=response.location, - namespace_client=None, - managed_versioning=False, - pushdown_operations=self._namespace_client_pushdown_operations, + return LOOP.run( + temp_conn._conn.open_table( + name, + namespace_path=[], + storage_options=merged_storage_options, + index_cache_size=index_cache_size, + location=response.location, + namespace_client=None, + managed_versioning=False, + ) ) - table._namespace_path = namespace_path - table._location = response.location - table._namespace_client = self._namespace_client - table._pushdown_operations = self._namespace_client_pushdown_operations - return table - - lance_table = await asyncio.to_thread(_open_table) - return lance_table._table + return await asyncio.to_thread(_open_table) async def drop_table(self, name: str, namespace_path: Optional[List[str]] = None): """Drop a table from the namespace."""