Mirror of https://github.com/lancedb/lancedb.git, synced 2026-07-01 01:50:39 +00:00

Compare commits: jack/sopho...main (7 commits)

| Author | SHA1 | Date |
|---|---|---|
| | 3b70fc4c9d | |
| | 3a7b02119b | |
| | bcbc0da090 | |
| | 9bead9f53d | |
| | 0351b77984 | |
| | f6c9d31f98 | |
| | a8f1c5a69f | |
137
.agents/skills/lancedb-branch-ops/SKILL.md
Normal file
@@ -0,0 +1,137 @@
---
name: lancedb-branch-ops
description: Branch management for LanceDB tables via the REST API. Use this skill whenever someone wants to create, delete, list, or switch branches on a LanceDB table, or needs to make sure a write (metadata update, index build, etc.) lands on a specific branch instead of main. Invoke it even without the word "branch" if context makes clear they want an experimental copy of a table, want to isolate changes, or want to confirm a mutation didn't touch main. Covers: branches/list, branches/create, branches/delete, and passing "branch" in describe/update_field_metadata/create_index to target a non-main version.
---

## Goal

Manage branches on a LanceDB table: list what exists, create new ones, delete stale ones, and direct read/write operations at a specific branch without touching main.

## Step 0: Establish the connection

Use the `lancedb-connect` skill to resolve the base URL and auth headers (`x-api-key`, `x-lancedb-database`). Skip this only if the connection is already known from the current conversation.

All examples below use `{base_url}`; substitute the resolved endpoint and include the auth headers on every request.

## The branch model (important)

LanceDB branches are named snapshots that diverge from the table's current state at creation time. There is **no checkout command**: you never switch the whole table to a branch. Instead, you **pass `"branch": "<name>"` in the request body** of any operation to target that branch. Omitting the key (or sending an empty body) always targets main.

`branches/list` returns only non-main branches. Main always exists and is not listed.
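
The targeting rule above can be sketched as a small request-body helper. This is a minimal sketch under stated assumptions, not part of the LanceDB API: `with_branch` is a hypothetical name, and it only encodes what the text says (add the `"branch"` key to target a branch, omit it to target main).

```python
import json
from typing import Any, Dict, Optional


def with_branch(payload: Dict[str, Any], branch: Optional[str] = None) -> Dict[str, Any]:
    """Return a copy of `payload` scoped to `branch`; omit the key to target main."""
    body = dict(payload)       # copy so the caller's dict is never mutated
    body.pop("branch", None)   # drop any stale branch key first
    if branch:                 # None or "" means main, so the key stays absent
        body["branch"] = branch
    return body


print(json.dumps(with_branch({}, "wip-branch")))  # {"branch": "wip-branch"}
print(json.dumps(with_branch({})))                # {}
```

Building every body through one helper like this makes it harder to send a write to main by accident, because the branch is an explicit argument rather than a key that might be forgotten.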

## List branches

```http
POST {base_url}/v1/table/{table_id}/branches/list
Content-Type: application/json

{}
```

Response:
```json
{
  "branches": {
    "experiment-reindex": {"parentVersion": 1, "createAt": 1782506085, "manifestSize": 1029}
  }
}
```

If `branches` is `{}`, the table has no branches besides main.
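
Reading that response shape can be sketched as follows. This is a minimal sketch, assuming the response is already parsed into a dict shaped like the sample above; `branch_names` is a hypothetical helper name.

```python
from typing import Any, Dict, List


def branch_names(response: Dict[str, Any]) -> List[str]:
    """Return the non-main branch names from a branches/list response, sorted."""
    # A missing or empty "branches" object means only main exists.
    return sorted((response.get("branches") or {}).keys())


sample = {
    "branches": {
        "experiment-reindex": {"parentVersion": 1, "createAt": 1782506085, "manifestSize": 1029}
    }
}
print(branch_names(sample))            # ['experiment-reindex']
print(branch_names({"branches": {}}))  # []
```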

## Create a branch

```http
POST {base_url}/v1/table/{table_id}/branches/create
Content-Type: application/json

{"name": "experiment-reindex"}
```

HTTP 200 with `{}` body = success. The branch is created off the table's current state on main.

Verify by calling `branches/list` and confirming the new name appears.

## Delete a branch

```http
POST {base_url}/v1/table/{table_id}/branches/delete
Content-Type: application/json

{"name": "stale-2024"}
```

HTTP 200 with `{}` body = success. Only the branch pointer is removed; main and all row data remain intact.

Verify by calling `branches/list` (name gone) and `describe` with no branch param (main still responds).

## Operate on a specific branch

Pass `"branch": "<name>"` in the body of any operation to scope it to that branch:

**Read schema on a branch:**
```http
POST {base_url}/v1/table/{table_id}/describe
Content-Type: application/json

{"branch": "wip-branch"}
```

**Write metadata to a branch (not main):**
```http
POST {base_url}/v1/table/{table_id}/update_field_metadata
Content-Type: application/json

{
  "branch": "wip-branch",
  "updates": [
    {
      "path": "category",
      "metadata": {"lancedb:description": "Product category label."},
      "replace": false
    }
  ]
}
```

**Build an index on a branch:**
```http
POST {base_url}/v1/table/{table_id}/create_index
Content-Type: application/json

{
  "branch": "wip-branch",
  "column": "category",
  "index_type": "BTREE"
}
```

## Verifying isolation

After writing to a branch, always confirm the change did NOT land on main:

```bash
# Should show the new metadata
curl -s -X POST {base_url}/v1/table/{table_id}/describe \
  -H "x-api-key: <key>" -H "x-lancedb-database: <db>" \
  -H "content-type: application/json" \
  -d '{"branch": "wip-branch"}'

# Should NOT show the new metadata
curl -s -X POST {base_url}/v1/table/{table_id}/describe \
  -H "x-api-key: <key>" -H "x-lancedb-database: <db>" \
  -H "content-type: application/json" \
  -d '{}'
```
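
The same check can be sketched as a comparison of parsed `describe` responses. This is a minimal sketch under stated assumptions: `write_is_isolated` is a hypothetical helper, the dicts below are made-up stand-ins rather than the real `describe` response shape, and plain equality only works if the real response has no fields that change between calls.

```python
from typing import Any


def write_is_isolated(main_before: Any, main_after: Any, branch_after: Any) -> bool:
    """True when main is unchanged by the write and the branch now differs from main."""
    return main_before == main_after and branch_after != main_after


# Hypothetical response fragments, for illustration only.
main_before = {"fields": {"category": {}}}
main_after = {"fields": {"category": {}}}
branch_after = {"fields": {"category": {"lancedb:description": "Product category label."}}}

print(write_is_isolated(main_before, main_after, branch_after))    # True
print(write_is_isolated(main_before, branch_after, branch_after))  # False
```

Capturing main before the write, and not only after it, is what distinguishes "the write missed main" from "the write had no visible effect anywhere".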

## Quick reference

| Goal | Endpoint | Body |
|------|----------|------|
| List all branches | `branches/list` | `{}` |
| Create a branch | `branches/create` | `{"name": "..."}` |
| Delete a branch | `branches/delete` | `{"name": "..."}` |
| Read schema on branch | `describe` | `{"branch": "..."}` |
| Write metadata on branch | `update_field_metadata` | `{"branch": "...", "updates": [...]}` |
| Build index on branch | `create_index` | `{"branch": "...", "column": ..., "index_type": ...}` |
| Target main (default) | any endpoint | omit `"branch"` key |

@@ -1,5 +1,5 @@
 [tool.bumpversion]
-current_version = "0.31.0-beta.4"
+current_version = "0.31.0-beta.5"
 parse = """(?x)
     (?P<major>0|[1-9]\\d*)\\.
     (?P<minor>0|[1-9]\\d*)\\.

6
Cargo.lock
generated
@@ -5299,7 +5299,7 @@ dependencies = [

 [[package]]
 name = "lancedb"
-version = "0.31.0-beta.4"
+version = "0.31.0-beta.5"
 dependencies = [
  "ahash",
  "anyhow",
@@ -5383,7 +5383,7 @@ dependencies = [

 [[package]]
 name = "lancedb-nodejs"
-version = "0.31.0-beta.4"
+version = "0.31.0-beta.5"
 dependencies = [
  "arrow-array",
  "arrow-buffer",
@@ -5408,7 +5408,7 @@ dependencies = [

 [[package]]
 name = "lancedb-python"
-version = "0.34.0-beta.4"
+version = "0.34.0-beta.5"
 dependencies = [
  "arrow",
  "async-trait",

@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
 <dependency>
     <groupId>com.lancedb</groupId>
     <artifactId>lancedb-core</artifactId>
-    <version>0.31.0-beta.4</version>
+    <version>0.31.0-beta.5</version>
 </dependency>
```

@@ -8,7 +8,7 @@
     <parent>
         <groupId>com.lancedb</groupId>
         <artifactId>lancedb-parent</artifactId>
-        <version>0.31.0-beta.4</version>
+        <version>0.31.0-beta.5</version>
         <relativePath>../pom.xml</relativePath>
     </parent>

@@ -6,7 +6,7 @@

     <groupId>com.lancedb</groupId>
     <artifactId>lancedb-parent</artifactId>
-    <version>0.31.0-beta.4</version>
+    <version>0.31.0-beta.5</version>
     <packaging>pom</packaging>
     <name>${project.artifactId}</name>
     <description>LanceDB Java SDK Parent POM</description>

@@ -1,7 +1,7 @@
 [package]
 name = "lancedb-nodejs"
 edition.workspace = true
-version = "0.31.0-beta.4"
+version = "0.31.0-beta.5"
 publish = false
 license.workspace = true
 description.workspace = true

@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-darwin-arm64",
-  "version": "0.31.0-beta.4",
+  "version": "0.31.0-beta.5",
   "os": ["darwin"],
   "cpu": ["arm64"],
   "main": "lancedb.darwin-arm64.node",

@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-linux-arm64-gnu",
-  "version": "0.31.0-beta.4",
+  "version": "0.31.0-beta.5",
   "os": ["linux"],
   "cpu": ["arm64"],
   "main": "lancedb.linux-arm64-gnu.node",

@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-linux-arm64-musl",
-  "version": "0.31.0-beta.4",
+  "version": "0.31.0-beta.5",
   "os": ["linux"],
   "cpu": ["arm64"],
   "main": "lancedb.linux-arm64-musl.node",

@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-linux-x64-gnu",
-  "version": "0.31.0-beta.4",
+  "version": "0.31.0-beta.5",
   "os": ["linux"],
   "cpu": ["x64"],
   "main": "lancedb.linux-x64-gnu.node",

@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-linux-x64-musl",
-  "version": "0.31.0-beta.4",
+  "version": "0.31.0-beta.5",
   "os": ["linux"],
   "cpu": ["x64"],
   "main": "lancedb.linux-x64-musl.node",

@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-win32-arm64-msvc",
-  "version": "0.31.0-beta.4",
+  "version": "0.31.0-beta.5",
   "os": [
     "win32"
   ],

@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-win32-x64-msvc",
-  "version": "0.31.0-beta.4",
+  "version": "0.31.0-beta.5",
   "os": ["win32"],
   "cpu": ["x64"],
   "main": "lancedb.win32-x64-msvc.node",

4
nodejs/package-lock.json
generated
@@ -1,12 +1,12 @@
 {
   "name": "@lancedb/lancedb",
-  "version": "0.31.0-beta.4",
+  "version": "0.31.0-beta.5",
   "lockfileVersion": 3,
   "requires": true,
   "packages": {
     "": {
       "name": "@lancedb/lancedb",
-      "version": "0.31.0-beta.4",
+      "version": "0.31.0-beta.5",
       "cpu": [
         "x64",
         "arm64"

@@ -11,7 +11,7 @@
     "ann"
   ],
   "private": false,
-  "version": "0.31.0-beta.4",
+  "version": "0.31.0-beta.5",
   "main": "dist/index.js",
   "exports": {
     ".": "./dist/index.js",

@@ -3,7 +3,7 @@

 use std::time::Duration;

-use lancedb::{arrow::IntoArrow, ipc::ipc_file_to_batches, table::merge::MergeInsertBuilder};
+use lancedb::{ipc::ipc_file_to_batches, table::merge::MergeInsertBuilder};
 use napi::bindgen_prelude::*;
 use napi_derive::napi;

@@ -66,11 +66,9 @@ impl NativeMergeInsertBuilder {

     #[napi(catch_unwind)]
     pub async fn execute(&self, buf: Buffer) -> napi::Result<MergeResult> {
-        let data = ipc_file_to_batches(buf.to_vec())
-            .and_then(IntoArrow::into_arrow)
-            .map_err(|e| {
-                napi::Error::from_reason(format!("Failed to read IPC file: {}", convert_error(&e)))
-            })?;
+        let data = ipc_file_to_batches(buf.to_vec()).map_err(|e| {
+            napi::Error::from_reason(format!("Failed to read IPC file: {}", convert_error(&e)))
+        })?;

         let this = self.clone();

@@ -1,5 +1,5 @@
 [tool.bumpversion]
-current_version = "0.34.0-beta.4"
+current_version = "0.34.0-beta.5"
 parse = """(?x)
     (?P<major>0|[1-9]\\d*)\\.
     (?P<minor>0|[1-9]\\d*)\\.

@@ -1,6 +1,6 @@
 [package]
 name = "lancedb-python"
-version = "0.34.0-beta.4"
+version = "0.34.0-beta.5"
 publish = false
 edition.workspace = true
 description = "Python bindings for LanceDB"

@@ -282,6 +282,23 @@ async def connect(
     namespace_client_properties: Optional[Dict[str, str]] = None,
     oauth_config: Optional[Any] = None,
 ) -> Connection: ...
+def connect_namespace(
+    namespace_client_impl: str,
+    namespace_client_properties: Dict[str, str],
+    read_consistency_interval: Optional[float] = None,
+    storage_options: Optional[Dict[str, str]] = None,
+    session: Optional[Session] = None,
+    namespace_client_pushdown_operations: Optional[List[str]] = None,
+) -> Connection: ...
+def connect_namespace_client(
+    namespace_client: Any,
+    read_consistency_interval: Optional[float] = None,
+    storage_options: Optional[Dict[str, str]] = None,
+    session: Optional[Session] = None,
+    namespace_client_pushdown_operations: Optional[List[str]] = None,
+    namespace_client_impl: Optional[str] = None,
+    namespace_client_properties: Optional[Dict[str, str]] = None,
+) -> Connection: ...

 class RecordBatchStream:
     @property

@@ -38,15 +38,13 @@ from lance_namespace_urllib3_client.models.query_table_request_vector import (
     QueryTableRequestVector,
 )
 from lance_namespace_urllib3_client.models.string_fts_query import StringFtsQuery
-from lance_namespace.errors import TableNotFoundError
-from lancedb._lancedb import connect_namespace_client as _connect_namespace_client
+from lance_namespace.errors import NamespaceNotEmptyError, TableNotFoundError
+from lancedb._lancedb import (
+    connect_namespace as _connect_namespace,
+    connect_namespace_client as _connect_namespace_client,
+)
 from lancedb.background_loop import LOOP
 from lancedb.db import AsyncConnection, DBConnection
-from lancedb.namespace_utils import (
-    _normalize_create_namespace_mode,
-    _normalize_drop_namespace_mode,
-    _normalize_drop_namespace_behavior,
-)
 from lance_namespace import (
     LanceNamespace,
     connect as namespace_connect,
@@ -55,13 +53,6 @@ from lance_namespace import (
     DropNamespaceResponse,
     ListNamespacesResponse,
     ListTablesResponse,
-    ListTablesRequest,
-    DescribeNamespaceRequest,
-    DropTableRequest,
-    RenameTableRequest,
-    ListNamespacesRequest,
-    CreateNamespaceRequest,
-    DropNamespaceRequest,
 )
 from lancedb.table import AsyncTable, LanceTable, Table
 from lancedb.util import validate_table_name
@@ -386,6 +377,10 @@ def _builds_namespace_natively(
     return namespace_client_impl == "rest" and bool(namespace_client_properties)


+def _supports_native_namespace(namespace_client_impl: str) -> bool:
+    return namespace_client_impl in {"dir", "rest"}
+
+
 class LanceNamespaceDBConnection(DBConnection):
     """
     A LanceDB connection that uses a namespace for table management.
@@ -396,7 +391,7 @@ class LanceNamespaceDBConnection(DBConnection):

     def __init__(
         self,
-        namespace_client: LanceNamespace,
+        namespace_client: Optional[LanceNamespace] = None,
         *,
         read_consistency_interval: Optional[timedelta] = None,
         storage_options: Optional[Dict[str, str]] = None,
@@ -404,6 +399,7 @@ class LanceNamespaceDBConnection(DBConnection):
         namespace_client_pushdown_operations: Optional[List[str]] = None,
         namespace_client_impl: Optional[str] = None,
         namespace_client_properties: Optional[Dict[str, str]] = None,
+        _inner: Optional[AsyncConnection] = None,
     ):
         """
         Initialize a namespace-based LanceDB connection.
@@ -445,30 +441,36 @@ class LanceNamespaceDBConnection(DBConnection):
         )
         self._namespace_client_impl = namespace_client_impl
         self._namespace_client_properties = namespace_client_properties
-        # When the namespace client is built natively (see Rust
-        # ``build_namespace_natively``), the underlying Rust table performs
-        # QueryTable pushdown through the read-freshness context provider, which
-        # the pure-Python ``query_table`` path bypasses.
-        self._route_pushdown_to_rust = _builds_namespace_natively(
+        # When the namespace connection or client is built natively in Rust, the
+        # underlying Rust table performs QueryTable pushdown through the
+        # read-freshness context provider, which the pure-Python ``query_table``
+        # path bypasses.
+        self._route_pushdown_to_rust = _inner is not None or _builds_namespace_natively(
             namespace_client_impl, namespace_client_properties
         )
-        self._inner = AsyncConnection(
-            _connect_namespace_client(
-                namespace_client,
-                read_consistency_interval=(
-                    read_consistency_interval.total_seconds()
-                    if read_consistency_interval is not None
-                    else None
-                ),
-                storage_options=self.storage_options or None,
-                session=session,
-                namespace_client_pushdown_operations=(
-                    list(self._namespace_client_pushdown_operations)
-                ),
-                namespace_client_impl=namespace_client_impl,
-                namespace_client_properties=namespace_client_properties,
+        if _inner is not None:
+            self._inner = _inner
+        else:
+            if namespace_client is None:
+                raise ValueError("namespace_client is required without a native _inner")
+            self._inner = AsyncConnection(
+                _connect_namespace_client(
+                    namespace_client,
+                    read_consistency_interval=(
+                        read_consistency_interval.total_seconds()
+                        if read_consistency_interval is not None
+                        else None
+                    ),
+                    storage_options=self.storage_options or None,
+                    session=session,
+                    namespace_client_pushdown_operations=(
+                        list(self._namespace_client_pushdown_operations)
+                    ),
+                    namespace_client_impl=namespace_client_impl,
+                    namespace_client_properties=namespace_client_properties,
+                )
             )
-        )
+        self._uri = self._inner.uri

     @override
     def serialize(self) -> str:
@@ -514,11 +516,11 @@ class LanceNamespaceDBConnection(DBConnection):
         )
         if namespace_path is None:
             namespace_path = []
-        request = ListTablesRequest(
-            id=namespace_path, page_token=page_token, limit=limit
+        return LOOP.run(
+            self._inner.table_names(
+                namespace_path=namespace_path, start_after=page_token, limit=limit
+            )
         )
-        response = self._namespace_client.list_tables(request)
-        return response.tables if response.tables else []

     @override
     def create_table(
@@ -589,8 +591,8 @@ class LanceNamespaceDBConnection(DBConnection):
                     index_cache_size=index_cache_size,
                 )
             )
-        except RuntimeError as e:
-            if "Table not found" in str(e):
+        except (RuntimeError, ValueError) as e:
+            if "Table not found" in str(e) or "was not found" in str(e):
                 table_id = namespace_path + [name]
                 raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}")
             raise
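
The message-based translation in the hunk above can be sketched in isolation. This is a minimal sketch, assuming only what the diff shows: the two marker substrings and the `$`-joined table id come from the hunk, while `translate_not_found` is a hypothetical helper and `TableNotFoundError` here is a local stand-in for `lance_namespace.errors.TableNotFoundError`.

```python
from typing import List


class TableNotFoundError(Exception):
    """Local stand-in for lance_namespace.errors.TableNotFoundError."""


# Substrings the diff matches against the native error message.
NOT_FOUND_MARKERS = ("Table not found", "was not found")


def translate_not_found(exc: Exception, table_id: List[str]) -> Exception:
    """Map a native 'not found' error to TableNotFoundError; pass others through."""
    if isinstance(exc, (RuntimeError, ValueError)) and any(
        marker in str(exc) for marker in NOT_FOUND_MARKERS
    ):
        return TableNotFoundError(f"Table not found: {'$'.join(table_id)}")
    return exc


err = translate_not_found(ValueError("Table 'ns$t' was not found"), ["ns", "t"])
print(type(err).__name__, err)  # TableNotFoundError Table not found: ns$t
```

Matching on message text is brittle if the native wording changes, which is presumably why the diff widens both the exception types and the substrings it accepts.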
@@ -612,12 +614,9 @@ class LanceNamespaceDBConnection(DBConnection):

     @override
     def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):
-        # Use namespace drop_table directly
         if namespace_path is None:
             namespace_path = []
-        table_id = namespace_path + [name]
-        request = DropTableRequest(id=table_id)
-        self._namespace_client.drop_table(request)
+        LOOP.run(self._inner.drop_table(name, namespace_path=namespace_path))

     @override
     def rename_table(
@@ -631,14 +630,19 @@ class LanceNamespaceDBConnection(DBConnection):
             cur_namespace_path = []
         if new_namespace_path is None:
             new_namespace_path = []
-        cur_table_id = cur_namespace_path + [cur_name]
-        new_namespace_id = new_namespace_path if new_namespace_path else None
-        request = RenameTableRequest(
-            id=cur_table_id,
-            new_table_name=new_name,
-            new_namespace_id=new_namespace_id,
-        )
-        self._namespace_client.rename_table(request)
+        try:
+            LOOP.run(
+                self._inner.rename_table(
+                    cur_name,
+                    new_name,
+                    cur_namespace_path=cur_namespace_path,
+                    new_namespace_path=new_namespace_path,
+                )
+            )
+        except RuntimeError as e:
+            if "rename_table not implemented" in str(e):
+                raise NotImplementedError("rename_table not implemented") from e
+            raise

     @override
     def drop_database(self):
@@ -650,8 +654,7 @@ class LanceNamespaceDBConnection(DBConnection):
     def drop_all_tables(self, namespace_path: Optional[List[str]] = None):
         if namespace_path is None:
             namespace_path = []
-        for table_name in self.table_names(namespace_path=namespace_path):
-            self.drop_table(table_name, namespace_path=namespace_path)
+        LOOP.run(self._inner.drop_all_tables(namespace_path=namespace_path))

     @override
     def list_namespaces(
@@ -681,13 +684,10 @@ class LanceNamespaceDBConnection(DBConnection):
         """
         if namespace_path is None:
             namespace_path = []
-        request = ListNamespacesRequest(
-            id=namespace_path, page_token=page_token, limit=limit
-        )
-        response = self._namespace_client.list_namespaces(request)
-        return ListNamespacesResponse(
-            namespaces=response.namespaces if response.namespaces else [],
-            page_token=response.page_token,
+        return LOOP.run(
+            self._inner.list_namespaces(
+                namespace_path=namespace_path, page_token=page_token, limit=limit
+            )
         )

     @override
@@ -715,14 +715,12 @@ class LanceNamespaceDBConnection(DBConnection):
         CreateNamespaceResponse
             Response containing the properties of the created namespace.
         """
-        request = CreateNamespaceRequest(
-            id=namespace_path,
-            mode=_normalize_create_namespace_mode(mode),
-            properties=properties,
-        )
-        response = self._namespace_client.create_namespace(request)
-        return CreateNamespaceResponse(
-            properties=response.properties if hasattr(response, "properties") else None
+        return LOOP.run(
+            self._inner.create_namespace(
+                namespace_path=namespace_path,
+                mode=mode,
+                properties=properties,
+            )
         )

     @override
@@ -750,20 +748,18 @@ class LanceNamespaceDBConnection(DBConnection):
         DropNamespaceResponse
             Response containing properties and transaction_id if applicable.
         """
-        request = DropNamespaceRequest(
-            id=namespace_path,
-            mode=_normalize_drop_namespace_mode(mode),
-            behavior=_normalize_drop_namespace_behavior(behavior),
-        )
-        response = self._namespace_client.drop_namespace(request)
-        return DropNamespaceResponse(
-            properties=(
-                response.properties if hasattr(response, "properties") else None
-            ),
-            transaction_id=(
-                response.transaction_id if hasattr(response, "transaction_id") else None
-            ),
-        )
+        try:
+            return LOOP.run(
+                self._inner.drop_namespace(
+                    namespace_path=namespace_path,
+                    mode=mode,
+                    behavior=behavior,
+                )
+            )
+        except RuntimeError as e:
+            if "Namespace not empty" in str(e):
+                raise NamespaceNotEmptyError(str(e)) from e
+            raise

     @override
     def describe_namespace(
@@ -782,11 +778,7 @@ class LanceNamespaceDBConnection(DBConnection):
         DescribeNamespaceResponse
             Response containing the namespace properties.
         """
-        request = DescribeNamespaceRequest(id=namespace_path)
-        response = self._namespace_client.describe_namespace(request)
-        return DescribeNamespaceResponse(
-            properties=response.properties if hasattr(response, "properties") else None
-        )
+        return LOOP.run(self._inner.describe_namespace(namespace_path))

     @override
     def list_tables(
@@ -816,13 +808,10 @@ class LanceNamespaceDBConnection(DBConnection):
         """
         if namespace_path is None:
             namespace_path = []
-        request = ListTablesRequest(
-            id=namespace_path, page_token=page_token, limit=limit
-        )
-        response = self._namespace_client.list_tables(request)
-        return ListTablesResponse(
-            tables=response.tables if response.tables else [],
-            page_token=response.page_token,
+        return LOOP.run(
+            self._inner.list_tables(
+                namespace_path=namespace_path, page_token=page_token, limit=limit
+            )
         )

     def _lance_table_from_uri(
@@ -878,6 +867,18 @@ class LanceNamespaceDBConnection(DBConnection):
         LanceNamespace
             The namespace client for this connection.
         """
+        if self._namespace_client is None:
+            if (
+                self._namespace_client_impl is None
+                or self._namespace_client_properties is None
+            ):
+                raise ValueError(
+                    "Cannot construct a Python namespace client without "
+                    "namespace implementation properties"
+                )
+            self._namespace_client = namespace_connect(
+                self._namespace_client_impl, self._namespace_client_properties
+            )
         return self._namespace_client


@@ -891,7 +892,7 @@ class AsyncLanceNamespaceDBConnection:

     def __init__(
         self,
-        namespace_client: LanceNamespace,
+        namespace_client: Optional[LanceNamespace] = None,
         *,
         read_consistency_interval: Optional[timedelta] = None,
         storage_options: Optional[Dict[str, str]] = None,
@@ -899,6 +900,7 @@ class AsyncLanceNamespaceDBConnection:
         namespace_client_pushdown_operations: Optional[List[str]] = None,
         namespace_client_impl: Optional[str] = None,
         namespace_client_properties: Optional[Dict[str, str]] = None,
+        _inner: Optional[AsyncConnection] = None,
     ):
         """
         Initialize an async namespace-based LanceDB connection.
@@ -940,29 +942,35 @@ class AsyncLanceNamespaceDBConnection:
         )
         self._namespace_client_impl = namespace_client_impl
         self._namespace_client_properties = namespace_client_properties
-        # See LanceNamespaceDBConnection: when built natively the Rust table runs
-        # QueryTable pushdown through the read-freshness provider, so defer to it
-        # rather than the urllib3 client (which omits x-lancedb-min-timestamp).
-        self._route_pushdown_to_rust = _builds_namespace_natively(
+        # See LanceNamespaceDBConnection: when Rust owns the namespace
+        # connection/client, its table performs QueryTable pushdown through the
+        # read-freshness provider, so defer to it rather than the urllib3 client
+        # path (which omits x-lancedb-min-timestamp).
+        self._route_pushdown_to_rust = _inner is not None or _builds_namespace_natively(
             namespace_client_impl, namespace_client_properties
         )
-        self._inner = AsyncConnection(
-            _connect_namespace_client(
-                namespace_client,
-                read_consistency_interval=(
-                    read_consistency_interval.total_seconds()
-                    if read_consistency_interval is not None
-                    else None
-                ),
-                storage_options=self.storage_options or None,
-                session=session,
-                namespace_client_pushdown_operations=(
-                    list(self._namespace_client_pushdown_operations)
-                ),
-                namespace_client_impl=namespace_client_impl,
-                namespace_client_properties=namespace_client_properties,
+        if _inner is not None:
+            self._inner = _inner
+        else:
+            if namespace_client is None:
+                raise ValueError("namespace_client is required without a native _inner")
+            self._inner = AsyncConnection(
+                _connect_namespace_client(
+                    namespace_client,
+                    read_consistency_interval=(
+                        read_consistency_interval.total_seconds()
+                        if read_consistency_interval is not None
+                        else None
+                    ),
+                    storage_options=self.storage_options or None,
+                    session=session,
+                    namespace_client_pushdown_operations=(
+                        list(self._namespace_client_pushdown_operations)
+                    ),
+                    namespace_client_impl=namespace_client_impl,
+                    namespace_client_properties=namespace_client_properties,
+                )
             )
-        )

     async def table_names(
         self,
@@ -986,11 +994,9 @@ class AsyncLanceNamespaceDBConnection:
         )
         if namespace_path is None:
             namespace_path = []
-        request = ListTablesRequest(
-            id=namespace_path, page_token=page_token, limit=limit
+        return await self._inner.table_names(
+            namespace_path=namespace_path, start_after=page_token, limit=limit
         )
-        response = self._namespace_client.list_tables(request)
-        return response.tables if response.tables else []

     async def create_table(
         self,
@@ -1053,8 +1059,8 @@ class AsyncLanceNamespaceDBConnection:
                 storage_options=storage_options,
                 index_cache_size=index_cache_size,
             )
-        except RuntimeError as e:
-            if "Table not found" in str(e):
+        except (RuntimeError, ValueError) as e:
+            if "Table not found" in str(e) or "was not found" in str(e):
                 table_id = namespace_path + [name]
                 raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}")
             raise
@@ -1075,9 +1081,7 @@ class AsyncLanceNamespaceDBConnection:
         """Drop a table from the namespace."""
         if namespace_path is None:
             namespace_path = []
-        table_id = namespace_path + [name]
-        request = DropTableRequest(id=table_id)
-        self._namespace_client.drop_table(request)
+        await self._inner.drop_table(name, namespace_path=namespace_path)

     async def rename_table(
         self,
@@ -1091,14 +1095,17 @@ class AsyncLanceNamespaceDBConnection:
             cur_namespace_path = []
         if new_namespace_path is None:
             new_namespace_path = []
-        cur_table_id = cur_namespace_path + [cur_name]
-        new_namespace_id = new_namespace_path if new_namespace_path else None
-        request = RenameTableRequest(
-            id=cur_table_id,
-            new_table_name=new_name,
-            new_namespace_id=new_namespace_id,
-        )
-        self._namespace_client.rename_table(request)
+        try:
+            await self._inner.rename_table(
+                cur_name,
+                new_name,
+                cur_namespace_path=cur_namespace_path,
+                new_namespace_path=new_namespace_path,
+            )
+        except RuntimeError as e:
+            if "rename_table not implemented" in str(e):
+                raise NotImplementedError("rename_table not implemented") from e
+            raise

     async def drop_database(self):
         """Deprecated method."""
@@ -1110,9 +1117,7 @@ class AsyncLanceNamespaceDBConnection:
         """Drop all tables in the namespace."""
         if namespace_path is None:
             namespace_path = []
-        table_names = await self.table_names(namespace_path=namespace_path)
-        for table_name in table_names:
-            await self.drop_table(table_name, namespace_path=namespace_path)
+        await self._inner.drop_all_tables(namespace_path=namespace_path)

     async def list_namespaces(
         self,
@@ -1141,13 +1146,8 @@ class AsyncLanceNamespaceDBConnection:
|
||||
"""
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
request = ListNamespacesRequest(
|
||||
id=namespace_path, page_token=page_token, limit=limit
|
||||
)
|
||||
response = self._namespace_client.list_namespaces(request)
|
||||
return ListNamespacesResponse(
|
||||
namespaces=response.namespaces if response.namespaces else [],
|
||||
page_token=response.page_token,
|
||||
return await self._inner.list_namespaces(
|
||||
namespace_path=namespace_path, page_token=page_token, limit=limit
|
||||
)
|
||||
|
||||
async def create_namespace(
|
||||
@@ -1174,15 +1174,11 @@ class AsyncLanceNamespaceDBConnection:
|
||||
CreateNamespaceResponse
|
||||
Response containing the properties of the created namespace.
|
||||
"""
|
||||
request = CreateNamespaceRequest(
|
||||
id=namespace_path,
|
||||
mode=_normalize_create_namespace_mode(mode),
|
||||
return await self._inner.create_namespace(
|
||||
namespace_path=namespace_path,
|
||||
mode=mode,
|
||||
properties=properties,
|
||||
)
|
||||
response = self._namespace_client.create_namespace(request)
|
||||
return CreateNamespaceResponse(
|
||||
properties=response.properties if hasattr(response, "properties") else None
|
||||
)
|
||||
|
||||
async def drop_namespace(
|
||||
self,
|
||||
@@ -1208,20 +1204,16 @@ class AsyncLanceNamespaceDBConnection:
|
||||
DropNamespaceResponse
|
||||
Response containing properties and transaction_id if applicable.
|
||||
"""
|
||||
request = DropNamespaceRequest(
|
||||
id=namespace_path,
|
||||
mode=_normalize_drop_namespace_mode(mode),
|
||||
behavior=_normalize_drop_namespace_behavior(behavior),
|
||||
)
|
||||
response = self._namespace_client.drop_namespace(request)
|
||||
return DropNamespaceResponse(
|
||||
properties=(
|
||||
response.properties if hasattr(response, "properties") else None
|
||||
),
|
||||
transaction_id=(
|
||||
response.transaction_id if hasattr(response, "transaction_id") else None
|
||||
),
|
||||
)
|
||||
try:
|
||||
return await self._inner.drop_namespace(
|
||||
namespace_path=namespace_path,
|
||||
mode=mode,
|
||||
behavior=behavior,
|
||||
)
|
||||
except RuntimeError as e:
|
||||
if "Namespace not empty" in str(e):
|
||||
raise NamespaceNotEmptyError(str(e)) from e
|
||||
raise
|
||||
|
||||
async def describe_namespace(
|
||||
self, namespace_path: List[str]
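The `drop_namespace` hunk above turns a generic `RuntimeError` from the inner connection into a typed exception by matching on the message text, and re-raises anything it does not recognise. Below is a minimal, self-contained sketch of that pattern. It assumes nothing about LanceDB itself: `NamespaceNotEmptyError` is a local stand-in class and `fake_inner_drop` is a hypothetical placeholder for the native call.

```python
class NamespaceNotEmptyError(Exception):
    """Local stand-in for the typed error surfaced to callers."""


def fake_inner_drop(namespace_path, has_tables):
    # Hypothetical placeholder for the native call: it only knows how to
    # raise a generic RuntimeError carrying a descriptive message.
    if has_tables:
        raise RuntimeError(f"Namespace not empty: {'/'.join(namespace_path)}")
    return {"dropped": list(namespace_path)}


def drop_namespace(namespace_path, has_tables=False):
    try:
        return fake_inner_drop(namespace_path, has_tables)
    except RuntimeError as e:
        # Translate only the recognised message; chain the original error
        # with `from e` so the underlying cause stays visible in tracebacks.
        if "Namespace not empty" in str(e):
            raise NamespaceNotEmptyError(str(e)) from e
        # Anything else propagates unchanged.
        raise
```

Matching on message text is brittle if the wording changes upstream, so a sketch like this is best read as a compatibility shim and not as a stable contract.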
|
||||
@@ -1239,11 +1231,7 @@ class AsyncLanceNamespaceDBConnection:
|
||||
DescribeNamespaceResponse
|
||||
Response containing the namespace properties.
|
||||
"""
|
||||
request = DescribeNamespaceRequest(id=namespace_path)
|
||||
response = self._namespace_client.describe_namespace(request)
|
||||
return DescribeNamespaceResponse(
|
||||
properties=response.properties if hasattr(response, "properties") else None
|
||||
)
|
||||
return await self._inner.describe_namespace(namespace_path)
|
||||
|
||||
async def list_tables(
|
||||
self,
|
||||
@@ -1272,13 +1260,8 @@ class AsyncLanceNamespaceDBConnection:
|
||||
"""
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
request = ListTablesRequest(
|
||||
id=namespace_path, page_token=page_token, limit=limit
|
||||
)
|
||||
response = self._namespace_client.list_tables(request)
|
||||
return ListTablesResponse(
|
||||
tables=response.tables if response.tables else [],
|
||||
page_token=response.page_token,
|
||||
return await self._inner.list_tables(
|
||||
namespace_path=namespace_path, page_token=page_token, limit=limit
|
||||
)
|
||||
|
||||
async def namespace_client(self) -> LanceNamespace:
|
||||
@@ -1292,6 +1275,18 @@ class AsyncLanceNamespaceDBConnection:
|
||||
LanceNamespace
|
||||
The namespace client for this connection.
|
||||
"""
|
||||
if self._namespace_client is None:
|
||||
if (
|
||||
self._namespace_client_impl is None
|
||||
or self._namespace_client_properties is None
|
||||
):
|
||||
raise ValueError(
|
||||
"Cannot construct a Python namespace client without "
|
||||
"namespace implementation properties"
|
||||
)
|
||||
self._namespace_client = namespace_connect(
|
||||
self._namespace_client_impl, self._namespace_client_properties
|
||||
)
|
||||
return self._namespace_client
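The `namespace_client()` hunk above builds the Python client only on first access and caches it afterwards, failing early when the implementation name or properties are missing. The following sketch illustrates that lazy-construction pattern in isolation. `LazyClientHolder` and its `factory` argument are hypothetical names introduced here for illustration; `factory` stands in for whatever function really builds the client.

```python
class LazyClientHolder:
    """Builds a client on first use and reuses it afterwards."""

    def __init__(self, impl=None, properties=None, factory=None):
        self._impl = impl
        self._properties = properties
        # Hypothetical stand-in for the real client constructor.
        self._factory = factory
        self._client = None

    def client(self):
        if self._client is None:
            # Refuse to construct without the inputs the factory needs.
            if self._impl is None or self._properties is None:
                raise ValueError(
                    "Cannot construct a client without implementation properties"
                )
            self._client = self._factory(self._impl, self._properties)
        # Later calls return the cached instance without calling the factory.
        return self._client
```

The benefit of this shape is that code paths which never ask for the client never pay for building it, which is what the tests later in this diff check by making construction fail loudly.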
|
||||
|
||||
|
||||
@@ -1342,6 +1337,32 @@ def connect_namespace(
|
||||
LanceNamespaceDBConnection
|
||||
A namespace-based connection to LanceDB
|
||||
"""
|
||||
if _supports_native_namespace(namespace_client_impl):
|
||||
inner = AsyncConnection(
|
||||
_connect_namespace(
|
||||
namespace_client_impl,
|
||||
namespace_client_properties,
|
||||
read_consistency_interval=(
|
||||
read_consistency_interval.total_seconds()
|
||||
if read_consistency_interval is not None
|
||||
else None
|
||||
),
|
||||
storage_options=storage_options,
|
||||
session=session,
|
||||
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
|
||||
)
|
||||
)
|
||||
return LanceNamespaceDBConnection(
|
||||
namespace_client=None,
|
||||
read_consistency_interval=read_consistency_interval,
|
||||
storage_options=storage_options,
|
||||
session=session,
|
||||
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
|
||||
namespace_client_impl=namespace_client_impl,
|
||||
namespace_client_properties=namespace_client_properties,
|
||||
_inner=inner,
|
||||
)
|
||||
|
||||
namespace_client = namespace_connect(
|
||||
namespace_client_impl, namespace_client_properties
|
||||
)
|
||||
@@ -1417,6 +1438,32 @@ def connect_namespace_async(
|
||||
... tables = await db.table_names()
|
||||
... table = await db.create_table("my_table", schema=schema)
|
||||
"""
|
||||
if _supports_native_namespace(namespace_client_impl):
|
||||
inner = AsyncConnection(
|
||||
_connect_namespace(
|
||||
namespace_client_impl,
|
||||
namespace_client_properties,
|
||||
read_consistency_interval=(
|
||||
read_consistency_interval.total_seconds()
|
||||
if read_consistency_interval is not None
|
||||
else None
|
||||
),
|
||||
storage_options=storage_options,
|
||||
session=session,
|
||||
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
|
||||
)
|
||||
)
|
||||
return AsyncLanceNamespaceDBConnection(
|
||||
namespace_client=None,
|
||||
read_consistency_interval=read_consistency_interval,
|
||||
storage_options=storage_options,
|
||||
session=session,
|
||||
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
|
||||
namespace_client_impl=namespace_client_impl,
|
||||
namespace_client_properties=namespace_client_properties,
|
||||
_inner=inner,
|
||||
)
|
||||
|
||||
namespace_client = namespace_connect(
|
||||
namespace_client_impl, namespace_client_properties
|
||||
)
|
||||
|
||||
@@ -2142,12 +2142,19 @@ class LanceTable(Table):
|
||||
|
||||
branch = self.current_branch()
|
||||
version = None if branch is not None else self.version
|
||||
if self._namespace_client is not None:
|
||||
namespace_client = self._namespace_client
|
||||
if namespace_client is None:
|
||||
conn_uri = getattr(self._conn, "uri", "")
|
||||
if get_uri_scheme(conn_uri) == "namespace":
|
||||
namespace_client = self._conn.namespace_client()
|
||||
self._namespace_client = namespace_client
|
||||
|
||||
if namespace_client is not None:
|
||||
table_id = self._namespace_path + [self.name]
|
||||
ds = lance.dataset(
|
||||
version=version,
|
||||
storage_options=self._conn.storage_options,
|
||||
namespace_client=self._namespace_client,
|
||||
namespace_client=namespace_client,
|
||||
table_id=table_id,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
|
||||
import tempfile
|
||||
import shutil
|
||||
import importlib
|
||||
import pytest
|
||||
import pyarrow as pa
|
||||
import lancedb
|
||||
@@ -103,6 +104,40 @@ class TestNamespaceConnection:
|
||||
assert isinstance(db, lancedb.LanceNamespaceDBConnection)
|
||||
assert len(list(db.table_names())) == 0
|
||||
|
||||
def test_sync_builtin_namespace_uses_rust_without_python_client(self, monkeypatch):
|
||||
"""Built-in sync namespace connections should not construct or call the
|
||||
Python namespace client for normal namespace/table management."""
|
||||
namespace_module = importlib.import_module("lancedb.namespace")
|
||||
|
||||
def fail_namespace_connect(*args, **kwargs):
|
||||
raise AssertionError("Python namespace client should not be constructed")
|
||||
|
||||
monkeypatch.setattr(
|
||||
namespace_module, "namespace_connect", fail_namespace_connect
|
||||
)
|
||||
|
||||
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
|
||||
assert isinstance(db, lancedb.LanceNamespaceDBConnection)
|
||||
assert db._namespace_client is None
|
||||
assert db._route_pushdown_to_rust is True
|
||||
|
||||
db.create_namespace(["test_ns"])
|
||||
assert "test_ns" in db.list_namespaces().namespaces
|
||||
|
||||
schema = pa.schema([pa.field("id", pa.int64())])
|
||||
table = db.create_table("test_table", schema=schema, namespace_path=["test_ns"])
|
||||
assert table.namespace == ["test_ns"]
|
||||
assert "test_table" in db.table_names(namespace_path=["test_ns"])
|
||||
assert "test_table" in db.list_tables(namespace_path=["test_ns"]).tables
|
||||
|
||||
opened = db.open_table("test_table", namespace_path=["test_ns"])
|
||||
assert opened.namespace == ["test_ns"]
|
||||
|
||||
db.drop_table("test_table", namespace_path=["test_ns"])
|
||||
assert db.list_tables(namespace_path=["test_ns"]).tables == []
|
||||
db.drop_namespace(["test_ns"])
|
||||
assert "test_ns" not in db.list_namespaces().namespaces
|
||||
|
||||
def test_create_table_through_namespace(self):
|
||||
"""Test creating a table through namespace."""
|
||||
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
|
||||
@@ -564,6 +599,61 @@ class TestAsyncNamespaceConnection:
|
||||
table_names = await db.table_names()
|
||||
assert len(list(table_names)) == 0
|
||||
|
||||
async def test_async_builtin_namespace_uses_rust_without_python_client(
|
||||
self, monkeypatch
|
||||
):
|
||||
"""Built-in async namespace connections should not construct or call the
|
||||
Python namespace client for normal namespace/table management."""
|
||||
namespace_module = importlib.import_module("lancedb.namespace")
|
||||
|
||||
def fail_namespace_connect(*args, **kwargs):
|
||||
raise AssertionError("Python namespace client should not be constructed")
|
||||
|
||||
monkeypatch.setattr(
|
||||
namespace_module, "namespace_connect", fail_namespace_connect
|
||||
)
|
||||
|
||||
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
|
||||
assert isinstance(db, lancedb.AsyncLanceNamespaceDBConnection)
|
||||
assert db._namespace_client is None
|
||||
assert db._route_pushdown_to_rust is True
|
||||
|
||||
await db.create_namespace(["test_ns"])
|
||||
assert "test_ns" in (await db.list_namespaces()).namespaces
|
||||
|
||||
schema = pa.schema([pa.field("id", pa.int64())])
|
||||
table = await db.create_table(
|
||||
"test_table", schema=schema, namespace_path=["test_ns"]
|
||||
)
|
||||
assert table._namespace_path == ["test_ns"]
|
||||
assert table._namespace_client is None
|
||||
assert table._route_pushdown_to_rust is True
|
||||
assert "test_table" in await db.table_names(namespace_path=["test_ns"])
|
||||
assert "test_table" in (await db.list_tables(namespace_path=["test_ns"])).tables
|
||||
|
||||
opened = await db.open_table("test_table", namespace_path=["test_ns"])
|
||||
assert opened._namespace_path == ["test_ns"]
|
||||
|
||||
await db.drop_table("test_table", namespace_path=["test_ns"])
|
||||
assert (await db.list_tables(namespace_path=["test_ns"])).tables == []
|
||||
await db.drop_namespace(["test_ns"])
|
||||
assert "test_ns" not in (await db.list_namespaces()).namespaces
|
||||
|
||||
async def test_async_namespace_client_is_lazy(self):
|
||||
"""namespace_client() should still return the backing client on demand."""
|
||||
pytest.importorskip("lance")
|
||||
from lance.namespace import DirectoryNamespace
|
||||
|
||||
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
|
||||
assert db._namespace_client is None
|
||||
|
||||
ns_client = await db.namespace_client()
|
||||
|
||||
assert isinstance(ns_client, DirectoryNamespace)
|
||||
namespace_id = ns_client.namespace_id().replace("\\\\", "\\")
|
||||
assert str(self.temp_dir) in namespace_id
|
||||
assert db._namespace_client is ns_client
|
||||
|
||||
# Async connect via namespace helper is not enabled yet.
|
||||
|
||||
async def test_create_table_async(self):
|
||||
@@ -818,10 +908,11 @@ class TestPushdownOperations:
|
||||
)
|
||||
assert db._route_pushdown_to_rust is True
|
||||
|
||||
def test_route_pushdown_to_rust_false_for_dir(self):
|
||||
"""A non-native (dir) connection keeps the Python pushdown path."""
|
||||
def test_route_pushdown_to_rust_for_native_dir(self):
|
||||
"""The sync dir connection is natively built and defers QueryTable
|
||||
pushdown to Rust."""
|
||||
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
|
||||
assert db._route_pushdown_to_rust is False
|
||||
assert db._route_pushdown_to_rust is True
|
||||
|
||||
def test_async_route_pushdown_to_rust_for_native_rest(self):
|
||||
"""The async connection must not silently bypass the read-freshness fix:
|
||||
@@ -834,10 +925,11 @@ class TestPushdownOperations:
|
||||
)
|
||||
assert db._route_pushdown_to_rust is True
|
||||
|
||||
def test_async_route_pushdown_to_rust_false_for_dir(self):
|
||||
"""The async non-native (dir) connection keeps the Python pushdown path."""
|
||||
def test_async_route_pushdown_to_rust_for_native_dir(self):
|
||||
"""The async dir connection is natively built and defers QueryTable
|
||||
pushdown to Rust."""
|
||||
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
|
||||
assert db._route_pushdown_to_rust is False
|
||||
assert db._route_pushdown_to_rust is True
|
||||
|
||||
def test_lance_table_to_arrow_uses_query_pushdown(self):
|
||||
namespace_client = _NamespaceClient()
|
||||
|
||||
@@ -1137,6 +1137,16 @@ def test_namespace_open_table_with_branch_version(tmp_path):
|
||||
assert db.open_table("t", namespace_path=["ns1"], branch="exp").count_rows() == 3
|
||||
|
||||
|
||||
def test_namespace_root_table_to_lance_uses_namespace_client(tmp_path):
|
||||
pytest.importorskip("lance") # "dir" impl is lance.namespace.DirectoryNamespace
|
||||
db = lancedb.connect_namespace("dir", {"root": str(tmp_path)})
|
||||
table = db.create_table("t", [{"i": 0}])
|
||||
|
||||
assert table._namespace_client is None
|
||||
assert table.to_lance().count_rows() == 1
|
||||
assert table._namespace_client is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_namespace_open_table_with_branch_version(tmp_path):
|
||||
pytest.importorskip("lance") # "dir" impl is lance.namespace.DirectoryNamespace
|
||||
|
||||
@@ -655,6 +655,46 @@ pub fn connect_namespace_client(
|
||||
)))
|
||||
}
|
||||
|
||||
#[pyfunction]
|
||||
#[pyo3(signature = (
|
||||
namespace_client_impl,
|
||||
namespace_client_properties,
|
||||
read_consistency_interval=None,
|
||||
storage_options=None,
|
||||
session=None,
|
||||
namespace_client_pushdown_operations=None,
|
||||
))]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn connect_namespace(
|
||||
namespace_client_impl: String,
|
||||
namespace_client_properties: HashMap<String, String>,
|
||||
read_consistency_interval: Option<f64>,
|
||||
storage_options: Option<HashMap<String, String>>,
|
||||
session: Option<crate::session::Session>,
|
||||
namespace_client_pushdown_operations: Option<Vec<String>>,
|
||||
) -> PyResult<Connection> {
|
||||
let read_consistency_interval = read_consistency_interval.map(Duration::from_secs_f64);
|
||||
let namespace_client_pushdown_operations =
|
||||
parse_namespace_client_pushdown_operations(namespace_client_pushdown_operations)?;
|
||||
|
||||
let mut builder =
|
||||
lancedb::connect_namespace(&namespace_client_impl, namespace_client_properties)
|
||||
.pushdown_operations(namespace_client_pushdown_operations);
|
||||
if let Some(storage_options) = storage_options {
|
||||
builder = builder.storage_options(storage_options);
|
||||
}
|
||||
if let Some(read_consistency_interval) = read_consistency_interval {
|
||||
builder = builder.read_consistency_interval(read_consistency_interval);
|
||||
}
|
||||
if let Some(session) = session {
|
||||
builder = builder.session(session.inner.clone());
|
||||
}
|
||||
|
||||
Ok(Connection::new(
|
||||
crate::runtime::block_on(builder.execute()).infer_error()?,
|
||||
))
|
||||
}
|
||||
|
||||
/// Whether to build the namespace natively (from impl + properties) instead of
|
||||
/// wrapping a pre-built client. Native construction is required for the
|
||||
/// read-freshness provider to be installed
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
use arrow::RecordBatchStream;
|
||||
use connection::{Connection, connect, connect_namespace_client};
|
||||
use connection::{Connection, connect, connect_namespace, connect_namespace_client};
|
||||
use env_logger::Env;
|
||||
use expr::{PyExpr, expr_col, expr_func, expr_lit};
|
||||
use index::IndexConfig;
|
||||
@@ -62,6 +62,7 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
|
||||
m.add_class::<PyPermutationReader>()?;
|
||||
m.add_class::<PyExpr>()?;
|
||||
m.add_function(wrap_pyfunction!(connect, m)?)?;
|
||||
m.add_function(wrap_pyfunction!(connect_namespace, m)?)?;
|
||||
m.add_function(wrap_pyfunction!(connect_namespace_client, m)?)?;
|
||||
m.add_function(wrap_pyfunction!(permutation::async_permutation_builder, m)?)?;
|
||||
m.add_function(wrap_pyfunction!(util::validate_table_name, m)?)?;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb"
|
||||
version = "0.31.0-beta.4"
|
||||
version = "0.31.0-beta.5"
|
||||
edition.workspace = true
|
||||
description = "LanceDB: A serverless, low-latency vector database for AI applications"
|
||||
license.workspace = true
|
||||
@@ -166,6 +166,10 @@ required-features = ["bedrock"]
|
||||
[[example]]
|
||||
name = "simple"
|
||||
|
||||
[[example]]
|
||||
name = "polars"
|
||||
required-features = ["polars"]
|
||||
|
||||
[[example]]
|
||||
name = "full_text_search"
|
||||
|
||||
|
||||
47
rust/lancedb/examples/polars.rs
Normal file
@@ -0,0 +1,47 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
//! This example demonstrates ingesting a Polars DataFrame into LanceDB and
|
||||
//! reading it back out as a Polars DataFrame.
|
||||
|
||||
use lancedb::arrow::IntoPolars;
|
||||
use lancedb::query::ExecutableQuery;
|
||||
use lancedb::{Result, connect};
|
||||
use polars::prelude::{DataFrame, NamedFrom, Series};
|
||||
|
||||
fn make_dataframe() -> DataFrame {
|
||||
let ids = Series::new("id", &[1i32, 2, 3, 4, 5]);
|
||||
let names = Series::new("name", &["Alice", "Bob", "Carol", "Dave", "Eve"]);
|
||||
let scores = Series::new("score", &[9.5f64, 8.1, 7.3, 9.0, 6.5]);
|
||||
DataFrame::new(vec![ids, names, scores]).unwrap()
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
|
||||
// Ingest a Polars DataFrame directly — DataFrame now implements Scannable.
|
||||
let df = make_dataframe();
|
||||
println!("Input DataFrame:\n{df}");
|
||||
|
||||
let table = db.create_table("people", df).execute().await?;
|
||||
|
||||
// Append more rows.
|
||||
let more = DataFrame::new(vec![
|
||||
Series::new("id", &[6i32, 7]),
|
||||
Series::new("name", &["Frank", "Grace"]),
|
||||
Series::new("score", &[7.8f64, 8.9]),
|
||||
])
|
||||
.unwrap();
|
||||
table.add(more).execute().await?;
|
||||
|
||||
// Read back as a Polars DataFrame.
|
||||
let result_df = table.query().execute().await?.into_polars().await?;
|
||||
|
||||
println!(
|
||||
"\nRound-tripped DataFrame ({} rows):\n{result_df}",
|
||||
result_df.height()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
@@ -112,54 +112,14 @@ impl<S: Stream<Item = Result<arrow_array::RecordBatch>>> RecordBatchStream
|
||||
|
||||
/// A trait for converting incoming data to Arrow
|
||||
///
|
||||
/// Integrations should implement this trait to allow data to be
|
||||
/// imported directly from the integration. For example, implementing
|
||||
/// this trait for `Vec<Vec<...>>` would allow the `Vec` to be directly
|
||||
/// used in methods like [`crate::connection::Connection::create_table`]
|
||||
/// or [`crate::table::Table::add`]
|
||||
pub trait IntoArrow {
|
||||
/// Convert the data into an iterator of Arrow batches
|
||||
fn into_arrow(self) -> Result<Box<dyn arrow_array::RecordBatchReader + Send>>;
|
||||
}
|
||||
|
||||
pub type BoxedRecordBatchReader = Box<dyn arrow_array::RecordBatchReader + Send>;
|
||||
|
||||
impl<T: arrow_array::RecordBatchReader + Send + 'static> IntoArrow for T {
|
||||
fn into_arrow(self) -> Result<Box<dyn arrow_array::RecordBatchReader + Send>> {
|
||||
Ok(Box::new(self))
|
||||
}
|
||||
}
|
||||
|
||||
/// A trait for converting incoming data to Arrow asynchronously
|
||||
///
|
||||
/// Serves the same purpose as [`IntoArrow`], but for asynchronous data.
|
||||
///
|
||||
/// Note: Arrow has no async equivalent to RecordBatchReader and so
|
||||
pub trait IntoArrowStream {
|
||||
/// Convert the data into a stream of Arrow batches
|
||||
fn into_arrow(self) -> Result<SendableRecordBatchStream>;
|
||||
}
|
||||
|
||||
impl<S: Stream<Item = Result<arrow_array::RecordBatch>>> SimpleRecordBatchStream<S> {
|
||||
pub fn new(stream: S, schema: Arc<arrow_schema::Schema>) -> Self {
|
||||
Self { schema, stream }
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoArrowStream for SendableRecordBatchStream {
|
||||
fn into_arrow(self) -> Result<SendableRecordBatchStream> {
|
||||
Ok(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoArrowStream for datafusion_physical_plan::SendableRecordBatchStream {
|
||||
fn into_arrow(self) -> Result<SendableRecordBatchStream> {
|
||||
let schema = self.schema();
|
||||
let stream = self.map_err(|df_err| df_err.into());
|
||||
Ok(Box::pin(SimpleRecordBatchStream::new(stream, schema)))
|
||||
}
|
||||
}
|
||||
|
||||
pub trait LanceDbDatagenExt {
|
||||
fn into_ldb_stream(
|
||||
self,
|
||||
@@ -264,9 +224,7 @@ impl IntoPolars for SendableRecordBatchStream {
|
||||
#[cfg(all(test, feature = "polars"))]
|
||||
mod tests {
|
||||
use super::SendableRecordBatchStream;
|
||||
use crate::arrow::{
|
||||
IntoArrow, IntoPolars, PolarsDataFrameRecordBatchReader, SimpleRecordBatchStream,
|
||||
};
|
||||
use crate::arrow::{IntoPolars, PolarsDataFrameRecordBatchReader, SimpleRecordBatchStream};
|
||||
use polars::prelude::{DataFrame, NamedFrom, Series};
|
||||
|
||||
fn get_record_batch_reader_from_polars() -> Box<dyn arrow_array::RecordBatchReader + Send> {
|
||||
@@ -280,10 +238,7 @@ mod tests {
|
||||
float_series = Series::new("float", &[2.0]);
|
||||
let df2 = DataFrame::new(vec![string_series, int_series, float_series]).unwrap();
|
||||
|
||||
PolarsDataFrameRecordBatchReader::new(df1.vstack(&df2).unwrap())
|
||||
.unwrap()
|
||||
.into_arrow()
|
||||
.unwrap()
|
||||
Box::new(PolarsDataFrameRecordBatchReader::new(df1.vstack(&df2).unwrap()).unwrap())
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -185,6 +185,43 @@ impl Scannable for SendableRecordBatchStream {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "polars")]
|
||||
impl Scannable for polars::frame::DataFrame {
|
||||
fn schema(&self) -> SchemaRef {
|
||||
crate::polars_arrow_convertors::convert_polars_df_schema_to_arrow_rb_schema(
|
||||
self.schema().clone(),
|
||||
)
|
||||
.expect("failed to convert Polars DataFrame schema to Arrow schema")
|
||||
}
|
||||
|
||||
fn scan_as_stream(&mut self) -> SendableRecordBatchStream {
|
||||
let schema = Scannable::schema(self);
|
||||
let batches: crate::Result<Vec<RecordBatch>> =
|
||||
match crate::arrow::PolarsDataFrameRecordBatchReader::new(self.clone()) {
|
||||
Err(e) => Err(e),
|
||||
Ok(reader) => reader.map(|b| b.map_err(Into::into)).collect(),
|
||||
};
|
||||
match batches {
|
||||
Err(e) => Box::pin(SimpleRecordBatchStream {
|
||||
schema,
|
||||
stream: once(async move { Err(e) }),
|
||||
}),
|
||||
Ok(batches) => {
|
||||
let stream = futures::stream::iter(batches.into_iter().map(Ok));
|
||||
Box::pin(SimpleRecordBatchStream { schema, stream })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn num_rows(&self) -> Option<usize> {
|
||||
Some(self.height())
|
||||
}
|
||||
|
||||
fn rescannable(&self) -> bool {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl StreamingWriteSource for Box<dyn Scannable> {
|
||||
fn arrow_schema(&self) -> SchemaRef {
|
||||
@@ -1089,4 +1126,60 @@ mod tests {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "polars")]
|
||||
mod polars_tests {
|
||||
use super::*;
|
||||
use crate::arrow::IntoPolars;
|
||||
use crate::query::ExecutableQuery;
|
||||
use polars::prelude::{DataFrame, NamedFrom, Series};
|
||||
|
||||
fn make_df() -> DataFrame {
|
||||
DataFrame::new(vec![
|
||||
Series::new("id", &[1i32, 2, 3]),
|
||||
Series::new("val", &[1.1f64, 2.2, 3.3]),
|
||||
])
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_dataframe_scannable_round_trip() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let db = crate::connect(tmp.path().to_str().unwrap())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let df = make_df();
|
||||
let table = db.create_table("t", df.clone()).execute().await.unwrap();
|
||||
|
||||
// Append the same rows again.
|
||||
table.add(df.clone()).execute().await.unwrap();
|
||||
|
||||
let result = table
|
||||
.query()
|
||||
.execute()
|
||||
.await
|
||||
.unwrap()
|
||||
.into_polars()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(result.height(), df.height() * 2);
|
||||
assert_eq!(result.schema(), df.schema());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_dataframe_scannable_rescannable() {
|
||||
let mut df = make_df();
|
||||
assert!(df.rescannable());
|
||||
|
||||
let batches1: Vec<RecordBatch> = df.scan_as_stream().try_collect().await.unwrap();
|
||||
assert_eq!(batches1.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
|
||||
|
||||
// Can be scanned again.
|
||||
let batches2: Vec<RecordBatch> = df.scan_as_stream().try_collect().await.unwrap();
|
||||
assert_eq!(batches2.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,18 +70,29 @@ use tokio::sync::RwLock;
|
||||
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
|
||||
const MIN_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-version");
|
||||
const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp");
|
||||
const MIN_READ_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-read-version");
|
||||
const VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-version");
|
||||
const METRIC_TYPE_KEY: &str = "metric_type";
|
||||
const INDEX_TYPE_KEY: &str = "index_type";
|
||||
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
|
||||
const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Per-table state driving the freshness headers (`x-lancedb-min-version` and
|
||||
/// `x-lancedb-min-timestamp`) sent on read requests.
|
||||
/// Per-table state driving the freshness headers (`x-lancedb-min-version`,
|
||||
/// `x-lancedb-min-timestamp`, and `x-lancedb-min-read-version`) sent on read
|
||||
/// requests.
|
||||
#[derive(Debug, Default, Clone, Copy)]
|
||||
struct FreshnessState {
|
||||
/// Provides read-your-write within a single handle: writes that return a
|
||||
/// version update this, and reads send it as `x-lancedb-min-version`.
|
||||
min_version: Option<u64>,
|
||||
/// Highest dataset version observed in a *read* response on this handle.
|
||||
/// Reads send it as `x-lancedb-min-read-version` so a load-balanced query
|
||||
/// node whose cache is behind this version must refresh before serving,
|
||||
/// giving monotonic reads across nodes regardless of which one the load
|
||||
/// balancer routes to. Sourced only from reads (always committed dataset
|
||||
/// versions), never from writes (which may return WAL entry ids), so it is
|
||||
/// unaffected by the WAL/version mismatch that retired `min_version`.
|
||||
min_read_version: Option<u64>,
|
||||
/// Wall-clock time captured at the last [`BaseTable::checkout_latest`]
|
||||
/// call. Subsequent reads send
|
||||
/// `max(baseline, now - read_consistency_interval)` as
|
||||
@@ -102,6 +113,7 @@ struct FreshnessState {
|
||||
struct FreshnessHeaders {
|
||||
min_version: Option<u64>,
|
||||
min_timestamp: Option<SystemTime>,
|
||||
min_read_version: Option<u64>,
|
||||
}
|
||||
|
||||
impl FreshnessHeaders {
|
||||
@@ -113,6 +125,9 @@ impl FreshnessHeaders {
|
||||
let dt: chrono::DateTime<chrono::Utc> = ts.into();
|
||||
request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339());
|
||||
}
|
||||
if let Some(v) = self.min_read_version {
|
||||
request = request.header(MIN_READ_VERSION_HEADER, v.to_string());
|
||||
}
|
||||
request
|
||||
}
|
||||
}
|
||||
@@ -884,6 +899,7 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
self.client.read_consistency_interval,
|
||||
SystemTime::now(),
|
||||
),
|
||||
min_read_version: state.min_read_version,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -905,6 +921,30 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
state.min_version = Some(state.min_version.map_or(version, |v| v.max(version)));
|
||||
}
|
||||
|
||||
/// Record a dataset version observed in a *read* response so subsequent
|
||||
/// reads request at least this version via `x-lancedb-min-read-version`,
|
||||
/// giving monotonic reads across load-balanced query nodes. A returned `0`
|
||||
/// (or absent header from an old server) is ignored.
|
||||
fn track_read_version(&self, version: u64) {
|
||||
if version == 0 {
|
||||
return;
|
||||
}
|
||||
let mut state = self.freshness.lock().unwrap();
|
||||
state.min_read_version = Some(state.min_read_version.map_or(version, |v| v.max(version)));
|
||||
}
|
||||
|
||||
/// Parse the `x-lancedb-version` response header (the dataset version a read
|
||||
/// reflects) and fold it into the read-version watermark.
|
||||
fn track_read_version_from_headers(&self, headers: &reqwest::header::HeaderMap) {
|
||||
if let Some(version) = headers
|
||||
.get(&VERSION_HEADER)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.and_then(|value| value.parse::<u64>().ok())
|
||||
{
|
||||
self.track_read_version(version);
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute_query(
|
||||
&self,
|
||||
query: &AnyQuery,
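The hunk above keeps a per-handle watermark: every read response's `x-lancedb-version` is folded into a running maximum, zero or unparsable values are ignored, and the maximum is sent back on later reads as `x-lancedb-min-read-version`. A rough Python sketch of the same bookkeeping follows, for illustration only. The class and method names are assumptions made here, and the digit check only approximates Rust's `u64` parsing (it does not enforce an upper bound).

```python
import threading


class ReadVersionWatermark:
    """Tracks the highest dataset version seen in read responses."""

    def __init__(self):
        self._lock = threading.Lock()
        self._min_read_version = None

    def track(self, version):
        # A version of 0 carries no information and is ignored.
        if version == 0:
            return
        with self._lock:
            current = self._min_read_version
            # The watermark only ever moves forward.
            self._min_read_version = (
                version if current is None else max(current, version)
            )

    def track_from_headers(self, headers):
        raw = headers.get("x-lancedb-version")
        # Missing or non-numeric headers (e.g. from an older server) are skipped.
        if raw is None or not (raw.isascii() and raw.isdigit()):
            return
        self.track(int(raw))

    def request_headers(self):
        with self._lock:
            version = self._min_read_version
        if version is None:
            return {}
        return {"x-lancedb-min-read-version": str(version)}
```

Because the value never decreases, a reader that has already seen version N will not be served an older snapshot by a different node, provided the server honours the header.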
|
||||
@@ -928,6 +968,7 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
|
||||
let futures = requests.into_iter().map(|req| async move {
|
||||
let (request_id, response) = self.send(req, true).await?;
|
||||
self.track_read_version_from_headers(response.headers());
|
||||
self.read_arrow_stream(&request_id, response).await
|
||||
});
|
||||
let streams = futures::future::try_join_all(futures);
|
||||
@@ -1545,11 +1586,12 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
*write_guard = None;
|
||||
drop(write_guard);
|
||||
|
||||
// Drop any per-handle write tracking; subsequent reads use the
|
||||
// Drop any per-handle read/write tracking; subsequent reads use the
|
||||
// baseline timestamp captured now to guarantee freshness.
|
||||
*self.freshness.lock().unwrap() = FreshnessState {
|
||||
min_version: None,
|
||||
checkout_baseline: Some(SystemTime::now()),
|
||||
min_read_version: None,
|
||||
};
|
||||
|
||||
// Invalidate schema cache since we're switching versions
|
||||
@@ -1805,6 +1847,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
}
|
||||
};
|
||||
|
||||
self.track_read_version_from_headers(response.headers());
|
||||
let body = response.text().await.err_to_http(request_id.clone())?;
|
||||
|
||||
serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
@@ -7124,6 +7167,7 @@ mod tests {
|
||||
let state = FreshnessState {
|
||||
min_version: None,
|
||||
checkout_baseline: Some(baseline),
|
||||
min_read_version: None,
|
||||
};
|
||||
assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline));
|
||||
|
||||
@@ -7148,6 +7192,7 @@ mod tests {
|
||||
let state = FreshnessState {
|
||||
min_version: None,
|
||||
checkout_baseline: Some(baseline),
|
||||
min_read_version: None,
|
||||
};
|
||||
assert_eq!(
|
||||
compute_min_timestamp(&state, Some(Duration::from_secs(10)), now),
|
||||
@@ -7159,6 +7204,7 @@ mod tests {
         let state = FreshnessState {
             min_version: None,
             checkout_baseline: Some(recent_baseline),
+            min_read_version: None,
         };
         assert_eq!(
             compute_min_timestamp(&state, Some(Duration::from_secs(60)), now),
@@ -7303,6 +7349,106 @@ mod tests {
         );
     }

+    /// A handler that records every request's headers and answers each read with
+    /// an `x-lancedb-version` response header taken from `versions` (by call
+    /// index, saturating at the last entry). An empty string means "no header".
+    fn read_version_handler(
+        versions: &'static [&'static str],
+    ) -> (
+        impl Fn(reqwest::Request) -> http::Response<String> + Clone + Send + Sync + 'static,
+        Arc<std::sync::Mutex<Vec<http::HeaderMap>>>,
+    ) {
+        let requests = Arc::new(std::sync::Mutex::new(Vec::new()));
+        let requests_c = requests.clone();
+        let call = Arc::new(AtomicUsize::new(0));
+        let handler = move |request: reqwest::Request| {
+            requests_c.lock().unwrap().push(request.headers().clone());
+            let i = call.fetch_add(1, Ordering::SeqCst).min(versions.len() - 1);
+            let mut builder = http::Response::builder().status(200);
+            if !versions[i].is_empty() {
+                builder = builder.header("x-lancedb-version", versions[i]);
+            }
+            builder.body("42".to_string()).unwrap()
+        };
+        (handler, requests)
+    }
+
+    #[tokio::test]
+    async fn test_read_version_watermark_tracked_and_sent() {
+        let (handler, requests) = read_version_handler(&["100", "100"]);
+        let table = Table::new_with_handler("my_table", handler);
+
+        // First read has no watermark yet; the response advertises version 100,
+        // so the second read must floor the server at 100.
+        table.count_rows(None).await.unwrap();
+        table.count_rows(None).await.unwrap();
+
+        let reqs = requests.lock().unwrap();
+        assert!(!reqs[0].contains_key("x-lancedb-min-read-version"));
+        assert_eq!(
+            reqs[1]
+                .get("x-lancedb-min-read-version")
+                .unwrap()
+                .to_str()
+                .unwrap(),
+            "100"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_read_version_watermark_keeps_max() {
+        // Server reports 100 then a stale 50; the watermark must not regress.
+        let (handler, requests) = read_version_handler(&["100", "50", "50"]);
+        let table = Table::new_with_handler("my_table", handler);
+
+        table.count_rows(None).await.unwrap();
+        table.count_rows(None).await.unwrap();
+        table.count_rows(None).await.unwrap();
+
+        let reqs = requests.lock().unwrap();
+        assert_eq!(
+            reqs[2]
+                .get("x-lancedb-min-read-version")
+                .unwrap()
+                .to_str()
+                .unwrap(),
+            "100"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_read_version_absent_header_no_watermark() {
+        // An old server that doesn't return the version header leaves the
+        // watermark unset, preserving backward compatibility.
+        let (handler, requests) = read_version_handler(&[""]);
+        let table = Table::new_with_handler("my_table", handler);
+
+        table.count_rows(None).await.unwrap();
+        table.count_rows(None).await.unwrap();
+
+        let reqs = requests.lock().unwrap();
+        assert!(!reqs[1].contains_key("x-lancedb-min-read-version"));
+    }
+
+    #[tokio::test]
+    async fn test_read_version_watermark_reset_on_checkout_latest() {
+        let (handler, requests) = read_version_handler(&["100", "100"]);
+        let table = Table::new_with_handler("my_table", handler);
+
+        table.count_rows(None).await.unwrap();
+        table.checkout_latest().await.unwrap();
+        table.count_rows(None).await.unwrap();
+
+        // The read after checkout_latest starts from a clean slate.
+        let reqs = requests.lock().unwrap();
+        assert!(
+            !reqs
+                .last()
+                .unwrap()
+                .contains_key("x-lancedb-min-read-version")
+        );
+    }
+
     /// Like `capturing_handler`, but keeps a per-path snapshot of the headers
     /// from every request so tests can assert on a specific endpoint.
     #[allow(clippy::type_complexity)]
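The watermark behaviour exercised by the tests above (keep the highest version seen, ignore a missing header, clear on `checkout_latest`) can be sketched in isolation. This is a minimal illustration under stated assumptions, not the code from the diff: `ReadWatermark`, `observe`, `request_header`, and `reset` are hypothetical names, and the `u64` version type is assumed. Only the `min_read_version` field name and the two header names (`x-lancedb-version`, `x-lancedb-min-read-version`) come from the diff.

```rust
/// Hypothetical stand-in for the per-handle read watermark.
/// Only the field name `min_read_version` is taken from the diff;
/// the `u64` type is an assumption.
#[derive(Debug, Default)]
struct ReadWatermark {
    min_read_version: Option<u64>,
}

impl ReadWatermark {
    /// Fold one response's `x-lancedb-version` value into the watermark.
    /// A missing or unparsable header leaves the watermark untouched,
    /// which matches the "old server" case in the tests.
    fn observe(&mut self, header_value: Option<&str>) {
        let seen = match header_value.and_then(|v| v.trim().parse::<u64>().ok()) {
            Some(v) => v,
            None => return,
        };
        // Keep the maximum so a stale response cannot lower the floor.
        self.min_read_version = Some(match self.min_read_version {
            Some(current) => current.max(seen),
            None => seen,
        });
    }

    /// Value to send as `x-lancedb-min-read-version` on the next read,
    /// or `None` when no version has been observed yet.
    fn request_header(&self) -> Option<String> {
        self.min_read_version.map(|v| v.to_string())
    }

    /// Forget the watermark, mirroring what the `checkout_latest` test expects.
    fn reset(&mut self) {
        self.min_read_version = None;
    }
}
```

Taking the maximum rather than the latest value is what keeps the floor monotonic when responses arrive from replicas at different versions.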