Compare commits

...

5 Commits

Author SHA1 Message Date
Lance Release
93b8ac8e3e Bump version: 0.25.4-beta.1 → 0.25.4-beta.2 2025-11-19 20:24:46 +00:00
Jack Ye
1b78ccedaf feat: support async namespace connection (#2788)
Also fix 2 bugs:
1. make storage options provider serializable in ray
2. fix table.to_table() uri is wrong for namespace-backed tables
2025-11-19 12:23:50 -08:00
Mykola Skrynnyk
ca8d118f78 feat(python): support to_pydantic in async (#2438)
This request improves support for `pydantic` integration by adding
`to_pydantic` method to asynchronous queries and handling models that
use `alias` in field definitions. Fixes #2436 and closes #2437 .

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Added support for converting asynchronous query results to Pydantic
models.
- **Bug Fixes**
- Simplified conversion of query results to Pydantic models for improved
reliability.
- Improved handling of field aliases and computed fields when mapping
query results to Pydantic models.
- **Tests**
- Added tests to verify correct mapping of aliased and computed fields
in both synchronous and asynchronous scenarios.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-11-19 11:20:14 -08:00
Wyatt Alt
386fc9e466 feat: add num_attempts to merge insert result (#2795)
This pipes the num_attempts field from lance's merge insert result
through lancedb. This allows callers of merge_insert to get a better
idea of whether transaction conflicts are occurring.
2025-11-19 09:32:57 -08:00
Lance Release
ce1bafec1a Bump version: 0.22.4-beta.0 → 0.22.4-beta.1 2025-11-19 12:58:59 +00:00
32 changed files with 701 additions and 33 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion] [tool.bumpversion]
current_version = "0.22.4-beta.0" current_version = "0.22.4-beta.1"
parse = """(?x) parse = """(?x)
(?P<major>0|[1-9]\\d*)\\. (?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\. (?P<minor>0|[1-9]\\d*)\\.

6
Cargo.lock generated
View File

@@ -4691,7 +4691,7 @@ dependencies = [
[[package]] [[package]]
name = "lancedb" name = "lancedb"
version = "0.22.4-beta.0" version = "0.22.4-beta.1"
dependencies = [ dependencies = [
"ahash", "ahash",
"anyhow", "anyhow",
@@ -4786,7 +4786,7 @@ dependencies = [
[[package]] [[package]]
name = "lancedb-nodejs" name = "lancedb-nodejs"
version = "0.22.4-beta.0" version = "0.22.4-beta.1"
dependencies = [ dependencies = [
"arrow-array", "arrow-array",
"arrow-ipc", "arrow-ipc",
@@ -4806,7 +4806,7 @@ dependencies = [
[[package]] [[package]]
name = "lancedb-python" name = "lancedb-python"
version = "0.25.4-beta.0" version = "0.25.4-beta.1"
dependencies = [ dependencies = [
"arrow", "arrow",
"async-trait", "async-trait",

View File

@@ -8,6 +8,14 @@
## Properties ## Properties
### numAttempts
```ts
numAttempts: number;
```
***
### numDeletedRows ### numDeletedRows
```ts ```ts

View File

@@ -8,7 +8,7 @@
<parent> <parent>
<groupId>com.lancedb</groupId> <groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId> <artifactId>lancedb-parent</artifactId>
<version>0.22.4-beta.0</version> <version>0.22.4-beta.1</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@@ -8,7 +8,7 @@
<parent> <parent>
<groupId>com.lancedb</groupId> <groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId> <artifactId>lancedb-parent</artifactId>
<version>0.22.4-beta.0</version> <version>0.22.4-beta.1</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId> <groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId> <artifactId>lancedb-parent</artifactId>
<version>0.22.4-beta.0</version> <version>0.22.4-beta.1</version>
<packaging>pom</packaging> <packaging>pom</packaging>
<name>${project.artifactId}</name> <name>${project.artifactId}</name>
<description>LanceDB Java SDK Parent POM</description> <description>LanceDB Java SDK Parent POM</description>

View File

@@ -1,7 +1,7 @@
[package] [package]
name = "lancedb-nodejs" name = "lancedb-nodejs"
edition.workspace = true edition.workspace = true
version = "0.22.4-beta.0" version = "0.22.4-beta.1"
license.workspace = true license.workspace = true
description.workspace = true description.workspace = true
repository.workspace = true repository.workspace = true

View File

@@ -1,6 +1,6 @@
{ {
"name": "@lancedb/lancedb-darwin-arm64", "name": "@lancedb/lancedb-darwin-arm64",
"version": "0.22.4-beta.0", "version": "0.22.4-beta.1",
"os": ["darwin"], "os": ["darwin"],
"cpu": ["arm64"], "cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node", "main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@lancedb/lancedb-darwin-x64", "name": "@lancedb/lancedb-darwin-x64",
"version": "0.22.4-beta.0", "version": "0.22.4-beta.1",
"os": ["darwin"], "os": ["darwin"],
"cpu": ["x64"], "cpu": ["x64"],
"main": "lancedb.darwin-x64.node", "main": "lancedb.darwin-x64.node",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@lancedb/lancedb-linux-arm64-gnu", "name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.22.4-beta.0", "version": "0.22.4-beta.1",
"os": ["linux"], "os": ["linux"],
"cpu": ["arm64"], "cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node", "main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@lancedb/lancedb-linux-arm64-musl", "name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.22.4-beta.0", "version": "0.22.4-beta.1",
"os": ["linux"], "os": ["linux"],
"cpu": ["arm64"], "cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node", "main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@lancedb/lancedb-linux-x64-gnu", "name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.22.4-beta.0", "version": "0.22.4-beta.1",
"os": ["linux"], "os": ["linux"],
"cpu": ["x64"], "cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node", "main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@lancedb/lancedb-linux-x64-musl", "name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.22.4-beta.0", "version": "0.22.4-beta.1",
"os": ["linux"], "os": ["linux"],
"cpu": ["x64"], "cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node", "main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{ {
"name": "@lancedb/lancedb-win32-arm64-msvc", "name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.22.4-beta.0", "version": "0.22.4-beta.1",
"os": [ "os": [
"win32" "win32"
], ],

View File

@@ -1,6 +1,6 @@
{ {
"name": "@lancedb/lancedb-win32-x64-msvc", "name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.22.4-beta.0", "version": "0.22.4-beta.1",
"os": ["win32"], "os": ["win32"],
"cpu": ["x64"], "cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node", "main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{ {
"name": "@lancedb/lancedb", "name": "@lancedb/lancedb",
"version": "0.22.4-beta.0", "version": "0.22.4-beta.1",
"lockfileVersion": 3, "lockfileVersion": 3,
"requires": true, "requires": true,
"packages": { "packages": {
"": { "": {
"name": "@lancedb/lancedb", "name": "@lancedb/lancedb",
"version": "0.22.4-beta.0", "version": "0.22.4-beta.1",
"cpu": [ "cpu": [
"x64", "x64",
"arm64" "arm64"

View File

@@ -11,7 +11,7 @@
"ann" "ann"
], ],
"private": false, "private": false,
"version": "0.22.4-beta.0", "version": "0.22.4-beta.1",
"main": "dist/index.js", "main": "dist/index.js",
"exports": { "exports": {
".": "./dist/index.js", ".": "./dist/index.js",

View File

@@ -740,6 +740,7 @@ pub struct MergeResult {
pub num_inserted_rows: i64, pub num_inserted_rows: i64,
pub num_updated_rows: i64, pub num_updated_rows: i64,
pub num_deleted_rows: i64, pub num_deleted_rows: i64,
pub num_attempts: i64,
} }
impl From<lancedb::table::MergeResult> for MergeResult { impl From<lancedb::table::MergeResult> for MergeResult {
@@ -749,6 +750,7 @@ impl From<lancedb::table::MergeResult> for MergeResult {
num_inserted_rows: value.num_inserted_rows as i64, num_inserted_rows: value.num_inserted_rows as i64,
num_updated_rows: value.num_updated_rows as i64, num_updated_rows: value.num_updated_rows as i64,
num_deleted_rows: value.num_deleted_rows as i64, num_deleted_rows: value.num_deleted_rows as i64,
num_attempts: value.num_attempts as i64,
} }
} }
} }

View File

@@ -1,5 +1,5 @@
[tool.bumpversion] [tool.bumpversion]
current_version = "0.25.4-beta.1" current_version = "0.25.4-beta.2"
parse = """(?x) parse = """(?x)
(?P<major>0|[1-9]\\d*)\\. (?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\. (?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "lancedb-python" name = "lancedb-python"
version = "0.25.4-beta.1" version = "0.25.4-beta.2"
edition.workspace = true edition.workspace = true
description = "Python bindings for LanceDB" description = "Python bindings for LanceDB"
license.workspace = true license.workspace = true

View File

@@ -59,7 +59,7 @@ tests = [
"polars>=0.19, <=1.3.0", "polars>=0.19, <=1.3.0",
"tantivy", "tantivy",
"pyarrow-stubs", "pyarrow-stubs",
"pylance>=1.0.0b2", "pylance>=1.0.0b4",
"requests", "requests",
"datafusion", "datafusion",
] ]

View File

@@ -20,7 +20,12 @@ from .remote.db import RemoteDBConnection
from .schema import vector from .schema import vector
from .table import AsyncTable, Table from .table import AsyncTable, Table
from ._lancedb import Session from ._lancedb import Session
from .namespace import connect_namespace, LanceNamespaceDBConnection from .namespace import (
connect_namespace,
connect_namespace_async,
LanceNamespaceDBConnection,
AsyncLanceNamespaceDBConnection,
)
def connect( def connect(
@@ -36,7 +41,7 @@ def connect(
session: Optional[Session] = None, session: Optional[Session] = None,
**kwargs: Any, **kwargs: Any,
) -> DBConnection: ) -> DBConnection:
"""Connect to a LanceDB database. YAY! """Connect to a LanceDB database.
Parameters Parameters
---------- ----------
@@ -224,7 +229,9 @@ __all__ = [
"connect", "connect",
"connect_async", "connect_async",
"connect_namespace", "connect_namespace",
"connect_namespace_async",
"AsyncConnection", "AsyncConnection",
"AsyncLanceNamespaceDBConnection",
"AsyncTable", "AsyncTable",
"URI", "URI",
"sanitize_uri", "sanitize_uri",

View File

@@ -306,6 +306,7 @@ class MergeResult:
num_updated_rows: int num_updated_rows: int
num_inserted_rows: int num_inserted_rows: int
num_deleted_rows: int num_deleted_rows: int
num_attempts: int
class AddColumnsResult: class AddColumnsResult:
version: int version: int

View File

@@ -10,6 +10,7 @@ through a namespace abstraction.
from __future__ import annotations from __future__ import annotations
import asyncio
import sys import sys
from typing import Dict, Iterable, List, Optional, Union from typing import Dict, Iterable, List, Optional, Union
@@ -23,7 +24,7 @@ import pyarrow as pa
from lancedb.db import DBConnection, LanceDBConnection from lancedb.db import DBConnection, LanceDBConnection
from lancedb.io import StorageOptionsProvider from lancedb.io import StorageOptionsProvider
from lancedb.table import LanceTable, Table from lancedb.table import AsyncTable, LanceTable, Table
from lancedb.util import validate_table_name from lancedb.util import validate_table_name
from lancedb.common import DATA from lancedb.common import DATA
from lancedb.pydantic import LanceModel from lancedb.pydantic import LanceModel
@@ -497,6 +498,294 @@ class LanceNamespaceDBConnection(DBConnection):
) )
class AsyncLanceNamespaceDBConnection:
"""
An async LanceDB connection that uses a namespace for table management.
This connection delegates table URI resolution to a lance_namespace instance,
while providing async methods for all operations.
"""
def __init__(
self,
namespace: LanceNamespace,
*,
read_consistency_interval: Optional[timedelta] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
):
"""
Initialize an async namespace-based LanceDB connection.
Parameters
----------
namespace : LanceNamespace
The namespace instance to use for table management
read_consistency_interval : Optional[timedelta]
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked.
storage_options : Optional[Dict[str, str]]
Additional options for the storage backend
session : Optional[Session]
A session to use for this connection
"""
self._ns = namespace
self.read_consistency_interval = read_consistency_interval
self.storage_options = storage_options or {}
self.session = session
async def table_names(
self,
page_token: Optional[str] = None,
limit: int = 10,
*,
namespace: List[str] = [],
) -> Iterable[str]:
"""List table names in the namespace."""
request = ListTablesRequest(id=namespace, page_token=page_token, limit=limit)
response = self._ns.list_tables(request)
return response.tables if response.tables else []
async def create_table(
self,
name: str,
data: Optional[DATA] = None,
schema: Optional[Union[pa.Schema, LanceModel]] = None,
mode: str = "create",
exist_ok: bool = False,
on_bad_vectors: str = "error",
fill_value: float = 0.0,
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
) -> AsyncTable:
"""Create a new table in the namespace."""
if mode.lower() not in ["create", "overwrite"]:
raise ValueError("mode must be either 'create' or 'overwrite'")
validate_table_name(name)
# Get location from namespace
table_id = namespace + [name]
# Step 1: Get the table location and storage options from namespace
location = None
namespace_storage_options = None
if mode.lower() == "overwrite":
# Try to describe the table first to see if it exists
try:
describe_request = DescribeTableRequest(id=table_id)
describe_response = self._ns.describe_table(describe_request)
location = describe_response.location
namespace_storage_options = describe_response.storage_options
except Exception:
# Table doesn't exist, will create a new one below
pass
if location is None:
# Table doesn't exist or mode is "create", reserve a new location
create_empty_request = CreateEmptyTableRequest(
id=table_id,
location=None,
properties=self.storage_options if self.storage_options else None,
)
create_empty_response = self._ns.create_empty_table(create_empty_request)
if not create_empty_response.location:
raise ValueError(
"Table location is missing from create_empty_table response"
)
location = create_empty_response.location
namespace_storage_options = create_empty_response.storage_options
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
if storage_options:
merged_storage_options.update(storage_options)
if namespace_storage_options:
merged_storage_options.update(namespace_storage_options)
# Step 2: Create table using LanceTable.create with the location
# Run the sync operation in a thread
def _create_table():
temp_conn = LanceDBConnection(
location,
read_consistency_interval=self.read_consistency_interval,
storage_options=merged_storage_options,
session=self.session,
)
# Create a storage options provider if not provided by user
if (
storage_options_provider is None
and namespace_storage_options is not None
):
provider = LanceNamespaceStorageOptionsProvider(
namespace=self._ns,
table_id=table_id,
)
else:
provider = storage_options_provider
return LanceTable.create(
temp_conn,
name,
data,
schema,
mode=mode,
exist_ok=exist_ok,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
embedding_functions=embedding_functions,
namespace=namespace,
storage_options=merged_storage_options,
storage_options_provider=provider,
location=location,
)
lance_table = await asyncio.to_thread(_create_table)
# Get the underlying async table from LanceTable
return lance_table._table
async def open_table(
self,
name: str,
*,
namespace: List[str] = [],
storage_options: Optional[Dict[str, str]] = None,
storage_options_provider: Optional[StorageOptionsProvider] = None,
index_cache_size: Optional[int] = None,
) -> AsyncTable:
"""Open an existing table from the namespace."""
table_id = namespace + [name]
request = DescribeTableRequest(id=table_id)
response = self._ns.describe_table(request)
# Merge storage options: self.storage_options < user options < namespace options
merged_storage_options = dict(self.storage_options)
if storage_options:
merged_storage_options.update(storage_options)
if response.storage_options:
merged_storage_options.update(response.storage_options)
# Create a storage options provider if not provided by user
if storage_options_provider is None and response.storage_options is not None:
storage_options_provider = LanceNamespaceStorageOptionsProvider(
namespace=self._ns,
table_id=table_id,
)
# Open table in a thread
def _open_table():
temp_conn = LanceDBConnection(
response.location,
read_consistency_interval=self.read_consistency_interval,
storage_options=merged_storage_options,
session=self.session,
)
return LanceTable.open(
temp_conn,
name,
namespace=namespace,
storage_options=merged_storage_options,
storage_options_provider=storage_options_provider,
index_cache_size=index_cache_size,
location=response.location,
)
lance_table = await asyncio.to_thread(_open_table)
return lance_table._table
async def drop_table(self, name: str, namespace: List[str] = []):
"""Drop a table from the namespace."""
table_id = namespace + [name]
request = DropTableRequest(id=table_id)
self._ns.drop_table(request)
async def rename_table(
self,
cur_name: str,
new_name: str,
cur_namespace: List[str] = [],
new_namespace: List[str] = [],
):
"""Rename is not supported for namespace connections."""
raise NotImplementedError(
"rename_table is not supported for namespace connections"
)
async def drop_database(self):
"""Deprecated method."""
raise NotImplementedError(
"drop_database is deprecated, use drop_all_tables instead"
)
async def drop_all_tables(self, namespace: List[str] = []):
"""Drop all tables in the namespace."""
table_names = await self.table_names(namespace=namespace)
for table_name in table_names:
await self.drop_table(table_name, namespace=namespace)
async def list_namespaces(
self,
namespace: List[str] = [],
page_token: Optional[str] = None,
limit: int = 10,
) -> Iterable[str]:
"""
List child namespaces under the given namespace.
Parameters
----------
namespace : Optional[List[str]]
The parent namespace to list children from.
If None, lists root-level namespaces.
page_token : Optional[str]
Pagination token for listing results.
limit : int
Maximum number of namespaces to return.
Returns
-------
Iterable[str]
Names of child namespaces.
"""
request = ListNamespacesRequest(
id=namespace, page_token=page_token, limit=limit
)
response = self._ns.list_namespaces(request)
return response.namespaces if response.namespaces else []
async def create_namespace(self, namespace: List[str]) -> None:
"""
Create a new namespace.
Parameters
----------
namespace : List[str]
The namespace path to create.
"""
request = CreateNamespaceRequest(id=namespace)
self._ns.create_namespace(request)
async def drop_namespace(self, namespace: List[str]) -> None:
"""
Drop a namespace.
Parameters
----------
namespace : List[str]
The namespace path to drop.
"""
request = DropNamespaceRequest(id=namespace)
self._ns.drop_namespace(request)
def connect_namespace( def connect_namespace(
impl: str, impl: str,
properties: Dict[str, str], properties: Dict[str, str],
@@ -541,3 +830,62 @@ def connect_namespace(
storage_options=storage_options, storage_options=storage_options,
session=session, session=session,
) )
def connect_namespace_async(
impl: str,
properties: Dict[str, str],
*,
read_consistency_interval: Optional[timedelta] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
) -> AsyncLanceNamespaceDBConnection:
"""
Connect to a LanceDB database through a namespace (returns async connection).
This function is synchronous but returns an AsyncLanceNamespaceDBConnection
that provides async methods for all database operations.
Parameters
----------
impl : str
The namespace implementation to use. For examples:
- "dir" for DirectoryNamespace
- "rest" for REST-based namespace
- Full module path for custom implementations
properties : Dict[str, str]
Configuration properties for the namespace implementation.
Different namespace implemenation has different config properties.
For example, use DirectoryNamespace with {"root": "/path/to/directory"}
read_consistency_interval : Optional[timedelta]
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked.
storage_options : Optional[Dict[str, str]]
Additional options for the storage backend
session : Optional[Session]
A session to use for this connection
Returns
-------
AsyncLanceNamespaceDBConnection
An async namespace-based connection to LanceDB
Examples
--------
>>> import lancedb
>>> # This function is sync, but returns an async connection
>>> db = lancedb.connect_namespace_async("dir", {"root": "/path/to/db"})
>>> # Use async methods on the connection
>>> async def use_db():
... tables = await db.table_names()
... table = await db.create_table("my_table", schema=schema)
"""
namespace = namespace_connect(impl, properties)
# Return the async namespace-based connection
return AsyncLanceNamespaceDBConnection(
namespace,
read_consistency_interval=read_consistency_interval,
storage_options=storage_options,
session=session,
)

View File

@@ -14,6 +14,7 @@ from typing import (
Literal, Literal,
Optional, Optional,
Tuple, Tuple,
Type,
TypeVar, TypeVar,
Union, Union,
Any, Any,
@@ -786,10 +787,7 @@ class LanceQueryBuilder(ABC):
------- -------
List[LanceModel] List[LanceModel]
""" """
return [ return [model(**row) for row in self.to_arrow(timeout=timeout).to_pylist()]
model(**{k: v for k, v in row.items() if k in model.field_names()})
for row in self.to_arrow(timeout=timeout).to_pylist()
]
def to_polars(self, *, timeout: Optional[timedelta] = None) -> "pl.DataFrame": def to_polars(self, *, timeout: Optional[timedelta] = None) -> "pl.DataFrame":
""" """
@@ -2400,6 +2398,28 @@ class AsyncQueryBase(object):
return pl.from_arrow(await self.to_arrow(timeout=timeout)) return pl.from_arrow(await self.to_arrow(timeout=timeout))
async def to_pydantic(
self, model: Type[LanceModel], *, timeout: Optional[timedelta] = None
) -> List[LanceModel]:
"""
Convert results to a list of pydantic models.
Parameters
----------
model : Type[LanceModel]
The pydantic model to use.
timeout : timedelta, optional
The maximum time to wait for the query to complete.
If None, wait indefinitely.
Returns
-------
list[LanceModel]
"""
return [
model(**row) for row in (await self.to_arrow(timeout=timeout)).to_pylist()
]
async def explain_plan(self, verbose: Optional[bool] = False): async def explain_plan(self, verbose: Optional[bool] = False):
"""Return the execution plan for this query. """Return the execution plan for this query.

View File

@@ -1717,6 +1717,7 @@ class LanceTable(Table):
): ):
self._conn = connection self._conn = connection
self._namespace = namespace self._namespace = namespace
self._location = location # Store location for use in _dataset_path
if _async is not None: if _async is not None:
self._table = _async self._table = _async
else: else:
@@ -1794,6 +1795,10 @@ class LanceTable(Table):
@cached_property @cached_property
def _dataset_path(self) -> str: def _dataset_path(self) -> str:
# Cacheable since it's deterministic # Cacheable since it's deterministic
# If table was opened with explicit location (e.g., from namespace),
# use that location directly instead of constructing from base URI
if self._location is not None:
return self._location
return _table_path(self._conn.uri, self.name) return _table_path(self._conn.uri, self.name)
def to_lance(self, **kwargs) -> lance.LanceDataset: def to_lance(self, **kwargs) -> lance.LanceDataset:
@@ -2681,6 +2686,7 @@ class LanceTable(Table):
self = cls.__new__(cls) self = cls.__new__(cls)
self._conn = db self._conn = db
self._namespace = namespace self._namespace = namespace
self._location = location
if data_storage_version is not None: if data_storage_version is not None:
warnings.warn( warnings.warn(

View File

@@ -423,3 +423,218 @@ class TestNamespaceConnection:
db.drop_table("same_name_table", namespace=["namespace_b"]) db.drop_table("same_name_table", namespace=["namespace_b"])
db.drop_namespace(["namespace_a"]) db.drop_namespace(["namespace_a"])
db.drop_namespace(["namespace_b"]) db.drop_namespace(["namespace_b"])
@pytest.mark.asyncio
class TestAsyncNamespaceConnection:
"""Test async namespace-based LanceDB connection using DirectoryNamespace."""
def setup_method(self):
"""Set up test fixtures."""
self.temp_dir = tempfile.mkdtemp()
def teardown_method(self):
"""Clean up test fixtures."""
shutil.rmtree(self.temp_dir, ignore_errors=True)
async def test_connect_namespace_async(self):
"""Test connecting to LanceDB through DirectoryNamespace asynchronously."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Should be an AsyncLanceNamespaceDBConnection
assert isinstance(db, lancedb.AsyncLanceNamespaceDBConnection)
# Initially no tables in root
table_names = await db.table_names()
assert len(list(table_names)) == 0
async def test_create_table_async(self):
"""Test creating a table asynchronously through namespace."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Create a child namespace first
await db.create_namespace(["test_ns"])
# Define schema for empty table
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
pa.field("text", pa.string()),
]
)
# Create empty table in child namespace
table = await db.create_table(
"test_table", schema=schema, namespace=["test_ns"]
)
assert table is not None
assert isinstance(table, lancedb.AsyncTable)
# Table should appear in child namespace
table_names = await db.table_names(namespace=["test_ns"])
assert "test_table" in list(table_names)
async def test_open_table_async(self):
"""Test opening an existing table asynchronously through namespace."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Create a child namespace first
await db.create_namespace(["test_ns"])
# Create a table with schema in child namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
await db.create_table("test_table", schema=schema, namespace=["test_ns"])
# Open the table
table = await db.open_table("test_table", namespace=["test_ns"])
assert table is not None
assert isinstance(table, lancedb.AsyncTable)
# Test write operation - add data to the table
test_data = [
{"id": 1, "vector": [1.0, 2.0]},
{"id": 2, "vector": [3.0, 4.0]},
{"id": 3, "vector": [5.0, 6.0]},
]
await table.add(test_data)
# Test read operation - query the table
result = await table.to_arrow()
assert len(result) == 3
assert result.schema.field("id").type == pa.int64()
assert result.schema.field("vector").type == pa.list_(pa.float32(), 2)
# Verify data content
result_df = result.to_pandas()
assert result_df["id"].tolist() == [1, 2, 3]
assert [v.tolist() for v in result_df["vector"]] == [
[1.0, 2.0],
[3.0, 4.0],
[5.0, 6.0],
]
# Test update operation
await table.update({"id": 20}, where="id = 2")
result = await table.to_arrow()
result_df = result.to_pandas().sort_values("id").reset_index(drop=True)
assert result_df["id"].tolist() == [1, 3, 20]
# Test delete operation
await table.delete("id = 1")
result = await table.to_arrow()
assert len(result) == 2
result_df = result.to_pandas().sort_values("id").reset_index(drop=True)
assert result_df["id"].tolist() == [3, 20]
async def test_drop_table_async(self):
"""Test dropping a table asynchronously through namespace."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Create a child namespace first
await db.create_namespace(["test_ns"])
# Create tables in child namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
await db.create_table("table1", schema=schema, namespace=["test_ns"])
await db.create_table("table2", schema=schema, namespace=["test_ns"])
# Verify both tables exist in child namespace
table_names = list(await db.table_names(namespace=["test_ns"]))
assert "table1" in table_names
assert "table2" in table_names
assert len(table_names) == 2
# Drop one table
await db.drop_table("table1", namespace=["test_ns"])
# Verify only table2 remains
table_names = list(await db.table_names(namespace=["test_ns"]))
assert "table1" not in table_names
assert "table2" in table_names
assert len(table_names) == 1
async def test_namespace_operations_async(self):
"""Test namespace management operations asynchronously."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Initially no namespaces
namespaces = await db.list_namespaces()
assert len(list(namespaces)) == 0
# Create a namespace
await db.create_namespace(["test_namespace"])
# Verify namespace exists
namespaces = list(await db.list_namespaces())
assert "test_namespace" in namespaces
assert len(namespaces) == 1
# Create table in namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
table = await db.create_table(
"test_table", schema=schema, namespace=["test_namespace"]
)
assert table is not None
# Verify table exists in namespace
tables_in_namespace = list(await db.table_names(namespace=["test_namespace"]))
assert "test_table" in tables_in_namespace
assert len(tables_in_namespace) == 1
# Drop table from namespace
await db.drop_table("test_table", namespace=["test_namespace"])
# Verify table no longer exists in namespace
tables_in_namespace = list(await db.table_names(namespace=["test_namespace"]))
assert len(tables_in_namespace) == 0
# Drop namespace
await db.drop_namespace(["test_namespace"])
# Verify namespace no longer exists
namespaces = list(await db.list_namespaces())
assert len(namespaces) == 0
async def test_drop_all_tables_async(self):
"""Test dropping all tables asynchronously through namespace."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
# Create a child namespace first
await db.create_namespace(["test_ns"])
# Create multiple tables in child namespace
schema = pa.schema(
[
pa.field("id", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 2)),
]
)
for i in range(3):
await db.create_table(f"table{i}", schema=schema, namespace=["test_ns"])
# Verify tables exist in child namespace
table_names = await db.table_names(namespace=["test_ns"])
assert len(list(table_names)) == 3
# Drop all tables in child namespace
await db.drop_all_tables(namespace=["test_ns"])
# Verify all tables are gone from child namespace
table_names = await db.table_names(namespace=["test_ns"])
assert len(list(table_names)) == 0

View File

@@ -412,3 +412,50 @@ def test_multi_vector_in_lance_model():
t = TestModel(id=1) t = TestModel(id=1)
assert t.vectors == [[0.0] * 16] assert t.vectors == [[0.0] * 16]
def test_aliases_in_lance_model(mem_db):
data = [
{"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
{"vector": [5.9, 6.5], "item": "bar", "price": 20.0},
]
tbl = mem_db.create_table("items", data=data)
class TestModel(LanceModel):
name: str = Field(alias="item")
price: float
distance: float = Field(alias="_distance")
model = (
tbl.search([5.9, 6.5])
.distance_type("cosine")
.limit(1)
.to_pydantic(TestModel)[0]
)
assert hasattr(model, "name")
assert hasattr(model, "distance")
assert model.distance < 0.01
@pytest.mark.asyncio
async def test_aliases_in_lance_model_async(mem_db_async):
data = [
{"vector": [8.3, 2.5], "item": "foo", "price": 12.0},
{"vector": [7.7, 3.9], "item": "bar", "price": 11.2},
]
tbl = await mem_db_async.create_table("items", data=data)
class TestModel(LanceModel):
name: str = Field(alias="item")
price: float
distance: float = Field(alias="_distance")
model = (
await tbl.vector_search([7.7, 3.9])
.distance_type("cosine")
.limit(1)
.to_pydantic(TestModel)
)[0]
assert hasattr(model, "name")
assert hasattr(model, "distance")
assert model.distance < 0.01

View File

@@ -134,17 +134,19 @@ pub struct MergeResult {
pub num_updated_rows: u64, pub num_updated_rows: u64,
pub num_inserted_rows: u64, pub num_inserted_rows: u64,
pub num_deleted_rows: u64, pub num_deleted_rows: u64,
pub num_attempts: u32,
} }
#[pymethods] #[pymethods]
impl MergeResult { impl MergeResult {
pub fn __repr__(&self) -> String { pub fn __repr__(&self) -> String {
format!( format!(
"MergeResult(version={}, num_updated_rows={}, num_inserted_rows={}, num_deleted_rows={})", "MergeResult(version={}, num_updated_rows={}, num_inserted_rows={}, num_deleted_rows={}, num_attempts={})",
self.version, self.version,
self.num_updated_rows, self.num_updated_rows,
self.num_inserted_rows, self.num_inserted_rows,
self.num_deleted_rows self.num_deleted_rows,
self.num_attempts
) )
} }
} }
@@ -156,6 +158,7 @@ impl From<lancedb::table::MergeResult> for MergeResult {
num_updated_rows: result.num_updated_rows, num_updated_rows: result.num_updated_rows,
num_inserted_rows: result.num_inserted_rows, num_inserted_rows: result.num_inserted_rows,
num_deleted_rows: result.num_deleted_rows, num_deleted_rows: result.num_deleted_rows,
num_attempts: result.num_attempts,
} }
} }
} }

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "lancedb" name = "lancedb"
version = "0.22.4-beta.0" version = "0.22.4-beta.1"
edition.workspace = true edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications" description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true license.workspace = true

View File

@@ -1183,6 +1183,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
num_deleted_rows: 0, num_deleted_rows: 0,
num_inserted_rows: 0, num_inserted_rows: 0,
num_updated_rows: 0, num_updated_rows: 0,
num_attempts: 0,
}); });
} }

View File

@@ -467,6 +467,11 @@ pub struct MergeResult {
/// However those rows are not shared with the user. /// However those rows are not shared with the user.
#[serde(default)] #[serde(default)]
pub num_deleted_rows: u64, pub num_deleted_rows: u64,
/// Number of attempts performed during the merge operation.
/// This includes the initial attempt plus any retries due to transaction conflicts.
/// A value of 1 means the operation succeeded on the first try.
#[serde(default)]
pub num_attempts: u32,
} }
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
@@ -2531,6 +2536,7 @@ impl BaseTable for NativeTable {
num_updated_rows: stats.num_updated_rows, num_updated_rows: stats.num_updated_rows,
num_inserted_rows: stats.num_inserted_rows, num_inserted_rows: stats.num_inserted_rows,
num_deleted_rows: stats.num_deleted_rows, num_deleted_rows: stats.num_deleted_rows,
num_attempts: stats.num_attempts,
}) })
} }
@@ -2990,9 +2996,13 @@ mod tests {
// Perform a "insert if not exists" // Perform a "insert if not exists"
let mut merge_insert_builder = table.merge_insert(&["i"]); let mut merge_insert_builder = table.merge_insert(&["i"]);
merge_insert_builder.when_not_matched_insert_all(); merge_insert_builder.when_not_matched_insert_all();
merge_insert_builder.execute(new_batches).await.unwrap(); let result = merge_insert_builder.execute(new_batches).await.unwrap();
// Only 5 rows should actually be inserted // Only 5 rows should actually be inserted
assert_eq!(table.count_rows(None).await.unwrap(), 15); assert_eq!(table.count_rows(None).await.unwrap(), 15);
assert_eq!(result.num_inserted_rows, 5);
assert_eq!(result.num_updated_rows, 0);
assert_eq!(result.num_deleted_rows, 0);
assert_eq!(result.num_attempts, 1);
// Create new data with i=15..25 (no id matches) // Create new data with i=15..25 (no id matches)
let new_batches = Box::new(merge_insert_test_batches(15, 2)); let new_batches = Box::new(merge_insert_test_batches(15, 2));