feat: support async namespace connection (#2788)

Also fix 2 bugs:
1. make storage options provider serializable in ray
2. fix table.to_table() uri is wrong for namespace-backed tables
This commit is contained in:
Jack Ye
2025-11-19 12:23:50 -08:00
committed by GitHub
parent ca8d118f78
commit 1b78ccedaf
6 changed files with 581 additions and 4 deletions

View File

@@ -59,7 +59,7 @@ tests = [
"polars>=0.19, <=1.3.0",
"tantivy",
"pyarrow-stubs",
"pylance>=1.0.0b2",
"pylance>=1.0.0b4",
"requests",
"datafusion",
]

View File

@@ -20,7 +20,12 @@ from .remote.db import RemoteDBConnection
from .schema import vector
from .table import AsyncTable, Table
from ._lancedb import Session
from .namespace import connect_namespace, LanceNamespaceDBConnection
from .namespace import (
connect_namespace,
connect_namespace_async,
LanceNamespaceDBConnection,
AsyncLanceNamespaceDBConnection,
)
def connect(
@@ -36,7 +41,7 @@ def connect(
session: Optional[Session] = None,
**kwargs: Any,
) -> DBConnection:
"""Connect to a LanceDB database. YAY!
"""Connect to a LanceDB database.
Parameters
----------
@@ -224,7 +229,9 @@ __all__ = [
"connect",
"connect_async",
"connect_namespace",
"connect_namespace_async",
"AsyncConnection",
"AsyncLanceNamespaceDBConnection",
"AsyncTable",
"URI",
"sanitize_uri",

View File

@@ -10,6 +10,7 @@ through a namespace abstraction.
from __future__ import annotations
import asyncio
import sys
from typing import Dict, Iterable, List, Optional, Union
@@ -23,7 +24,7 @@ import pyarrow as pa
from lancedb.db import DBConnection, LanceDBConnection
from lancedb.io import StorageOptionsProvider
from lancedb.table import LanceTable, Table
from lancedb.table import AsyncTable, LanceTable, Table
from lancedb.util import validate_table_name
from lancedb.common import DATA
from lancedb.pydantic import LanceModel
@@ -497,6 +498,294 @@ class LanceNamespaceDBConnection(DBConnection):
)
class AsyncLanceNamespaceDBConnection:
"""
An async LanceDB connection that uses a namespace for table management.
This connection delegates table URI resolution to a lance_namespace instance,
while providing async methods for all operations.
"""
def __init__(
self,
namespace: LanceNamespace,
*,
read_consistency_interval: Optional[timedelta] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
):
"""
Initialize an async namespace-based LanceDB connection.
Parameters
----------
namespace : LanceNamespace
The namespace instance to use for table management
read_consistency_interval : Optional[timedelta]
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked.
storage_options : Optional[Dict[str, str]]
Additional options for the storage backend
session : Optional[Session]
A session to use for this connection
"""
self._ns = namespace
self.read_consistency_interval = read_consistency_interval
self.storage_options = storage_options or {}
self.session = session
async def table_names(
self,
page_token: Optional[str] = None,
limit: int = 10,
*,
namespace: List[str] = [],
) -> Iterable[str]:
"""List table names in the namespace."""
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._ns.list_tables(request)
return response.tables if response.tables else []
async def create_table(
self,
name: str,
data: Optional[DATA] = None,
schema: Optional[Union[pa.Schema, LanceModel]] = None,
mode: str = "create",
exist_ok: bool = False,
on_bad_vectors: str = "error",
fill_value: float = 0.0,
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
) -> AsyncTable:
"""Create a new table in the namespace."""
if mode.lower() not in ["create", "overwrite"]:
raise ValueError("mode must be either 'create' or 'overwrite'")
validate_table_name(name)
# Get location from namespace
table_id = namespace + [name]
# Step 1: Get the table location and storage options from namespace
location = None
namespace_storage_options = None
if mode.lower() == "overwrite":
# Try to describe the table first to see if it exists
try:
describe_request = DescribeTableRequest(id=table_id)
describe_response = self._ns.describe_table(describe_request)
location = describe_response.location
namespace_storage_options = describe_response.storage_options
except Exception:
# Table doesn't exist, will create a new one below
pass
if location is None:
# Table doesn't exist or mode is "create", reserve a new location
create_empty_request = CreateEmptyTableRequest(
id=table_id,
location=None,
properties=self.storage_options if self.storage_options else None,
)
create_empty_response = self._ns.create_empty_table(create_empty_request)
if not create_empty_response.location:
raise ValueError(
"Table location is missing from create_empty_table response"
)
location = create_empty_response.location
namespace_storage_options = create_empty_response.storage_options
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
if storage_options:
merged_storage_options.update(storage_options)
if namespace_storage_options:
merged_storage_options.update(namespace_storage_options)
# Step 2: Create table using LanceTable.create with the location
# Run the sync operation in a thread
def _create_table():
temp_conn = LanceDBConnection(
location,
read_consistency_interval=self.read_consistency_interval,
storage_options=merged_storage_options,
session=self.session,
)
# Create a storage options provider if not provided by user
if (
storage_options_provider is None
and namespace_storage_options is not None
):
provider = LanceNamespaceStorageOptionsProvider(
namespace=self._ns,
table_id=table_id,
)
else:
provider = storage_options_provider
return LanceTable.create(
temp_conn,
name,
data,
schema,
mode=mode,
exist_ok=exist_ok,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
embedding_functions=embedding_functions,
namespace=namespace,
storage_options=merged_storage_options,
storage_options_provider=provider,
location=location,
)
lance_table = await asyncio.to_thread(_create_table)
# Get the underlying async table from LanceTable
return lance_table._table
async def open_table(
self,
name: str,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
index_cache_size: Optional[int] = None,
) -> AsyncTable:
"""Open an existing table from the namespace."""
table_id = namespace + [name]
request = DescribeTableRequest(id=table_id)
response = self._ns.describe_table(request)
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
if storage_options:
merged_storage_options.update(storage_options)
if response.storage_options:
merged_storage_options.update(response.storage_options)
# Create a storage options provider if not provided by user
if storage_options_provider is None and response.storage_options is not None:
storage_options_provider = LanceNamespaceStorageOptionsProvider(
namespace=self._ns,
table_id=table_id,
)
# Open table in a thread
def _open_table():
temp_conn = LanceDBConnection(
response.location,
read_consistency_interval=self.read_consistency_interval,
storage_options=merged_storage_options,
session=self.session,
)
return LanceTable.open(
temp_conn,
name,
namespace=namespace,
storage_options=merged_storage_options,
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=response.location,
)
lance_table = await asyncio.to_thread(_open_table)
return lance_table._table
async def drop_table(self, name: str, namespace: List[str] = []):
"""Drop a table from the namespace."""
table_id = namespace + [name]
request = DropTableRequest(id=table_id)
self._ns.drop_table(request)
async def rename_table(
self,
cur_name: str,
new_name: str,
cur_namespace: List[str] = [],
new_namespace: List[str] = [],
):
"""Rename is not supported for namespace connections."""
raise NotImplementedError(
"rename_table is not supported for namespace connections"
)
async def drop_database(self):
"""Deprecated method."""
raise NotImplementedError(
"drop_database is deprecated, use drop_all_tables instead"
)
async def drop_all_tables(self, namespace: List[str] = []):
"""Drop all tables in the namespace."""
table_names = await self.table_names(namespace=namespace)
for table_name in table_names:
await self.drop_table(table_name, namespace=namespace)
async def list_namespaces(
self,
namespace: List[str] = [],
page_token: Optional[str] = None,
limit: int = 10,
) -> Iterable[str]:
"""
List child namespaces under the given namespace.
Parameters
----------
namespace : Optional[List[str]]
The parent namespace to list children from.
If None, lists root-level namespaces.
page_token : Optional[str]
Pagination token for listing results.
limit : int
Maximum number of namespaces to return.
Returns
-------
Iterable[str]
Names of child namespaces.
"""
request = ListNamespacesRequest(
id=namespace, page_token=page_token, limit=limit
)
response = self._ns.list_namespaces(request)
return response.namespaces if response.namespaces else []
async def create_namespace(self, namespace: List[str]) -> None:
"""
Create a new namespace.
Parameters
----------
namespace : List[str]
The namespace path to create.
"""
request = CreateNamespaceRequest(id=namespace)
self._ns.create_namespace(request)
async def drop_namespace(self, namespace: List[str]) -> None:
"""
Drop a namespace.
Parameters
----------
namespace : List[str]
The namespace path to drop.
"""
request = DropNamespaceRequest(id=namespace)
self._ns.drop_namespace(request)
def connect_namespace(
impl: str,
properties: Dict[str, str],
@@ -541,3 +830,62 @@ def connect_namespace(
storage_options=storage_options,
session=session,
)
def connect_namespace_async(
impl: str,
properties: Dict[str, str],
*,
read_consistency_interval: Optional[timedelta] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
) -> AsyncLanceNamespaceDBConnection:
"""
Connect to a LanceDB database through a namespace (returns async connection).
This function is synchronous but returns an AsyncLanceNamespaceDBConnection
that provides async methods for all database operations.
Parameters
----------
impl : str
The namespace implementation to use. For examples:
- "dir" for DirectoryNamespace
- "rest" for REST-based namespace
- Full module path for custom implementations
properties : Dict[str, str]
Configuration properties for the namespace implementation.
Different namespace implemenation has different config properties.
For example, use DirectoryNamespace with {"root": "/path/to/directory"}
read_consistency_interval : Optional[timedelta]
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked.
storage_options : Optional[Dict[str, str]]
Additional options for the storage backend
session : Optional[Session]
A session to use for this connection
Returns
-------
AsyncLanceNamespaceDBConnection
An async namespace-based connection to LanceDB
Examples
--------
>>> import lancedb
>>> # This function is sync, but returns an async connection
>>> db = lancedb.connect_namespace_async("dir", {"root": "/path/to/db"})
>>> # Use async methods on the connection
>>> async def use_db():
... tables = await db.table_names()
... table = await db.create_table("my_table", schema=schema)
"""
namespace = namespace_connect(impl, properties)
# Return the async namespace-based connection
return AsyncLanceNamespaceDBConnection(
namespace,
read_consistency_interval=read_consistency_interval,
storage_options=storage_options,
session=session,
)

View File

@@ -14,6 +14,7 @@ from typing import (
Literal,
Optional,
Tuple,
Type,
TypeVar,
Union,
Any,

View File

@@ -1717,6 +1717,7 @@ class LanceTable(Table):
):
self._conn = connection
self._namespace = namespace
self._location = location # Store location for use in _dataset_path
if _async is not None:
self._table = _async
else:
@@ -1794,6 +1795,10 @@ class LanceTable(Table):
@cached_property
def _dataset_path(self) -> str:
# Cacheable since it's deterministic
# If table was opened with explicit location (e.g., from namespace),
# use that location directly instead of constructing from base URI
if self._location is not None:
return self._location
return _table_path(self._conn.uri, self.name)
def to_lance(self, **kwargs) -> lance.LanceDataset:
@@ -2681,6 +2686,7 @@ class LanceTable(Table):
self = cls.__new__(cls)
self._conn = db
self._namespace = namespace
self._location = location
if data_storage_version is not None:
warnings.warn(

View File

@@ -423,3 +423,218 @@ class TestNamespaceConnection:
db.drop_table("same_name_table", namespace=["namespace_b"])
db.drop_namespace(["namespace_a"])
db.drop_namespace(["namespace_b"])
@pytest.mark.asyncio
class TestAsyncNamespaceConnection:
"""Test async namespace-based LanceDB connection using DirectoryNamespace."""
def setup_method(self):
"""Set up test fixtures."""
self.temp_dir = tempfile.mkdtemp()
def teardown_method(self):
"""Clean up test fixtures."""
shutil.rmtree(self.temp_dir, ignore_errors=True)
async def test_connect_namespace_async(self):
"""Test connecting to LanceDB through DirectoryNamespace asynchronously."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Should be an AsyncLanceNamespaceDBConnection
assert isinstance(db, lancedb.AsyncLanceNamespaceDBConnection)
# Initially no tables in root
table_names = await db.table_names()
assert len(list(table_names)) == 0
async def test_create_table_async(self):
"""Test creating a table asynchronously through namespace."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Create a child namespace first
await db.create_namespace(["test_ns"])
# Define schema for empty table
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
pa.field("text", pa.string()),
]
)
# Create empty table in child namespace
table = await db.create_table(
"test_table", schema=schema, namespace=["test_ns"]
)
assert table is not None
assert isinstance(table, lancedb.AsyncTable)
# Table should appear in child namespace
table_names = await db.table_names(namespace=["test_ns"])
assert "test_table" in list(table_names)
async def test_open_table_async(self):
"""Test opening an existing table asynchronously through namespace."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Create a child namespace first
await db.create_namespace(["test_ns"])
# Create a table with schema in child namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
await db.create_table("test_table", schema=schema, namespace=["test_ns"])
# Open the table
table = await db.open_table("test_table", namespace=["test_ns"])
assert table is not None
assert isinstance(table, lancedb.AsyncTable)
# Test write operation - add data to the table
test_data = [
{"id": 1, "vector": [1.0, 2.0]},
{"id": 2, "vector": [3.0, 4.0]},
{"id": 3, "vector": [5.0, 6.0]},
]
await table.add(test_data)
# Test read operation - query the table
result = await table.to_arrow()
assert len(result) == 3
assert result.schema.field("id").type == pa.int64()
assert result.schema.field("vector").type == pa.list_(pa.float32(), 2)
# Verify data content
result_df = result.to_pandas()
assert result_df["id"].tolist() == [1, 2, 3]
assert [v.tolist() for v in result_df["vector"]] == [
[1.0, 2.0],
[3.0, 4.0],
[5.0, 6.0],
]
# Test update operation
await table.update({"id": 20}, where="id = 2")
result = await table.to_arrow()
result_df = result.to_pandas().sort_values("id").reset_index(drop=True)
assert result_df["id"].tolist() == [1, 3, 20]
# Test delete operation
await table.delete("id = 1")
result = await table.to_arrow()
assert len(result) == 2
result_df = result.to_pandas().sort_values("id").reset_index(drop=True)
assert result_df["id"].tolist() == [3, 20]
async def test_drop_table_async(self):
"""Test dropping a table asynchronously through namespace."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Create a child namespace first
await db.create_namespace(["test_ns"])
# Create tables in child namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
await db.create_table("table1", schema=schema, namespace=["test_ns"])
await db.create_table("table2", schema=schema, namespace=["test_ns"])
# Verify both tables exist in child namespace
table_names = list(await db.table_names(namespace=["test_ns"]))
assert "table1" in table_names
assert "table2" in table_names
assert len(table_names) == 2
# Drop one table
await db.drop_table("table1", namespace=["test_ns"])
# Verify only table2 remains
table_names = list(await db.table_names(namespace=["test_ns"]))
assert "table1" not in table_names
assert "table2" in table_names
assert len(table_names) == 1
async def test_namespace_operations_async(self):
"""Test namespace management operations asynchronously."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Initially no namespaces
namespaces = await db.list_namespaces()
assert len(list(namespaces)) == 0
# Create a namespace
await db.create_namespace(["test_namespace"])
# Verify namespace exists
namespaces = list(await db.list_namespaces())
assert "test_namespace" in namespaces
assert len(namespaces) == 1
# Create table in namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
table = await db.create_table(
"test_table", schema=schema, namespace=["test_namespace"]
)
assert table is not None
# Verify table exists in namespace
tables_in_namespace = list(await db.table_names(namespace=["test_namespace"]))
assert "test_table" in tables_in_namespace
assert len(tables_in_namespace) == 1
# Drop table from namespace
await db.drop_table("test_table", namespace=["test_namespace"])
# Verify table no longer exists in namespace
tables_in_namespace = list(await db.table_names(namespace=["test_namespace"]))
assert len(tables_in_namespace) == 0
# Drop namespace
await db.drop_namespace(["test_namespace"])
# Verify namespace no longer exists
namespaces = list(await db.list_namespaces())
assert len(namespaces) == 0
async def test_drop_all_tables_async(self):
"""Test dropping all tables asynchronously through namespace."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Create a child namespace first
await db.create_namespace(["test_ns"])
# Create multiple tables in child namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
for i in range(3):
await db.create_table(f"table{i}", schema=schema, namespace=["test_ns"])
# Verify tables exist in child namespace
table_names = await db.table_names(namespace=["test_ns"])
assert len(list(table_names)) == 3
# Drop all tables in child namespace
await db.drop_all_tables(namespace=["test_ns"])
# Verify all tables are gone from child namespace
table_names = await db.table_names(namespace=["test_ns"])
assert len(list(table_names)) == 0