Compare commits

..

2 Commits

Author SHA1 Message Date
Jack Ye
3b70fc4c9d fix(python): route async namespace connections through rust (#3603)
Summary:
- Route built-in async namespace-backed connections through the Rust
namespace connector.
- Delegate async namespace/table management methods to the inner
AsyncConnection while keeping the custom implementation Python-client
fallback.
- Add regressions for the native async dir path and lazy
namespace_client() construction.

Validated locally with targeted namespace/db/table pytest, full
test_namespace.py, ruff, cargo fmt/check/clippy, and cargo test -p
lancedb-python.
2026-06-30 17:03:23 -07:00
Lance Release
3a7b02119b Bump version: 0.31.0-beta.4 → 0.31.0-beta.5 2026-06-30 22:24:56 +00:00
4 changed files with 173 additions and 115 deletions

6
Cargo.lock generated
View File

@@ -5299,7 +5299,7 @@ dependencies = [
[[package]]
name = "lancedb"
version = "0.31.0-beta.4"
version = "0.31.0-beta.5"
dependencies = [
"ahash",
"anyhow",
@@ -5383,7 +5383,7 @@ dependencies = [
[[package]]
name = "lancedb-nodejs"
version = "0.31.0-beta.4"
version = "0.31.0-beta.5"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -5408,7 +5408,7 @@ dependencies = [
[[package]]
name = "lancedb-python"
version = "0.34.0-beta.4"
version = "0.34.0-beta.5"
dependencies = [
"arrow",
"async-trait",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.31.0-beta.4",
"version": "0.31.0-beta.5",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.31.0-beta.4",
"version": "0.31.0-beta.5",
"cpu": [
"x64",
"arm64"

View File

@@ -45,11 +45,6 @@ from lancedb._lancedb import (
)
from lancedb.background_loop import LOOP
from lancedb.db import AsyncConnection, DBConnection
from lancedb.namespace_utils import (
_normalize_create_namespace_mode,
_normalize_drop_namespace_mode,
_normalize_drop_namespace_behavior,
)
from lance_namespace import (
LanceNamespace,
connect as namespace_connect,
@@ -58,13 +53,6 @@ from lance_namespace import (
DropNamespaceResponse,
ListNamespacesResponse,
ListTablesResponse,
ListTablesRequest,
DescribeNamespaceRequest,
DropTableRequest,
RenameTableRequest,
ListNamespacesRequest,
CreateNamespaceRequest,
DropNamespaceRequest,
)
from lancedb.table import AsyncTable, LanceTable, Table
from lancedb.util import validate_table_name
@@ -389,7 +377,7 @@ def _builds_namespace_natively(
return namespace_client_impl == "rest" and bool(namespace_client_properties)
def _supports_native_sync_namespace(namespace_client_impl: str) -> bool:
def _supports_native_namespace(namespace_client_impl: str) -> bool:
return namespace_client_impl in {"dir", "rest"}
@@ -412,7 +400,6 @@ class LanceNamespaceDBConnection(DBConnection):
namespace_client_impl: Optional[str] = None,
namespace_client_properties: Optional[Dict[str, str]] = None,
_inner: Optional[AsyncConnection] = None,
_route_pushdown_to_rust: Optional[bool] = None,
):
"""
Initialize a namespace-based LanceDB connection.
@@ -454,16 +441,12 @@ class LanceNamespaceDBConnection(DBConnection):
)
self._namespace_client_impl = namespace_client_impl
self._namespace_client_properties = namespace_client_properties
# When the namespace client is built natively (see Rust
# ``build_namespace_natively``), the underlying Rust table performs
# QueryTable pushdown through the read-freshness context provider, which
# the pure-Python ``query_table`` path bypasses.
self._route_pushdown_to_rust = (
_route_pushdown_to_rust
if _route_pushdown_to_rust is not None
else _builds_namespace_natively(
namespace_client_impl, namespace_client_properties
)
# When the namespace connection or client is built natively in Rust, the
# underlying Rust table performs QueryTable pushdown through the
# read-freshness context provider, which the pure-Python ``query_table``
# path bypasses.
self._route_pushdown_to_rust = _inner is not None or _builds_namespace_natively(
namespace_client_impl, namespace_client_properties
)
if _inner is not None:
self._inner = _inner
@@ -909,7 +892,7 @@ class AsyncLanceNamespaceDBConnection:
def __init__(
self,
namespace_client: LanceNamespace,
namespace_client: Optional[LanceNamespace] = None,
*,
read_consistency_interval: Optional[timedelta] = None,
storage_options: Optional[Dict[str, str]] = None,
@@ -917,6 +900,7 @@ class AsyncLanceNamespaceDBConnection:
namespace_client_pushdown_operations: Optional[List[str]] = None,
namespace_client_impl: Optional[str] = None,
namespace_client_properties: Optional[Dict[str, str]] = None,
_inner: Optional[AsyncConnection] = None,
):
"""
Initialize an async namespace-based LanceDB connection.
@@ -958,29 +942,35 @@ class AsyncLanceNamespaceDBConnection:
)
self._namespace_client_impl = namespace_client_impl
self._namespace_client_properties = namespace_client_properties
# See LanceNamespaceDBConnection: when built natively the Rust table runs
# QueryTable pushdown through the read-freshness provider, so defer to it
# rather than the urllib3 client (which omits x-lancedb-min-timestamp).
self._route_pushdown_to_rust = _builds_namespace_natively(
# See LanceNamespaceDBConnection: when Rust owns the namespace
# connection/client, its table performs QueryTable pushdown through the
# read-freshness provider, so defer to it rather than the urllib3 client
# path (which omits x-lancedb-min-timestamp).
self._route_pushdown_to_rust = _inner is not None or _builds_namespace_natively(
namespace_client_impl, namespace_client_properties
)
self._inner = AsyncConnection(
_connect_namespace_client(
namespace_client,
read_consistency_interval=(
read_consistency_interval.total_seconds()
if read_consistency_interval is not None
else None
),
storage_options=self.storage_options or None,
session=session,
namespace_client_pushdown_operations=(
list(self._namespace_client_pushdown_operations)
),
namespace_client_impl=namespace_client_impl,
namespace_client_properties=namespace_client_properties,
if _inner is not None:
self._inner = _inner
else:
if namespace_client is None:
raise ValueError("namespace_client is required without a native _inner")
self._inner = AsyncConnection(
_connect_namespace_client(
namespace_client,
read_consistency_interval=(
read_consistency_interval.total_seconds()
if read_consistency_interval is not None
else None
),
storage_options=self.storage_options or None,
session=session,
namespace_client_pushdown_operations=(
list(self._namespace_client_pushdown_operations)
),
namespace_client_impl=namespace_client_impl,
namespace_client_properties=namespace_client_properties,
)
)
)
async def table_names(
self,
@@ -1004,11 +994,9 @@ class AsyncLanceNamespaceDBConnection:
)
if namespace_path is None:
namespace_path = []
request = ListTablesRequest(
id=namespace_path, page_token=page_token, limit=limit
return await self._inner.table_names(
namespace_path=namespace_path, start_after=page_token, limit=limit
)
response = self._namespace_client.list_tables(request)
return response.tables if response.tables else []
async def create_table(
self,
@@ -1071,8 +1059,8 @@ class AsyncLanceNamespaceDBConnection:
storage_options=storage_options,
index_cache_size=index_cache_size,
)
except RuntimeError as e:
if "Table not found" in str(e):
except (RuntimeError, ValueError) as e:
if "Table not found" in str(e) or "was not found" in str(e):
table_id = namespace_path + [name]
raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}")
raise
@@ -1093,9 +1081,7 @@ class AsyncLanceNamespaceDBConnection:
"""Drop a table from the namespace."""
if namespace_path is None:
namespace_path = []
table_id = namespace_path + [name]
request = DropTableRequest(id=table_id)
self._namespace_client.drop_table(request)
await self._inner.drop_table(name, namespace_path=namespace_path)
async def rename_table(
self,
@@ -1109,14 +1095,17 @@ class AsyncLanceNamespaceDBConnection:
cur_namespace_path = []
if new_namespace_path is None:
new_namespace_path = []
cur_table_id = cur_namespace_path + [cur_name]
new_namespace_id = new_namespace_path if new_namespace_path else None
request = RenameTableRequest(
id=cur_table_id,
new_table_name=new_name,
new_namespace_id=new_namespace_id,
)
self._namespace_client.rename_table(request)
try:
await self._inner.rename_table(
cur_name,
new_name,
cur_namespace_path=cur_namespace_path,
new_namespace_path=new_namespace_path,
)
except RuntimeError as e:
if "rename_table not implemented" in str(e):
raise NotImplementedError("rename_table not implemented") from e
raise
async def drop_database(self):
"""Deprecated method."""
@@ -1128,9 +1117,7 @@ class AsyncLanceNamespaceDBConnection:
"""Drop all tables in the namespace."""
if namespace_path is None:
namespace_path = []
table_names = await self.table_names(namespace_path=namespace_path)
for table_name in table_names:
await self.drop_table(table_name, namespace_path=namespace_path)
await self._inner.drop_all_tables(namespace_path=namespace_path)
async def list_namespaces(
self,
@@ -1159,13 +1146,8 @@ class AsyncLanceNamespaceDBConnection:
"""
if namespace_path is None:
namespace_path = []
request = ListNamespacesRequest(
id=namespace_path, page_token=page_token, limit=limit
)
response = self._namespace_client.list_namespaces(request)
return ListNamespacesResponse(
namespaces=response.namespaces if response.namespaces else [],
page_token=response.page_token,
return await self._inner.list_namespaces(
namespace_path=namespace_path, page_token=page_token, limit=limit
)
async def create_namespace(
@@ -1192,15 +1174,11 @@ class AsyncLanceNamespaceDBConnection:
CreateNamespaceResponse
Response containing the properties of the created namespace.
"""
request = CreateNamespaceRequest(
id=namespace_path,
mode=_normalize_create_namespace_mode(mode),
return await self._inner.create_namespace(
namespace_path=namespace_path,
mode=mode,
properties=properties,
)
response = self._namespace_client.create_namespace(request)
return CreateNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
async def drop_namespace(
self,
@@ -1226,20 +1204,16 @@ class AsyncLanceNamespaceDBConnection:
DropNamespaceResponse
Response containing properties and transaction_id if applicable.
"""
request = DropNamespaceRequest(
id=namespace_path,
mode=_normalize_drop_namespace_mode(mode),
behavior=_normalize_drop_namespace_behavior(behavior),
)
response = self._namespace_client.drop_namespace(request)
return DropNamespaceResponse(
properties=(
response.properties if hasattr(response, "properties") else None
),
transaction_id=(
response.transaction_id if hasattr(response, "transaction_id") else None
),
)
try:
return await self._inner.drop_namespace(
namespace_path=namespace_path,
mode=mode,
behavior=behavior,
)
except RuntimeError as e:
if "Namespace not empty" in str(e):
raise NamespaceNotEmptyError(str(e)) from e
raise
async def describe_namespace(
self, namespace_path: List[str]
@@ -1257,11 +1231,7 @@ class AsyncLanceNamespaceDBConnection:
DescribeNamespaceResponse
Response containing the namespace properties.
"""
request = DescribeNamespaceRequest(id=namespace_path)
response = self._namespace_client.describe_namespace(request)
return DescribeNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
return await self._inner.describe_namespace(namespace_path)
async def list_tables(
self,
@@ -1290,13 +1260,8 @@ class AsyncLanceNamespaceDBConnection:
"""
if namespace_path is None:
namespace_path = []
request = ListTablesRequest(
id=namespace_path, page_token=page_token, limit=limit
)
response = self._namespace_client.list_tables(request)
return ListTablesResponse(
tables=response.tables if response.tables else [],
page_token=response.page_token,
return await self._inner.list_tables(
namespace_path=namespace_path, page_token=page_token, limit=limit
)
async def namespace_client(self) -> LanceNamespace:
@@ -1310,6 +1275,18 @@ class AsyncLanceNamespaceDBConnection:
LanceNamespace
The namespace client for this connection.
"""
if self._namespace_client is None:
if (
self._namespace_client_impl is None
or self._namespace_client_properties is None
):
raise ValueError(
"Cannot construct a Python namespace client without "
"namespace implementation properties"
)
self._namespace_client = namespace_connect(
self._namespace_client_impl, self._namespace_client_properties
)
return self._namespace_client
@@ -1360,7 +1337,7 @@ def connect_namespace(
LanceNamespaceDBConnection
A namespace-based connection to LanceDB
"""
if _supports_native_sync_namespace(namespace_client_impl):
if _supports_native_namespace(namespace_client_impl):
inner = AsyncConnection(
_connect_namespace(
namespace_client_impl,
@@ -1384,7 +1361,6 @@ def connect_namespace(
namespace_client_impl=namespace_client_impl,
namespace_client_properties=namespace_client_properties,
_inner=inner,
_route_pushdown_to_rust=True,
)
namespace_client = namespace_connect(
@@ -1462,6 +1438,32 @@ def connect_namespace_async(
... tables = await db.table_names()
... table = await db.create_table("my_table", schema=schema)
"""
if _supports_native_namespace(namespace_client_impl):
inner = AsyncConnection(
_connect_namespace(
namespace_client_impl,
namespace_client_properties,
read_consistency_interval=(
read_consistency_interval.total_seconds()
if read_consistency_interval is not None
else None
),
storage_options=storage_options,
session=session,
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
)
)
return AsyncLanceNamespaceDBConnection(
namespace_client=None,
read_consistency_interval=read_consistency_interval,
storage_options=storage_options,
session=session,
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
namespace_client_impl=namespace_client_impl,
namespace_client_properties=namespace_client_properties,
_inner=inner,
)
namespace_client = namespace_connect(
namespace_client_impl, namespace_client_properties
)

View File

@@ -599,6 +599,61 @@ class TestAsyncNamespaceConnection:
table_names = await db.table_names()
assert len(list(table_names)) == 0
async def test_async_builtin_namespace_uses_rust_without_python_client(
self, monkeypatch
):
"""Built-in async namespace connections should not construct or call the
Python namespace client for normal namespace/table management."""
namespace_module = importlib.import_module("lancedb.namespace")
def fail_namespace_connect(*args, **kwargs):
raise AssertionError("Python namespace client should not be constructed")
monkeypatch.setattr(
namespace_module, "namespace_connect", fail_namespace_connect
)
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
assert isinstance(db, lancedb.AsyncLanceNamespaceDBConnection)
assert db._namespace_client is None
assert db._route_pushdown_to_rust is True
await db.create_namespace(["test_ns"])
assert "test_ns" in (await db.list_namespaces()).namespaces
schema = pa.schema([pa.field("id", pa.int64())])
table = await db.create_table(
"test_table", schema=schema, namespace_path=["test_ns"]
)
assert table._namespace_path == ["test_ns"]
assert table._namespace_client is None
assert table._route_pushdown_to_rust is True
assert "test_table" in await db.table_names(namespace_path=["test_ns"])
assert "test_table" in (await db.list_tables(namespace_path=["test_ns"])).tables
opened = await db.open_table("test_table", namespace_path=["test_ns"])
assert opened._namespace_path == ["test_ns"]
await db.drop_table("test_table", namespace_path=["test_ns"])
assert (await db.list_tables(namespace_path=["test_ns"])).tables == []
await db.drop_namespace(["test_ns"])
assert "test_ns" not in (await db.list_namespaces()).namespaces
async def test_async_namespace_client_is_lazy(self):
"""namespace_client() should still return the backing client on demand."""
pytest.importorskip("lance")
from lance.namespace import DirectoryNamespace
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
assert db._namespace_client is None
ns_client = await db.namespace_client()
assert isinstance(ns_client, DirectoryNamespace)
namespace_id = ns_client.namespace_id().replace("\\\\", "\\")
assert str(self.temp_dir) in namespace_id
assert db._namespace_client is ns_client
# Async connect via namespace helper is not enabled yet.
async def test_create_table_async(self):
@@ -870,10 +925,11 @@ class TestPushdownOperations:
)
assert db._route_pushdown_to_rust is True
def test_async_route_pushdown_to_rust_false_for_dir(self):
"""The async non-native (dir) connection keeps the Python pushdown path."""
def test_async_route_pushdown_to_rust_for_native_dir(self):
"""The async dir connection is natively built and defers QueryTable
pushdown to Rust."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
assert db._route_pushdown_to_rust is False
assert db._route_pushdown_to_rust is True
def test_lance_table_to_arrow_uses_query_pushdown(self):
namespace_client = _NamespaceClient()