refactor: delegate child namespace ops to LanceNamespaceDBConnection

Instead of reimplementing namespace logic (describe_table, merge
storage_options, etc.) in LanceDBConnection, delegate child namespace
operations to a LanceNamespaceDBConnection via _namespace_conn().

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jack Ye
2026-04-11 21:29:37 -07:00
parent e3893dacf8
commit fc2a9726b2

View File

@@ -716,14 +716,12 @@ class LanceDBConnection(DBConnection):
ListNamespacesResponse
Response containing namespace names and optional page_token for pagination.
"""
from lance_namespace import ListNamespacesRequest
if namespace_path is None:
namespace_path = []
return self.namespace_client().list_namespaces(
ListNamespacesRequest(
id=namespace_path, page_token=page_token, limit=limit
)
return self._namespace_conn().list_namespaces(
namespace_path=namespace_path,
page_token=page_token,
limit=limit,
)
@override
@@ -733,32 +731,11 @@ class LanceDBConnection(DBConnection):
mode: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
) -> CreateNamespaceResponse:
"""Create a new namespace.
Parameters
----------
namespace_path: List[str]
The namespace identifier to create.
mode: str, optional
Creation mode - "create" (fail if exists), "exist_ok" (skip if exists),
or "overwrite" (replace if exists). Case insensitive.
properties: Dict[str, str], optional
Properties to set on the namespace.
Returns
-------
CreateNamespaceResponse
Response containing the properties of the created namespace.
"""
from lance_namespace import CreateNamespaceRequest
from lancedb.namespace_utils import _normalize_create_namespace_mode
req = CreateNamespaceRequest(
id=namespace_path,
mode=_normalize_create_namespace_mode(mode),
return self._namespace_conn().create_namespace(
namespace_path=namespace_path,
mode=mode,
properties=properties,
)
return self.namespace_client().create_namespace(req)
@override
def drop_namespace(
@@ -767,56 +744,18 @@ class LanceDBConnection(DBConnection):
mode: Optional[str] = None,
behavior: Optional[str] = None,
) -> DropNamespaceResponse:
"""Drop a namespace.
Parameters
----------
namespace_path: List[str]
The namespace identifier to drop.
mode: str, optional
Whether to skip if not exists ("SKIP") or fail ("FAIL"). Case insensitive.
behavior: str, optional
Whether to restrict drop if not empty ("RESTRICT") or cascade ("CASCADE").
Case insensitive.
Returns
-------
DropNamespaceResponse
Response containing properties and transaction_id if applicable.
"""
from lance_namespace import DropNamespaceRequest
from lancedb.namespace_utils import (
_normalize_drop_namespace_behavior,
_normalize_drop_namespace_mode,
return self._namespace_conn().drop_namespace(
namespace_path=namespace_path,
mode=mode,
behavior=behavior,
)
req = DropNamespaceRequest(
id=namespace_path,
mode=_normalize_drop_namespace_mode(mode),
behavior=_normalize_drop_namespace_behavior(behavior),
)
return self.namespace_client().drop_namespace(req)
@override
def describe_namespace(
self, namespace_path: List[str]
) -> DescribeNamespaceResponse:
"""Describe a namespace.
Parameters
----------
namespace_path: List[str]
The namespace identifier to describe.
Returns
-------
DescribeNamespaceResponse
Response containing the namespace properties.
"""
from lance_namespace import DescribeNamespaceRequest
return self.namespace_client().describe_namespace(
DescribeNamespaceRequest(id=namespace_path)
return self._namespace_conn().describe_namespace(
namespace_path=namespace_path,
)
@override
@@ -847,12 +786,10 @@ class LanceDBConnection(DBConnection):
if namespace_path is None:
namespace_path = []
if namespace_path:
from lance_namespace import ListTablesRequest
return self.namespace_client().list_tables(
ListTablesRequest(
id=namespace_path, page_token=page_token, limit=limit
)
return self._namespace_conn().list_tables(
namespace_path=namespace_path,
page_token=page_token,
limit=limit,
)
return LOOP.run(
self._conn.list_tables(
@@ -943,7 +880,7 @@ class LanceDBConnection(DBConnection):
validate_table_name(name)
if namespace_path:
return self._create_table_via_namespace(
return self._namespace_conn().create_table(
name,
data=data,
schema=schema,
@@ -973,47 +910,18 @@ class LanceDBConnection(DBConnection):
)
return tbl
def _create_table_via_namespace(
self,
name: str,
*,
data: Optional[DATA] = None,
schema: Optional[Union[pa.Schema, LanceModel]] = None,
mode: str = "create",
exist_ok: bool = False,
on_bad_vectors: str = "error",
fill_value: float = 0.0,
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
namespace_path: List[str],
storage_options: Optional[Dict[str, str]] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
) -> LanceTable:
"""Create a table through the directory namespace client."""
def _namespace_conn(self) -> "LanceNamespaceDBConnection":
"""Return a LanceNamespaceDBConnection backed by this connection's
directory namespace. Used to delegate child-namespace operations."""
from lancedb.namespace import LanceNamespaceDBConnection
ns_client = self.namespace_client()
ns_conn = LanceNamespaceDBConnection(
ns_client,
return LanceNamespaceDBConnection(
self.namespace_client(),
read_consistency_interval=self.read_consistency_interval,
storage_options=self.storage_options,
namespace_client_impl=None,
namespace_client_properties=None,
)
return ns_conn.create_table(
name,
data=data,
schema=schema,
mode=mode,
exist_ok=exist_ok,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
embedding_functions=embedding_functions,
namespace_path=namespace_path,
storage_options=storage_options,
data_storage_version=data_storage_version,
enable_v2_manifest_paths=enable_v2_manifest_paths,
)
@override
def open_table(
@@ -1052,7 +960,7 @@ class LanceDBConnection(DBConnection):
)
if namespace_path:
return self._open_table_via_namespace(
return self._namespace_conn().open_table(
name,
namespace_path=namespace_path,
storage_options=storage_options,
@@ -1067,45 +975,6 @@ class LanceDBConnection(DBConnection):
index_cache_size=index_cache_size,
)
def _open_table_via_namespace(
self,
name: str,
*,
namespace_path: List[str],
storage_options: Optional[Dict[str, str]] = None,
index_cache_size: Optional[int] = None,
) -> LanceTable:
"""Open a table through the directory namespace client."""
from lance_namespace import DescribeTableRequest
ns_client = self.namespace_client()
table_id = namespace_path + [name]
response = ns_client.describe_table(DescribeTableRequest(id=table_id))
merged = dict(self.storage_options or {})
if storage_options:
merged.update(storage_options)
if response.storage_options:
merged.update(response.storage_options)
managed_versioning = response.managed_versioning is True
temp_conn = LanceDBConnection(
response.location,
read_consistency_interval=self.read_consistency_interval,
storage_options=merged or None,
)
return LanceTable.open(
temp_conn,
name,
namespace_path=namespace_path,
storage_options=merged or None,
index_cache_size=index_cache_size,
location=response.location,
namespace_client=ns_client,
managed_versioning=managed_versioning,
)
def clone_table(
self,
target_table_name: str,
@@ -1183,11 +1052,7 @@ class LanceDBConnection(DBConnection):
if namespace_path is None:
namespace_path = []
if namespace_path:
from lance_namespace import DropTableRequest
self.namespace_client().drop_table(
DropTableRequest(id=namespace_path + [name])
)
self._namespace_conn().drop_table(name, namespace_path=namespace_path)
return
LOOP.run(
self._conn.drop_table(