diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index 21dc49ecf..b07d409eb 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -591,12 +591,10 @@ class LanceDBConnection(DBConnection): storage_options: Optional[Dict[str, str]] = None, session: Optional[Session] = None, _inner: Optional[LanceDbConnection] = None, - _is_temporary_location_connection: bool = False, ): if _inner is not None: self._conn = _inner self._cached_namespace_client = None - self._is_temporary_location_connection = False return if not isinstance(uri, Path): @@ -645,7 +643,6 @@ class LanceDBConnection(DBConnection): self.storage_options = storage_options self._conn = AsyncConnection(LOOP.run(do_connect())) self._cached_namespace_client: Optional[LanceNamespace] = None - self._is_temporary_location_connection = _is_temporary_location_connection @property def read_consistency_interval(self) -> Optional[timedelta]: @@ -916,12 +913,6 @@ class LanceDBConnection(DBConnection): def _namespace_conn(self) -> DBConnection: """Return a LanceNamespaceDBConnection backed by this connection's directory namespace. Used to delegate child-namespace operations.""" - if self._is_temporary_location_connection: - raise RuntimeError( - "Temporary location connections cannot derive nested namespace " - "connections. Use the original namespace connection instead." 
def _make_temp_async_connection(
    uri: str,
    read_consistency_interval: Optional[timedelta],
    storage_options: Optional[Dict[str, str]],
    session: Optional[Session],
) -> AsyncConnection:
    """Open a short-lived ``AsyncConnection`` rooted at ``uri``.

    The namespace connection code uses this to perform low-level
    create/open calls directly against a namespace-resolved physical
    location, without constructing a full sync connection object.

    Parameters
    ----------
    uri:
        Location to connect to; it is passed through ``sanitize_uri``
        before being handed to the native ``connect`` call.
    read_consistency_interval:
        Optional interval; converted to seconds when provided, otherwise
        forwarded as ``None``.
    storage_options:
        Optional storage options forwarded unchanged.
    session:
        Optional session forwarded unchanged.

    Returns
    -------
    AsyncConnection
        Wrapper around the native connection obtained by running the
        connect coroutine on the shared background ``LOOP``.
    """
    if read_consistency_interval is None:
        interval_secs = None
    else:
        interval_secs = read_consistency_interval.total_seconds()

    async def _connect():
        # NOTE(review): the positional ``None`` slots are left unset on
        # purpose; their meaning is defined by the native ``connect``
        # signature, which is not visible here -- confirm before reordering.
        return await lancedb_connect(
            sanitize_uri(uri),
            None,
            None,
            None,
            interval_secs,
            None,
            storage_options,
            session,
        )

    # Block on the background loop until the native connection is ready.
    inner_connection = LOOP.run(_connect())
    return AsyncConnection(inner_connection)
# # The namespace has already resolved the logical table identifier - # (namespace_path + name) to an exact storage location. We create a - # temporary LanceDBConnection rooted at that location so we can reuse the - # standard local LanceTable.create logic for the actual write. - # TODO: Refactor the local/namespace create path so we do not need this - # temporary connection trick just to reuse the existing LanceTable.create - # implementation. + # (namespace_path + name) to an exact storage location. Create a + # short-lived AsyncConnection rooted at that physical location and use it + # only to perform the low-level dataset write. # - # This temporary connection is *not* a logical database root. If we were - # to pass the original namespace_path through to LanceTable.create, the - # lower layers would try to resolve the namespace again underneath an - # already-resolved table location. We therefore pass namespace_path=[] to - # the low-level create call and restore the logical namespace metadata on - # the returned Python wrapper after the write succeeds. - temp_conn = LanceDBConnection( - location, # Use the actual location as the connection URI - read_consistency_interval=self.read_consistency_interval, - storage_options=merged_storage_options, - session=self.session, - _is_temporary_location_connection=True, + # The sync LanceTable wrapper that we return should still be bound to the + # original namespace connection so that namespace-aware operations keep + # using the logical table id, namespace client, and connection storage + # options. 
+ temp_async_conn = _make_temp_async_connection( + location, + self.read_consistency_interval, + merged_storage_options, + self.session, ) async_table = LOOP.run( - temp_conn._conn.create_table( + temp_async_conn.create_table( name, data, schema=schema, @@ -601,7 +623,7 @@ class LanceNamespaceDBConnection(DBConnection): ) return LanceTable( - temp_conn, + self, name, namespace_path=namespace_path, location=location, @@ -922,28 +944,24 @@ class LanceNamespaceDBConnection(DBConnection): ) -> LanceTable: # Open a table directly from the namespace-resolved physical location. # - # As in the create path above, the temporary connection is rooted at the - # concrete table location returned by the namespace. The logical - # namespace_path has already been resolved, so we must not reapply it when - # opening the table. We open with namespace_path=[] and then restore the - # logical namespace metadata on the returned Python wrapper. - # TODO: Refactor the local/namespace open path so we do not need this - # temporary connection trick just to reuse the existing LanceTable.open - # implementation. + # The namespace has already resolved the logical table identifier to a + # concrete storage location. Create a short-lived AsyncConnection rooted + # at that physical location and use it only for the low-level open. # - # Note: storage_options should already be merged by the caller. + # The returned sync LanceTable remains bound to the original namespace + # connection so that logical namespace metadata and namespace-aware helper + # behavior remain intact. 
if namespace_path is None: namespace_path = [] - temp_conn = LanceDBConnection( - table_uri, # Use the table location as the connection URI - read_consistency_interval=self.read_consistency_interval, - storage_options=storage_options if storage_options is not None else {}, - session=self.session, - _is_temporary_location_connection=True, + temp_async_conn = _make_temp_async_connection( + table_uri, + self.read_consistency_interval, + storage_options if storage_options is not None else {}, + self.session, ) async_table = LOOP.run( - temp_conn._conn.open_table( + temp_async_conn.open_table( name, namespace_path=[], storage_options=storage_options, @@ -955,7 +973,7 @@ class LanceNamespaceDBConnection(DBConnection): ) return LanceTable( - temp_conn, + self, name, namespace_path=namespace_path, location=table_uri, @@ -1138,19 +1156,18 @@ class AsyncLanceNamespaceDBConnection: if namespace_storage_options: merged_storage_options.update(namespace_storage_options) - # Step 2: Create the dataset through the temporary connection from a - # thread, without re-entering LanceTable.create's shared path. + # Step 2: Create the dataset through a short-lived AsyncConnection + # rooted at the namespace-declared physical location. def _create_table(): - temp_conn = LanceDBConnection( + temp_async_conn = _make_temp_async_connection( location, - read_consistency_interval=self.read_consistency_interval, - storage_options=merged_storage_options, - session=self.session, - _is_temporary_location_connection=True, + self.read_consistency_interval, + merged_storage_options, + self.session, ) return LOOP.run( - temp_conn._conn.create_table( + temp_async_conn.create_table( name, data, schema=schema, @@ -1249,19 +1266,18 @@ class AsyncLanceNamespaceDBConnection: # Convert None to False since we already have the answer from describe_table. 
managed_versioning = response.managed_versioning is True - # Open the table from the namespace-resolved physical location in a - # thread, without re-entering LanceTable.open's shared path. + # Open the table from the namespace-resolved physical location through a + # short-lived AsyncConnection. def _open_table(): - temp_conn = LanceDBConnection( + temp_async_conn = _make_temp_async_connection( response.location, - read_consistency_interval=self.read_consistency_interval, - storage_options=merged_storage_options, - session=self.session, - _is_temporary_location_connection=True, + self.read_consistency_interval, + merged_storage_options, + self.session, ) return LOOP.run( - temp_conn._conn.open_table( + temp_async_conn.open_table( name, namespace_path=[], storage_options=merged_storage_options,