mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-14 10:30:40 +00:00
fix(python): handle namespace tables via resolved locations
This commit is contained in:
@@ -25,6 +25,7 @@ if TYPE_CHECKING:
|
||||
from datetime import timedelta
|
||||
import pyarrow as pa
|
||||
|
||||
from lancedb.background_loop import LOOP
|
||||
from lancedb.db import DBConnection, LanceDBConnection
|
||||
from lancedb.namespace_utils import (
|
||||
_normalize_create_namespace_mode,
|
||||
@@ -582,31 +583,33 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
_is_temporary_location_connection=True,
|
||||
)
|
||||
|
||||
# Note: storage_options_provider is auto-created in Rust from namespace_client
|
||||
tbl = LanceTable.create(
|
||||
async_table = LOOP.run(
|
||||
temp_conn._conn.create_table(
|
||||
name,
|
||||
data,
|
||||
schema=schema,
|
||||
mode=mode,
|
||||
exist_ok=exist_ok,
|
||||
on_bad_vectors=on_bad_vectors,
|
||||
fill_value=fill_value,
|
||||
embedding_functions=embedding_functions,
|
||||
namespace_path=[],
|
||||
storage_options=merged_storage_options,
|
||||
location=location,
|
||||
namespace_client=self._namespace_client,
|
||||
)
|
||||
)
|
||||
|
||||
return LanceTable(
|
||||
temp_conn,
|
||||
name,
|
||||
data,
|
||||
schema,
|
||||
mode=mode,
|
||||
exist_ok=exist_ok,
|
||||
on_bad_vectors=on_bad_vectors,
|
||||
fill_value=fill_value,
|
||||
embedding_functions=embedding_functions,
|
||||
namespace_path=[],
|
||||
storage_options=merged_storage_options,
|
||||
namespace_path=namespace_path,
|
||||
location=location,
|
||||
namespace_client=self._namespace_client,
|
||||
pushdown_operations=self._namespace_client_pushdown_operations,
|
||||
_async=async_table,
|
||||
)
|
||||
|
||||
tbl._namespace_path = namespace_path
|
||||
tbl._location = location
|
||||
tbl._namespace_client = self._namespace_client
|
||||
tbl._pushdown_operations = self._namespace_client_pushdown_operations
|
||||
|
||||
return tbl
|
||||
|
||||
def _create_table_server_side(
|
||||
self,
|
||||
name: str,
|
||||
@@ -939,28 +942,28 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
_is_temporary_location_connection=True,
|
||||
)
|
||||
|
||||
# Open the table using the temporary connection with the location parameter
|
||||
# Pass namespace_client to enable managed versioning support and auto-create
|
||||
# storage options provider
|
||||
# Pass managed_versioning to avoid redundant describe_table call
|
||||
# Pass pushdown_operations if configured on this connection
|
||||
table = LanceTable.open(
|
||||
temp_conn,
|
||||
name,
|
||||
namespace_path=[],
|
||||
storage_options=storage_options,
|
||||
index_cache_size=index_cache_size,
|
||||
location=table_uri,
|
||||
namespace_client=None,
|
||||
managed_versioning=False,
|
||||
pushdown_operations=self._namespace_client_pushdown_operations,
|
||||
async_table = LOOP.run(
|
||||
temp_conn._conn.open_table(
|
||||
name,
|
||||
namespace_path=[],
|
||||
storage_options=storage_options,
|
||||
index_cache_size=index_cache_size,
|
||||
location=table_uri,
|
||||
namespace_client=None,
|
||||
managed_versioning=False,
|
||||
)
|
||||
)
|
||||
|
||||
table._namespace_path = namespace_path
|
||||
table._location = table_uri
|
||||
table._namespace_client = namespace_client
|
||||
table._pushdown_operations = self._namespace_client_pushdown_operations
|
||||
return table
|
||||
return LanceTable(
|
||||
temp_conn,
|
||||
name,
|
||||
namespace_path=namespace_path,
|
||||
location=table_uri,
|
||||
namespace_client=namespace_client,
|
||||
managed_versioning=managed_versioning,
|
||||
pushdown_operations=self._namespace_client_pushdown_operations,
|
||||
_async=async_table,
|
||||
)
|
||||
|
||||
@override
|
||||
def namespace_client(self) -> LanceNamespace:
|
||||
@@ -1135,37 +1138,35 @@ class AsyncLanceNamespaceDBConnection:
|
||||
if namespace_storage_options:
|
||||
merged_storage_options.update(namespace_storage_options)
|
||||
|
||||
# Step 2: Create table using LanceTable.create with the location
|
||||
# Run the sync operation in a thread
|
||||
# Step 2: Create the dataset through the temporary connection from a
|
||||
# thread, without re-entering LanceTable.create's shared path.
|
||||
def _create_table():
|
||||
temp_conn = LanceDBConnection(
|
||||
location,
|
||||
read_consistency_interval=self.read_consistency_interval,
|
||||
storage_options=merged_storage_options,
|
||||
session=self.session,
|
||||
_is_temporary_location_connection=True,
|
||||
)
|
||||
|
||||
# storage_options_provider is auto-created in Rust from namespace_client
|
||||
return LanceTable.create(
|
||||
temp_conn,
|
||||
name,
|
||||
data,
|
||||
schema,
|
||||
mode=mode,
|
||||
exist_ok=exist_ok,
|
||||
on_bad_vectors=on_bad_vectors,
|
||||
fill_value=fill_value,
|
||||
embedding_functions=embedding_functions,
|
||||
namespace_path=[],
|
||||
storage_options=merged_storage_options,
|
||||
location=location,
|
||||
namespace_client=self._namespace_client,
|
||||
pushdown_operations=self._namespace_client_pushdown_operations,
|
||||
return LOOP.run(
|
||||
temp_conn._conn.create_table(
|
||||
name,
|
||||
data,
|
||||
schema=schema,
|
||||
mode=mode,
|
||||
exist_ok=exist_ok,
|
||||
on_bad_vectors=on_bad_vectors,
|
||||
fill_value=fill_value,
|
||||
embedding_functions=embedding_functions,
|
||||
namespace_path=[],
|
||||
storage_options=merged_storage_options,
|
||||
location=location,
|
||||
namespace_client=self._namespace_client,
|
||||
)
|
||||
)
|
||||
|
||||
lance_table = await asyncio.to_thread(_create_table)
|
||||
# Get the underlying async table from LanceTable
|
||||
return lance_table._table
|
||||
return await asyncio.to_thread(_create_table)
|
||||
|
||||
async def _create_table_server_side(
|
||||
self,
|
||||
@@ -1249,9 +1250,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
managed_versioning = response.managed_versioning is True
|
||||
|
||||
# Open the table from the namespace-resolved physical location in a
|
||||
# thread. As in the sync helper above, use a temporary location-rooted
|
||||
# connection, avoid reapplying namespace_path during the low-level open,
|
||||
# and restore the logical namespace metadata on the Python wrapper.
|
||||
# thread, without re-entering LanceTable.open's shared path.
|
||||
def _open_table():
|
||||
temp_conn = LanceDBConnection(
|
||||
response.location,
|
||||
@@ -1261,26 +1260,19 @@ class AsyncLanceNamespaceDBConnection:
|
||||
_is_temporary_location_connection=True,
|
||||
)
|
||||
|
||||
table = LanceTable.open(
|
||||
temp_conn,
|
||||
name,
|
||||
namespace_path=[],
|
||||
storage_options=merged_storage_options,
|
||||
index_cache_size=index_cache_size,
|
||||
location=response.location,
|
||||
namespace_client=None,
|
||||
managed_versioning=False,
|
||||
pushdown_operations=self._namespace_client_pushdown_operations,
|
||||
return LOOP.run(
|
||||
temp_conn._conn.open_table(
|
||||
name,
|
||||
namespace_path=[],
|
||||
storage_options=merged_storage_options,
|
||||
index_cache_size=index_cache_size,
|
||||
location=response.location,
|
||||
namespace_client=None,
|
||||
managed_versioning=False,
|
||||
)
|
||||
)
|
||||
|
||||
table._namespace_path = namespace_path
|
||||
table._location = response.location
|
||||
table._namespace_client = self._namespace_client
|
||||
table._pushdown_operations = self._namespace_client_pushdown_operations
|
||||
return table
|
||||
|
||||
lance_table = await asyncio.to_thread(_open_table)
|
||||
return lance_table._table
|
||||
return await asyncio.to_thread(_open_table)
|
||||
|
||||
async def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):
|
||||
"""Drop a table from the namespace."""
|
||||
|
||||
Reference in New Issue
Block a user