mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-14 10:30:40 +00:00
feat: cache namespace_client and auto-delegate child namespace operations
LanceDBConnection now: - Caches namespace_client() result to avoid repeated DirectoryNamespace builds - Auto-delegates open_table/create_table with non-empty namespace_path through the directory namespace client - Routes create_namespace/drop_namespace/describe_namespace/list_namespaces through the namespace client - Routes list_tables/drop_table for child namespaces through namespace client This enables local storage connections to transparently handle child namespaces like ["__system"] without requiring a separate LanceNamespaceDBConnection. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -596,6 +596,7 @@ class LanceDBConnection(DBConnection):
|
||||
):
|
||||
if _inner is not None:
|
||||
self._conn = _inner
|
||||
self._cached_namespace_client = None
|
||||
return
|
||||
|
||||
if not isinstance(uri, Path):
|
||||
@@ -643,6 +644,7 @@ class LanceDBConnection(DBConnection):
|
||||
# beyond _conn.
|
||||
self.storage_options = storage_options
|
||||
self._conn = AsyncConnection(LOOP.run(do_connect()))
|
||||
self._cached_namespace_client: Optional[LanceNamespace] = None
|
||||
|
||||
@property
|
||||
def read_consistency_interval(self) -> Optional[timedelta]:
|
||||
@@ -714,11 +716,13 @@ class LanceDBConnection(DBConnection):
|
||||
ListNamespacesResponse
|
||||
Response containing namespace names and optional page_token for pagination.
|
||||
"""
|
||||
from lance_namespace import ListNamespacesRequest
|
||||
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
return LOOP.run(
|
||||
self._conn.list_namespaces(
|
||||
namespace_path=namespace_path, page_token=page_token, limit=limit
|
||||
return self.namespace_client().list_namespaces(
|
||||
ListNamespacesRequest(
|
||||
id=namespace_path, page_token=page_token, limit=limit
|
||||
)
|
||||
)
|
||||
|
||||
@@ -746,11 +750,15 @@ class LanceDBConnection(DBConnection):
|
||||
CreateNamespaceResponse
|
||||
Response containing the properties of the created namespace.
|
||||
"""
|
||||
return LOOP.run(
|
||||
self._conn.create_namespace(
|
||||
namespace_path=namespace_path, mode=mode, properties=properties
|
||||
)
|
||||
from lance_namespace import CreateNamespaceRequest
|
||||
from lancedb.namespace_utils import _normalize_create_namespace_mode
|
||||
|
||||
req = CreateNamespaceRequest(
|
||||
id=namespace_path,
|
||||
mode=_normalize_create_namespace_mode(mode),
|
||||
properties=properties,
|
||||
)
|
||||
return self.namespace_client().create_namespace(req)
|
||||
|
||||
@override
|
||||
def drop_namespace(
|
||||
@@ -776,12 +784,19 @@ class LanceDBConnection(DBConnection):
|
||||
DropNamespaceResponse
|
||||
Response containing properties and transaction_id if applicable.
|
||||
"""
|
||||
return LOOP.run(
|
||||
self._conn.drop_namespace(
|
||||
namespace_path=namespace_path, mode=mode, behavior=behavior
|
||||
)
|
||||
from lance_namespace import DropNamespaceRequest
|
||||
from lancedb.namespace_utils import (
|
||||
_normalize_drop_namespace_behavior,
|
||||
_normalize_drop_namespace_mode,
|
||||
)
|
||||
|
||||
req = DropNamespaceRequest(
|
||||
id=namespace_path,
|
||||
mode=_normalize_drop_namespace_mode(mode),
|
||||
behavior=_normalize_drop_namespace_behavior(behavior),
|
||||
)
|
||||
return self.namespace_client().drop_namespace(req)
|
||||
|
||||
@override
|
||||
def describe_namespace(
|
||||
self, namespace_path: List[str]
|
||||
@@ -798,7 +813,11 @@ class LanceDBConnection(DBConnection):
|
||||
DescribeNamespaceResponse
|
||||
Response containing the namespace properties.
|
||||
"""
|
||||
return LOOP.run(self._conn.describe_namespace(namespace_path=namespace_path))
|
||||
from lance_namespace import DescribeNamespaceRequest
|
||||
|
||||
return self.namespace_client().describe_namespace(
|
||||
DescribeNamespaceRequest(id=namespace_path)
|
||||
)
|
||||
|
||||
@override
|
||||
def list_tables(
|
||||
@@ -827,6 +846,14 @@ class LanceDBConnection(DBConnection):
|
||||
"""
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
if namespace_path:
|
||||
from lance_namespace import ListTablesRequest
|
||||
|
||||
return self.namespace_client().list_tables(
|
||||
ListTablesRequest(
|
||||
id=namespace_path, page_token=page_token, limit=limit
|
||||
)
|
||||
)
|
||||
return LOOP.run(
|
||||
self._conn.list_tables(
|
||||
namespace_path=namespace_path, page_token=page_token, limit=limit
|
||||
@@ -915,6 +942,22 @@ class LanceDBConnection(DBConnection):
|
||||
raise ValueError("mode must be either 'create' or 'overwrite'")
|
||||
validate_table_name(name)
|
||||
|
||||
if namespace_path:
|
||||
return self._create_table_via_namespace(
|
||||
name,
|
||||
data=data,
|
||||
schema=schema,
|
||||
mode=mode,
|
||||
exist_ok=exist_ok,
|
||||
on_bad_vectors=on_bad_vectors,
|
||||
fill_value=fill_value,
|
||||
embedding_functions=embedding_functions,
|
||||
namespace_path=namespace_path,
|
||||
storage_options=storage_options,
|
||||
data_storage_version=data_storage_version,
|
||||
enable_v2_manifest_paths=enable_v2_manifest_paths,
|
||||
)
|
||||
|
||||
tbl = LanceTable.create(
|
||||
self,
|
||||
name,
|
||||
@@ -930,6 +973,48 @@ class LanceDBConnection(DBConnection):
|
||||
)
|
||||
return tbl
|
||||
|
||||
def _create_table_via_namespace(
|
||||
self,
|
||||
name: str,
|
||||
*,
|
||||
data: Optional[DATA] = None,
|
||||
schema: Optional[Union[pa.Schema, LanceModel]] = None,
|
||||
mode: str = "create",
|
||||
exist_ok: bool = False,
|
||||
on_bad_vectors: str = "error",
|
||||
fill_value: float = 0.0,
|
||||
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
|
||||
namespace_path: List[str],
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
data_storage_version: Optional[str] = None,
|
||||
enable_v2_manifest_paths: Optional[bool] = None,
|
||||
) -> LanceTable:
|
||||
"""Create a table through the directory namespace client."""
|
||||
from lancedb.namespace import LanceNamespaceDBConnection
|
||||
|
||||
ns_client = self.namespace_client()
|
||||
ns_conn = LanceNamespaceDBConnection(
|
||||
ns_client,
|
||||
read_consistency_interval=self.read_consistency_interval,
|
||||
storage_options=self.storage_options,
|
||||
namespace_client_impl=None,
|
||||
namespace_client_properties=None,
|
||||
)
|
||||
return ns_conn.create_table(
|
||||
name,
|
||||
data=data,
|
||||
schema=schema,
|
||||
mode=mode,
|
||||
exist_ok=exist_ok,
|
||||
on_bad_vectors=on_bad_vectors,
|
||||
fill_value=fill_value,
|
||||
embedding_functions=embedding_functions,
|
||||
namespace_path=namespace_path,
|
||||
storage_options=storage_options,
|
||||
data_storage_version=data_storage_version,
|
||||
enable_v2_manifest_paths=enable_v2_manifest_paths,
|
||||
)
|
||||
|
||||
@override
|
||||
def open_table(
|
||||
self,
|
||||
@@ -946,7 +1031,8 @@ class LanceDBConnection(DBConnection):
|
||||
name: str
|
||||
The name of the table.
|
||||
namespace_path: List[str], optional
|
||||
The namespace to open the table from.
|
||||
The namespace to open the table from. When non-empty, the
|
||||
table is resolved through the directory namespace client.
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -965,6 +1051,14 @@ class LanceDBConnection(DBConnection):
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
if namespace_path:
|
||||
return self._open_table_via_namespace(
|
||||
name,
|
||||
namespace_path=namespace_path,
|
||||
storage_options=storage_options,
|
||||
index_cache_size=index_cache_size,
|
||||
)
|
||||
|
||||
return LanceTable.open(
|
||||
self,
|
||||
name,
|
||||
@@ -973,6 +1067,45 @@ class LanceDBConnection(DBConnection):
|
||||
index_cache_size=index_cache_size,
|
||||
)
|
||||
|
||||
def _open_table_via_namespace(
|
||||
self,
|
||||
name: str,
|
||||
*,
|
||||
namespace_path: List[str],
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
index_cache_size: Optional[int] = None,
|
||||
) -> LanceTable:
|
||||
"""Open a table through the directory namespace client."""
|
||||
from lance_namespace import DescribeTableRequest
|
||||
|
||||
ns_client = self.namespace_client()
|
||||
table_id = namespace_path + [name]
|
||||
response = ns_client.describe_table(DescribeTableRequest(id=table_id))
|
||||
|
||||
merged = dict(self.storage_options or {})
|
||||
if storage_options:
|
||||
merged.update(storage_options)
|
||||
if response.storage_options:
|
||||
merged.update(response.storage_options)
|
||||
|
||||
managed_versioning = response.managed_versioning is True
|
||||
|
||||
temp_conn = LanceDBConnection(
|
||||
response.location,
|
||||
read_consistency_interval=self.read_consistency_interval,
|
||||
storage_options=merged or None,
|
||||
)
|
||||
return LanceTable.open(
|
||||
temp_conn,
|
||||
name,
|
||||
namespace_path=namespace_path,
|
||||
storage_options=merged or None,
|
||||
index_cache_size=index_cache_size,
|
||||
location=response.location,
|
||||
namespace_client=ns_client,
|
||||
managed_versioning=managed_versioning,
|
||||
)
|
||||
|
||||
def clone_table(
|
||||
self,
|
||||
target_table_name: str,
|
||||
@@ -1049,6 +1182,13 @@ class LanceDBConnection(DBConnection):
|
||||
"""
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
if namespace_path:
|
||||
from lance_namespace import DropTableRequest
|
||||
|
||||
self.namespace_client().drop_table(
|
||||
DropTableRequest(id=namespace_path + [name])
|
||||
)
|
||||
return
|
||||
LOOP.run(
|
||||
self._conn.drop_table(
|
||||
name, namespace_path=namespace_path, ignore_missing=ignore_missing
|
||||
@@ -1100,14 +1240,19 @@ class LanceDBConnection(DBConnection):
|
||||
"""Get the equivalent namespace client for this connection.
|
||||
|
||||
Returns a DirectoryNamespace pointing to the same root with the
|
||||
same storage options.
|
||||
same storage options. The result is cached for the lifetime of
|
||||
this connection.
|
||||
|
||||
Returns
|
||||
-------
|
||||
LanceNamespace
|
||||
The namespace client for this connection.
|
||||
"""
|
||||
return LOOP.run(self._conn.namespace_client())
|
||||
if self._cached_namespace_client is None:
|
||||
self._cached_namespace_client = LOOP.run(
|
||||
self._conn.namespace_client()
|
||||
)
|
||||
return self._cached_namespace_client
|
||||
|
||||
@deprecation.deprecated(
|
||||
deprecated_in="0.15.1",
|
||||
|
||||
Reference in New Issue
Block a user