Compare commits

..

2 Commits

Author SHA1 Message Date
Lance Release
93b8ac8e3e Bump version: 0.25.4-beta.1 → 0.25.4-beta.2 2025-11-19 20:24:46 +00:00
Jack Ye
1b78ccedaf feat: support async namespace connection (#2788)
Also fix 2 bugs:
1. make storage options provider serializable in ray
2. fix table.to_table() uri is wrong for namespace-backed tables
2025-11-19 12:23:50 -08:00
10 changed files with 629 additions and 52 deletions

64
Cargo.lock generated
View File

@@ -3032,8 +3032,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "1.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.4#737c394ca9368bf1d350bf4a2018be1d858fff54"
version = "1.0.0-beta.3"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.3#95ff7f6e684c1911fc00c9c2811cce1a61c06ff5"
dependencies = [
"arrow-array",
"rand 0.9.2",
@@ -4217,8 +4217,8 @@ dependencies = [
[[package]]
name = "lance"
version = "1.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.4#737c394ca9368bf1d350bf4a2018be1d858fff54"
version = "1.0.0-beta.3"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.3#95ff7f6e684c1911fc00c9c2811cce1a61c06ff5"
dependencies = [
"arrow",
"arrow-arith",
@@ -4282,8 +4282,8 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "1.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.4#737c394ca9368bf1d350bf4a2018be1d858fff54"
version = "1.0.0-beta.3"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.3#95ff7f6e684c1911fc00c9c2811cce1a61c06ff5"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4301,8 +4301,8 @@ dependencies = [
[[package]]
name = "lance-bitpacking"
version = "1.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.4#737c394ca9368bf1d350bf4a2018be1d858fff54"
version = "1.0.0-beta.3"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.3#95ff7f6e684c1911fc00c9c2811cce1a61c06ff5"
dependencies = [
"arrayref",
"paste",
@@ -4311,8 +4311,8 @@ dependencies = [
[[package]]
name = "lance-core"
version = "1.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.4#737c394ca9368bf1d350bf4a2018be1d858fff54"
version = "1.0.0-beta.3"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.3#95ff7f6e684c1911fc00c9c2811cce1a61c06ff5"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4348,8 +4348,8 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "1.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.4#737c394ca9368bf1d350bf4a2018be1d858fff54"
version = "1.0.0-beta.3"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.3#95ff7f6e684c1911fc00c9c2811cce1a61c06ff5"
dependencies = [
"arrow",
"arrow-array",
@@ -4378,8 +4378,8 @@ dependencies = [
[[package]]
name = "lance-datagen"
version = "1.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.4#737c394ca9368bf1d350bf4a2018be1d858fff54"
version = "1.0.0-beta.3"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.3#95ff7f6e684c1911fc00c9c2811cce1a61c06ff5"
dependencies = [
"arrow",
"arrow-array",
@@ -4396,8 +4396,8 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "1.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.4#737c394ca9368bf1d350bf4a2018be1d858fff54"
version = "1.0.0-beta.3"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.3#95ff7f6e684c1911fc00c9c2811cce1a61c06ff5"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4434,8 +4434,8 @@ dependencies = [
[[package]]
name = "lance-file"
version = "1.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.4#737c394ca9368bf1d350bf4a2018be1d858fff54"
version = "1.0.0-beta.3"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.3#95ff7f6e684c1911fc00c9c2811cce1a61c06ff5"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4467,8 +4467,8 @@ dependencies = [
[[package]]
name = "lance-index"
version = "1.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.4#737c394ca9368bf1d350bf4a2018be1d858fff54"
version = "1.0.0-beta.3"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.3#95ff7f6e684c1911fc00c9c2811cce1a61c06ff5"
dependencies = [
"arrow",
"arrow-arith",
@@ -4529,8 +4529,8 @@ dependencies = [
[[package]]
name = "lance-io"
version = "1.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.4#737c394ca9368bf1d350bf4a2018be1d858fff54"
version = "1.0.0-beta.3"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.3#95ff7f6e684c1911fc00c9c2811cce1a61c06ff5"
dependencies = [
"arrow",
"arrow-arith",
@@ -4570,8 +4570,8 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "1.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.4#737c394ca9368bf1d350bf4a2018be1d858fff54"
version = "1.0.0-beta.3"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.3#95ff7f6e684c1911fc00c9c2811cce1a61c06ff5"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4587,8 +4587,8 @@ dependencies = [
[[package]]
name = "lance-namespace"
version = "1.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.4#737c394ca9368bf1d350bf4a2018be1d858fff54"
version = "1.0.0-beta.3"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.3#95ff7f6e684c1911fc00c9c2811cce1a61c06ff5"
dependencies = [
"arrow",
"async-trait",
@@ -4600,8 +4600,8 @@ dependencies = [
[[package]]
name = "lance-namespace-impls"
version = "1.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.4#737c394ca9368bf1d350bf4a2018be1d858fff54"
version = "1.0.0-beta.3"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.3#95ff7f6e684c1911fc00c9c2811cce1a61c06ff5"
dependencies = [
"arrow",
"arrow-ipc",
@@ -4639,8 +4639,8 @@ dependencies = [
[[package]]
name = "lance-table"
version = "1.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.4#737c394ca9368bf1d350bf4a2018be1d858fff54"
version = "1.0.0-beta.3"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.3#95ff7f6e684c1911fc00c9c2811cce1a61c06ff5"
dependencies = [
"arrow",
"arrow-array",
@@ -4679,8 +4679,8 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "1.0.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.4#737c394ca9368bf1d350bf4a2018be1d858fff54"
version = "1.0.0-beta.3"
source = "git+https://github.com/lance-format/lance.git?tag=v1.0.0-beta.3#95ff7f6e684c1911fc00c9c2811cce1a61c06ff5"
dependencies = [
"arrow-array",
"arrow-schema",

View File

@@ -15,20 +15,20 @@ categories = ["database-implementations"]
rust-version = "1.78.0"
[workspace.dependencies]
lance = { "version" = "=1.0.0-beta.4", default-features = false, "tag" = "v1.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=1.0.0-beta.4", "tag" = "v1.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=1.0.0-beta.4", "tag" = "v1.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=1.0.0-beta.4", "tag" = "v1.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=1.0.0-beta.4", default-features = false, "tag" = "v1.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=1.0.0-beta.4", "tag" = "v1.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=1.0.0-beta.4", "tag" = "v1.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=1.0.0-beta.4", "tag" = "v1.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=1.0.0-beta.4", "features" = ["dir-aws", "dir-gcp", "dir-azure", "dir-oss", "rest"], "tag" = "v1.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=1.0.0-beta.4", "tag" = "v1.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=1.0.0-beta.4", "tag" = "v1.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=1.0.0-beta.4", "tag" = "v1.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=1.0.0-beta.4", "tag" = "v1.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=1.0.0-beta.4", "tag" = "v1.0.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=1.0.0-beta.3", default-features = false, "tag" = "v1.0.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=1.0.0-beta.3", "tag" = "v1.0.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=1.0.0-beta.3", "tag" = "v1.0.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=1.0.0-beta.3", "tag" = "v1.0.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=1.0.0-beta.3", default-features = false, "tag" = "v1.0.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=1.0.0-beta.3", "tag" = "v1.0.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=1.0.0-beta.3", "tag" = "v1.0.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=1.0.0-beta.3", "tag" = "v1.0.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=1.0.0-beta.3", "features" = ["dir-aws", "dir-gcp", "dir-azure", "dir-oss", "rest"], "tag" = "v1.0.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=1.0.0-beta.3", "tag" = "v1.0.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=1.0.0-beta.3", "tag" = "v1.0.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=1.0.0-beta.3", "tag" = "v1.0.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=1.0.0-beta.3", "tag" = "v1.0.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=1.0.0-beta.3", "tag" = "v1.0.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "56.2", optional = false }

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.25.4-beta.1"
current_version = "0.25.4-beta.2"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.25.4-beta.1"
version = "0.25.4-beta.2"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true

View File

@@ -59,7 +59,7 @@ tests = [
"polars>=0.19, <=1.3.0",
"tantivy",
"pyarrow-stubs",
"pylance>=1.0.0b2",
"pylance>=1.0.0b4",
"requests",
"datafusion",
]

View File

@@ -20,7 +20,12 @@ from .remote.db import RemoteDBConnection
from .schema import vector
from .table import AsyncTable, Table
from ._lancedb import Session
from .namespace import connect_namespace, LanceNamespaceDBConnection
from .namespace import (
connect_namespace,
connect_namespace_async,
LanceNamespaceDBConnection,
AsyncLanceNamespaceDBConnection,
)
def connect(
@@ -36,7 +41,7 @@ def connect(
session: Optional[Session] = None,
**kwargs: Any,
) -> DBConnection:
"""Connect to a LanceDB database. YAY!
"""Connect to a LanceDB database.
Parameters
----------
@@ -224,7 +229,9 @@ __all__ = [
"connect",
"connect_async",
"connect_namespace",
"connect_namespace_async",
"AsyncConnection",
"AsyncLanceNamespaceDBConnection",
"AsyncTable",
"URI",
"sanitize_uri",

View File

@@ -10,6 +10,7 @@ through a namespace abstraction.
from __future__ import annotations
import asyncio
import sys
from typing import Dict, Iterable, List, Optional, Union
@@ -23,7 +24,7 @@ import pyarrow as pa
from lancedb.db import DBConnection, LanceDBConnection
from lancedb.io import StorageOptionsProvider
from lancedb.table import LanceTable, Table
from lancedb.table import AsyncTable, LanceTable, Table
from lancedb.util import validate_table_name
from lancedb.common import DATA
from lancedb.pydantic import LanceModel
@@ -497,6 +498,294 @@ class LanceNamespaceDBConnection(DBConnection):
)
class AsyncLanceNamespaceDBConnection:
"""
An async LanceDB connection that uses a namespace for table management.
This connection delegates table URI resolution to a lance_namespace instance,
while providing async methods for all operations.
"""
def __init__(
self,
namespace: LanceNamespace,
*,
read_consistency_interval: Optional[timedelta] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
):
"""
Initialize an async namespace-based LanceDB connection.
Parameters
----------
namespace : LanceNamespace
The namespace instance to use for table management
read_consistency_interval : Optional[timedelta]
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked.
storage_options : Optional[Dict[str, str]]
Additional options for the storage backend
session : Optional[Session]
A session to use for this connection
"""
self._ns = namespace
self.read_consistency_interval = read_consistency_interval
self.storage_options = storage_options or {}
self.session = session
async def table_names(
self,
page_token: Optional[str] = None,
limit: int = 10,
*,
namespace: List[str] = [],
) -> Iterable[str]:
"""List table names in the namespace."""
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._ns.list_tables(request)
return response.tables if response.tables else []
async def create_table(
self,
name: str,
data: Optional[DATA] = None,
schema: Optional[Union[pa.Schema, LanceModel]] = None,
mode: str = "create",
exist_ok: bool = False,
on_bad_vectors: str = "error",
fill_value: float = 0.0,
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
) -> AsyncTable:
"""Create a new table in the namespace."""
if mode.lower() not in ["create", "overwrite"]:
raise ValueError("mode must be either 'create' or 'overwrite'")
validate_table_name(name)
# Get location from namespace
table_id = namespace + [name]
# Step 1: Get the table location and storage options from namespace
location = None
namespace_storage_options = None
if mode.lower() == "overwrite":
# Try to describe the table first to see if it exists
try:
describe_request = DescribeTableRequest(id=table_id)
describe_response = self._ns.describe_table(describe_request)
location = describe_response.location
namespace_storage_options = describe_response.storage_options
except Exception:
# Table doesn't exist, will create a new one below
pass
if location is None:
# Table doesn't exist or mode is "create", reserve a new location
create_empty_request = CreateEmptyTableRequest(
id=table_id,
location=None,
properties=self.storage_options if self.storage_options else None,
)
create_empty_response = self._ns.create_empty_table(create_empty_request)
if not create_empty_response.location:
raise ValueError(
"Table location is missing from create_empty_table response"
)
location = create_empty_response.location
namespace_storage_options = create_empty_response.storage_options
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
if storage_options:
merged_storage_options.update(storage_options)
if namespace_storage_options:
merged_storage_options.update(namespace_storage_options)
# Step 2: Create table using LanceTable.create with the location
# Run the sync operation in a thread
def _create_table():
temp_conn = LanceDBConnection(
location,
read_consistency_interval=self.read_consistency_interval,
storage_options=merged_storage_options,
session=self.session,
)
# Create a storage options provider if not provided by user
if (
storage_options_provider is None
and namespace_storage_options is not None
):
provider = LanceNamespaceStorageOptionsProvider(
namespace=self._ns,
table_id=table_id,
)
else:
provider = storage_options_provider
return LanceTable.create(
temp_conn,
name,
data,
schema,
mode=mode,
exist_ok=exist_ok,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
embedding_functions=embedding_functions,
namespace=namespace,
storage_options=merged_storage_options,
storage_options_provider=provider,
location=location,
)
lance_table = await asyncio.to_thread(_create_table)
# Get the underlying async table from LanceTable
return lance_table._table
async def open_table(
self,
name: str,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
index_cache_size: Optional[int] = None,
) -> AsyncTable:
"""Open an existing table from the namespace."""
table_id = namespace + [name]
request = DescribeTableRequest(id=table_id)
response = self._ns.describe_table(request)
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
if storage_options:
merged_storage_options.update(storage_options)
if response.storage_options:
merged_storage_options.update(response.storage_options)
# Create a storage options provider if not provided by user
if storage_options_provider is None and response.storage_options is not None:
storage_options_provider = LanceNamespaceStorageOptionsProvider(
namespace=self._ns,
table_id=table_id,
)
# Open table in a thread
def _open_table():
temp_conn = LanceDBConnection(
response.location,
read_consistency_interval=self.read_consistency_interval,
storage_options=merged_storage_options,
session=self.session,
)
return LanceTable.open(
temp_conn,
name,
namespace=namespace,
storage_options=merged_storage_options,
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=response.location,
)
lance_table = await asyncio.to_thread(_open_table)
return lance_table._table
async def drop_table(self, name: str, namespace: List[str] = []):
"""Drop a table from the namespace."""
table_id = namespace + [name]
request = DropTableRequest(id=table_id)
self._ns.drop_table(request)
async def rename_table(
self,
cur_name: str,
new_name: str,
cur_namespace: List[str] = [],
new_namespace: List[str] = [],
):
"""Rename is not supported for namespace connections."""
raise NotImplementedError(
"rename_table is not supported for namespace connections"
)
async def drop_database(self):
"""Deprecated method."""
raise NotImplementedError(
"drop_database is deprecated, use drop_all_tables instead"
)
async def drop_all_tables(self, namespace: List[str] = []):
"""Drop all tables in the namespace."""
table_names = await self.table_names(namespace=namespace)
for table_name in table_names:
await self.drop_table(table_name, namespace=namespace)
async def list_namespaces(
self,
namespace: List[str] = [],
page_token: Optional[str] = None,
limit: int = 10,
) -> Iterable[str]:
"""
List child namespaces under the given namespace.
Parameters
----------
namespace : Optional[List[str]]
The parent namespace to list children from.
If None, lists root-level namespaces.
page_token : Optional[str]
Pagination token for listing results.
limit : int
Maximum number of namespaces to return.
Returns
-------
Iterable[str]
Names of child namespaces.
"""
request = ListNamespacesRequest(
id=namespace, page_token=page_token, limit=limit
)
response = self._ns.list_namespaces(request)
return response.namespaces if response.namespaces else []
async def create_namespace(self, namespace: List[str]) -> None:
"""
Create a new namespace.
Parameters
----------
namespace : List[str]
The namespace path to create.
"""
request = CreateNamespaceRequest(id=namespace)
self._ns.create_namespace(request)
async def drop_namespace(self, namespace: List[str]) -> None:
"""
Drop a namespace.
Parameters
----------
namespace : List[str]
The namespace path to drop.
"""
request = DropNamespaceRequest(id=namespace)
self._ns.drop_namespace(request)
def connect_namespace(
impl: str,
properties: Dict[str, str],
@@ -541,3 +830,62 @@ def connect_namespace(
storage_options=storage_options,
session=session,
)
def connect_namespace_async(
impl: str,
properties: Dict[str, str],
*,
read_consistency_interval: Optional[timedelta] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
) -> AsyncLanceNamespaceDBConnection:
"""
Connect to a LanceDB database through a namespace (returns async connection).
This function is synchronous but returns an AsyncLanceNamespaceDBConnection
that provides async methods for all database operations.
Parameters
----------
impl : str
The namespace implementation to use. For examples:
- "dir" for DirectoryNamespace
- "rest" for REST-based namespace
- Full module path for custom implementations
properties : Dict[str, str]
Configuration properties for the namespace implementation.
Different namespace implemenation has different config properties.
For example, use DirectoryNamespace with {"root": "/path/to/directory"}
read_consistency_interval : Optional[timedelta]
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked.
storage_options : Optional[Dict[str, str]]
Additional options for the storage backend
session : Optional[Session]
A session to use for this connection
Returns
-------
AsyncLanceNamespaceDBConnection
An async namespace-based connection to LanceDB
Examples
--------
>>> import lancedb
>>> # This function is sync, but returns an async connection
>>> db = lancedb.connect_namespace_async("dir", {"root": "/path/to/db"})
>>> # Use async methods on the connection
>>> async def use_db():
... tables = await db.table_names()
... table = await db.create_table("my_table", schema=schema)
"""
namespace = namespace_connect(impl, properties)
# Return the async namespace-based connection
return AsyncLanceNamespaceDBConnection(
namespace,
read_consistency_interval=read_consistency_interval,
storage_options=storage_options,
session=session,
)

View File

@@ -14,6 +14,7 @@ from typing import (
Literal,
Optional,
Tuple,
Type,
TypeVar,
Union,
Any,

View File

@@ -1717,6 +1717,7 @@ class LanceTable(Table):
):
self._conn = connection
self._namespace = namespace
self._location = location # Store location for use in _dataset_path
if _async is not None:
self._table = _async
else:
@@ -1794,6 +1795,10 @@ class LanceTable(Table):
@cached_property
def _dataset_path(self) -> str:
# Cacheable since it's deterministic
# If table was opened with explicit location (e.g., from namespace),
# use that location directly instead of constructing from base URI
if self._location is not None:
return self._location
return _table_path(self._conn.uri, self.name)
def to_lance(self, **kwargs) -> lance.LanceDataset:
@@ -2681,6 +2686,7 @@ class LanceTable(Table):
self = cls.__new__(cls)
self._conn = db
self._namespace = namespace
self._location = location
if data_storage_version is not None:
warnings.warn(

View File

@@ -423,3 +423,218 @@ class TestNamespaceConnection:
db.drop_table("same_name_table", namespace=["namespace_b"])
db.drop_namespace(["namespace_a"])
db.drop_namespace(["namespace_b"])
@pytest.mark.asyncio
class TestAsyncNamespaceConnection:
"""Test async namespace-based LanceDB connection using DirectoryNamespace."""
def setup_method(self):
"""Set up test fixtures."""
self.temp_dir = tempfile.mkdtemp()
def teardown_method(self):
"""Clean up test fixtures."""
shutil.rmtree(self.temp_dir, ignore_errors=True)
async def test_connect_namespace_async(self):
"""Test connecting to LanceDB through DirectoryNamespace asynchronously."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Should be an AsyncLanceNamespaceDBConnection
assert isinstance(db, lancedb.AsyncLanceNamespaceDBConnection)
# Initially no tables in root
table_names = await db.table_names()
assert len(list(table_names)) == 0
async def test_create_table_async(self):
"""Test creating a table asynchronously through namespace."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Create a child namespace first
await db.create_namespace(["test_ns"])
# Define schema for empty table
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
pa.field("text", pa.string()),
]
)
# Create empty table in child namespace
table = await db.create_table(
"test_table", schema=schema, namespace=["test_ns"]
)
assert table is not None
assert isinstance(table, lancedb.AsyncTable)
# Table should appear in child namespace
table_names = await db.table_names(namespace=["test_ns"])
assert "test_table" in list(table_names)
async def test_open_table_async(self):
"""Test opening an existing table asynchronously through namespace."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Create a child namespace first
await db.create_namespace(["test_ns"])
# Create a table with schema in child namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
await db.create_table("test_table", schema=schema, namespace=["test_ns"])
# Open the table
table = await db.open_table("test_table", namespace=["test_ns"])
assert table is not None
assert isinstance(table, lancedb.AsyncTable)
# Test write operation - add data to the table
test_data = [
{"id": 1, "vector": [1.0, 2.0]},
{"id": 2, "vector": [3.0, 4.0]},
{"id": 3, "vector": [5.0, 6.0]},
]
await table.add(test_data)
# Test read operation - query the table
result = await table.to_arrow()
assert len(result) == 3
assert result.schema.field("id").type == pa.int64()
assert result.schema.field("vector").type == pa.list_(pa.float32(), 2)
# Verify data content
result_df = result.to_pandas()
assert result_df["id"].tolist() == [1, 2, 3]
assert [v.tolist() for v in result_df["vector"]] == [
[1.0, 2.0],
[3.0, 4.0],
[5.0, 6.0],
]
# Test update operation
await table.update({"id": 20}, where="id = 2")
result = await table.to_arrow()
result_df = result.to_pandas().sort_values("id").reset_index(drop=True)
assert result_df["id"].tolist() == [1, 3, 20]
# Test delete operation
await table.delete("id = 1")
result = await table.to_arrow()
assert len(result) == 2
result_df = result.to_pandas().sort_values("id").reset_index(drop=True)
assert result_df["id"].tolist() == [3, 20]
async def test_drop_table_async(self):
"""Test dropping a table asynchronously through namespace."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Create a child namespace first
await db.create_namespace(["test_ns"])
# Create tables in child namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
await db.create_table("table1", schema=schema, namespace=["test_ns"])
await db.create_table("table2", schema=schema, namespace=["test_ns"])
# Verify both tables exist in child namespace
table_names = list(await db.table_names(namespace=["test_ns"]))
assert "table1" in table_names
assert "table2" in table_names
assert len(table_names) == 2
# Drop one table
await db.drop_table("table1", namespace=["test_ns"])
# Verify only table2 remains
table_names = list(await db.table_names(namespace=["test_ns"]))
assert "table1" not in table_names
assert "table2" in table_names
assert len(table_names) == 1
async def test_namespace_operations_async(self):
"""Test namespace management operations asynchronously."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Initially no namespaces
namespaces = await db.list_namespaces()
assert len(list(namespaces)) == 0
# Create a namespace
await db.create_namespace(["test_namespace"])
# Verify namespace exists
namespaces = list(await db.list_namespaces())
assert "test_namespace" in namespaces
assert len(namespaces) == 1
# Create table in namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
table = await db.create_table(
"test_table", schema=schema, namespace=["test_namespace"]
)
assert table is not None
# Verify table exists in namespace
tables_in_namespace = list(await db.table_names(namespace=["test_namespace"]))
assert "test_table" in tables_in_namespace
assert len(tables_in_namespace) == 1
# Drop table from namespace
await db.drop_table("test_table", namespace=["test_namespace"])
# Verify table no longer exists in namespace
tables_in_namespace = list(await db.table_names(namespace=["test_namespace"]))
assert len(tables_in_namespace) == 0
# Drop namespace
await db.drop_namespace(["test_namespace"])
# Verify namespace no longer exists
namespaces = list(await db.list_namespaces())
assert len(namespaces) == 0
async def test_drop_all_tables_async(self):
"""Test dropping all tables asynchronously through namespace."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Create a child namespace first
await db.create_namespace(["test_ns"])
# Create multiple tables in child namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
for i in range(3):
await db.create_table(f"table{i}", schema=schema, namespace=["test_ns"])
# Verify tables exist in child namespace
table_names = await db.table_names(namespace=["test_ns"])
assert len(list(table_names)) == 3
# Drop all tables in child namespace
await db.drop_all_tables(namespace=["test_ns"])
# Verify all tables are gone from child namespace
table_names = await db.table_names(namespace=["test_ns"])
assert len(list(table_names)) == 0