mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-14 02:20:40 +00:00
refactor(python): decouple namespace tables from temp connections
This commit is contained in:
@@ -591,12 +591,10 @@ class LanceDBConnection(DBConnection):
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
session: Optional[Session] = None,
|
||||
_inner: Optional[LanceDbConnection] = None,
|
||||
_is_temporary_location_connection: bool = False,
|
||||
):
|
||||
if _inner is not None:
|
||||
self._conn = _inner
|
||||
self._cached_namespace_client = None
|
||||
self._is_temporary_location_connection = False
|
||||
return
|
||||
|
||||
if not isinstance(uri, Path):
|
||||
@@ -645,7 +643,6 @@ class LanceDBConnection(DBConnection):
|
||||
self.storage_options = storage_options
|
||||
self._conn = AsyncConnection(LOOP.run(do_connect()))
|
||||
self._cached_namespace_client: Optional[LanceNamespace] = None
|
||||
self._is_temporary_location_connection = _is_temporary_location_connection
|
||||
|
||||
@property
|
||||
def read_consistency_interval(self) -> Optional[timedelta]:
|
||||
@@ -916,12 +913,6 @@ class LanceDBConnection(DBConnection):
|
||||
def _namespace_conn(self) -> DBConnection:
|
||||
"""Return a LanceNamespaceDBConnection backed by this connection's
|
||||
directory namespace. Used to delegate child-namespace operations."""
|
||||
if self._is_temporary_location_connection:
|
||||
raise RuntimeError(
|
||||
"Temporary location connections cannot derive nested namespace "
|
||||
"connections. Use the original namespace connection instead."
|
||||
)
|
||||
|
||||
from lancedb.namespace import LanceNamespaceDBConnection
|
||||
|
||||
return LanceNamespaceDBConnection(
|
||||
|
||||
@@ -26,7 +26,7 @@ from datetime import timedelta
|
||||
import pyarrow as pa
|
||||
|
||||
from lancedb.background_loop import LOOP
|
||||
from lancedb.db import DBConnection, LanceDBConnection
|
||||
from lancedb.db import AsyncConnection, DBConnection
|
||||
from lancedb.namespace_utils import (
|
||||
_normalize_create_namespace_mode,
|
||||
_normalize_drop_namespace_mode,
|
||||
@@ -52,10 +52,38 @@ from lance_namespace import (
|
||||
)
|
||||
from lancedb.table import AsyncTable, LanceTable, Table
|
||||
from lancedb.util import validate_table_name
|
||||
from lancedb.common import DATA
|
||||
from lancedb.common import DATA, sanitize_uri
|
||||
from lancedb.pydantic import LanceModel
|
||||
from lancedb.embeddings import EmbeddingFunctionConfig
|
||||
from ._lancedb import Session
|
||||
from ._lancedb import Session, connect as lancedb_connect
|
||||
|
||||
|
||||
def _make_temp_async_connection(
    uri: str,
    read_consistency_interval: Optional[timedelta],
    storage_options: Optional[Dict[str, str]],
    session: Optional[Session],
) -> AsyncConnection:
    """Open an ``AsyncConnection`` at ``uri`` and return it synchronously.

    The connect coroutine is driven to completion on the shared background
    ``LOOP``, so this helper blocks the calling thread until the native
    connection object is available.

    Parameters
    ----------
    uri:
        Location to connect to; it is passed through ``sanitize_uri`` before
        being handed to the native ``connect`` call.
    read_consistency_interval:
        Optional interval; converted to a number of seconds, or forwarded as
        ``None`` when not given.
    storage_options:
        Forwarded unchanged to the native ``connect`` call.
    session:
        Forwarded unchanged to the native ``connect`` call.

    Returns
    -------
    AsyncConnection
        Wrapper around the connection object produced by the native call.
    """
    # The native layer takes the interval as seconds rather than a timedelta.
    interval_secs = None
    if read_consistency_interval is not None:
        interval_secs = read_consistency_interval.total_seconds()

    async def _connect():
        # Arguments are positional. The slots given as None are left unset
        # here -- presumably remote/cloud-only options; confirm against the
        # native ``connect`` signature before reordering anything.
        inner = await lancedb_connect(
            sanitize_uri(uri),
            None,
            None,
            None,
            interval_secs,
            None,
            storage_options,
            session,
        )
        return inner

    native_conn = LOOP.run(_connect())
    return AsyncConnection(native_conn)
|
||||
|
||||
|
||||
from lance_namespace_urllib3_client.models.json_arrow_schema import JsonArrowSchema
|
||||
from lance_namespace_urllib3_client.models.json_arrow_field import JsonArrowField
|
||||
@@ -562,29 +590,23 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
# Step 2: Create the dataset at the namespace-declared physical location.
|
||||
#
|
||||
# The namespace has already resolved the logical table identifier
|
||||
# (namespace_path + name) to an exact storage location. We create a
|
||||
# temporary LanceDBConnection rooted at that location so we can reuse the
|
||||
# standard local LanceTable.create logic for the actual write.
|
||||
# TODO: Refactor the local/namespace create path so we do not need this
|
||||
# temporary connection trick just to reuse the existing LanceTable.create
|
||||
# implementation.
|
||||
# (namespace_path + name) to an exact storage location. Create a
|
||||
# short-lived AsyncConnection rooted at that physical location and use it
|
||||
# only to perform the low-level dataset write.
|
||||
#
|
||||
# This temporary connection is *not* a logical database root. If we were
|
||||
# to pass the original namespace_path through to LanceTable.create, the
|
||||
# lower layers would try to resolve the namespace again underneath an
|
||||
# already-resolved table location. We therefore pass namespace_path=[] to
|
||||
# the low-level create call and restore the logical namespace metadata on
|
||||
# the returned Python wrapper after the write succeeds.
|
||||
temp_conn = LanceDBConnection(
|
||||
location, # Use the actual location as the connection URI
|
||||
read_consistency_interval=self.read_consistency_interval,
|
||||
storage_options=merged_storage_options,
|
||||
session=self.session,
|
||||
_is_temporary_location_connection=True,
|
||||
# The sync LanceTable wrapper that we return should still be bound to the
|
||||
# original namespace connection so that namespace-aware operations keep
|
||||
# using the logical table id, namespace client, and connection storage
|
||||
# options.
|
||||
temp_async_conn = _make_temp_async_connection(
|
||||
location,
|
||||
self.read_consistency_interval,
|
||||
merged_storage_options,
|
||||
self.session,
|
||||
)
|
||||
|
||||
async_table = LOOP.run(
|
||||
temp_conn._conn.create_table(
|
||||
temp_async_conn.create_table(
|
||||
name,
|
||||
data,
|
||||
schema=schema,
|
||||
@@ -601,7 +623,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
)
|
||||
|
||||
return LanceTable(
|
||||
temp_conn,
|
||||
self,
|
||||
name,
|
||||
namespace_path=namespace_path,
|
||||
location=location,
|
||||
@@ -922,28 +944,24 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
) -> LanceTable:
|
||||
# Open a table directly from the namespace-resolved physical location.
|
||||
#
|
||||
# As in the create path above, the temporary connection is rooted at the
|
||||
# concrete table location returned by the namespace. The logical
|
||||
# namespace_path has already been resolved, so we must not reapply it when
|
||||
# opening the table. We open with namespace_path=[] and then restore the
|
||||
# logical namespace metadata on the returned Python wrapper.
|
||||
# TODO: Refactor the local/namespace open path so we do not need this
|
||||
# temporary connection trick just to reuse the existing LanceTable.open
|
||||
# implementation.
|
||||
# The namespace has already resolved the logical table identifier to a
|
||||
# concrete storage location. Create a short-lived AsyncConnection rooted
|
||||
# at that physical location and use it only for the low-level open.
|
||||
#
|
||||
# Note: storage_options should already be merged by the caller.
|
||||
# The returned sync LanceTable remains bound to the original namespace
|
||||
# connection so that logical namespace metadata and namespace-aware helper
|
||||
# behavior remain intact.
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
temp_conn = LanceDBConnection(
|
||||
table_uri, # Use the table location as the connection URI
|
||||
read_consistency_interval=self.read_consistency_interval,
|
||||
storage_options=storage_options if storage_options is not None else {},
|
||||
session=self.session,
|
||||
_is_temporary_location_connection=True,
|
||||
temp_async_conn = _make_temp_async_connection(
|
||||
table_uri,
|
||||
self.read_consistency_interval,
|
||||
storage_options if storage_options is not None else {},
|
||||
self.session,
|
||||
)
|
||||
|
||||
async_table = LOOP.run(
|
||||
temp_conn._conn.open_table(
|
||||
temp_async_conn.open_table(
|
||||
name,
|
||||
namespace_path=[],
|
||||
storage_options=storage_options,
|
||||
@@ -955,7 +973,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
)
|
||||
|
||||
return LanceTable(
|
||||
temp_conn,
|
||||
self,
|
||||
name,
|
||||
namespace_path=namespace_path,
|
||||
location=table_uri,
|
||||
@@ -1138,19 +1156,18 @@ class AsyncLanceNamespaceDBConnection:
|
||||
if namespace_storage_options:
|
||||
merged_storage_options.update(namespace_storage_options)
|
||||
|
||||
# Step 2: Create the dataset through the temporary connection from a
|
||||
# thread, without re-entering LanceTable.create's shared path.
|
||||
# Step 2: Create the dataset through a short-lived AsyncConnection
|
||||
# rooted at the namespace-declared physical location.
|
||||
def _create_table():
|
||||
temp_conn = LanceDBConnection(
|
||||
temp_async_conn = _make_temp_async_connection(
|
||||
location,
|
||||
read_consistency_interval=self.read_consistency_interval,
|
||||
storage_options=merged_storage_options,
|
||||
session=self.session,
|
||||
_is_temporary_location_connection=True,
|
||||
self.read_consistency_interval,
|
||||
merged_storage_options,
|
||||
self.session,
|
||||
)
|
||||
|
||||
return LOOP.run(
|
||||
temp_conn._conn.create_table(
|
||||
temp_async_conn.create_table(
|
||||
name,
|
||||
data,
|
||||
schema=schema,
|
||||
@@ -1249,19 +1266,18 @@ class AsyncLanceNamespaceDBConnection:
|
||||
# Convert None to False since we already have the answer from describe_table.
|
||||
managed_versioning = response.managed_versioning is True
|
||||
|
||||
# Open the table from the namespace-resolved physical location in a
|
||||
# thread, without re-entering LanceTable.open's shared path.
|
||||
# Open the table from the namespace-resolved physical location through a
|
||||
# short-lived AsyncConnection.
|
||||
def _open_table():
|
||||
temp_conn = LanceDBConnection(
|
||||
temp_async_conn = _make_temp_async_connection(
|
||||
response.location,
|
||||
read_consistency_interval=self.read_consistency_interval,
|
||||
storage_options=merged_storage_options,
|
||||
session=self.session,
|
||||
_is_temporary_location_connection=True,
|
||||
self.read_consistency_interval,
|
||||
merged_storage_options,
|
||||
self.session,
|
||||
)
|
||||
|
||||
return LOOP.run(
|
||||
temp_conn._conn.open_table(
|
||||
temp_async_conn.open_table(
|
||||
name,
|
||||
namespace_path=[],
|
||||
storage_options=merged_storage_options,
|
||||
|
||||
Reference in New Issue
Block a user