Compare commits

..

1 Commits

Author SHA1 Message Date
lancedb automation
b9249881bb chore: update lance dependency to v8.0.0-rc.3 2026-06-30 13:42:40 +00:00
18 changed files with 217 additions and 609 deletions

105
Cargo.lock generated
View File

@@ -1297,6 +1297,15 @@ version = "2.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3"
[[package]]
name = "bitpacking"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96a7139abd3d9cebf8cd6f920a389cf3dc9576172e32f4563f188cae3c3eb019"
dependencies = [
"crunchy",
]
[[package]]
name = "bitvec"
version = "1.0.1"
@@ -3423,8 +3432,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arrow-array",
"rand 0.9.4",
@@ -4726,8 +4735,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a"
[[package]]
name = "lance"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arc-swap",
"arrow",
@@ -4745,6 +4754,7 @@ dependencies = [
"async_cell",
"aws-credential-types",
"aws-sdk-dynamodb",
"bitpacking",
"byteorder",
"bytes",
"chrono",
@@ -4761,9 +4771,8 @@ dependencies = [
"futures",
"half",
"humantime",
"itertools 0.14.0",
"itertools 0.13.0",
"lance-arrow",
"lance-bitpacking",
"lance-core",
"lance-datafusion",
"lance-encoding",
@@ -4801,8 +4810,8 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4823,7 +4832,7 @@ dependencies = [
[[package]]
name = "lance-arrow-scalar"
version = "58.0.0"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4837,7 +4846,7 @@ dependencies = [
[[package]]
name = "lance-arrow-stats"
version = "58.0.0"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -4846,19 +4855,18 @@ dependencies = [
[[package]]
name = "lance-bitpacking"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arrayref",
"crunchy",
"paste",
"seq-macro",
]
[[package]]
name = "lance-core"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4870,7 +4878,7 @@ dependencies = [
"datafusion-common",
"datafusion-sql",
"futures",
"itertools 0.14.0",
"itertools 0.13.0",
"lance-arrow",
"lance-derive",
"libc",
@@ -4896,8 +4904,8 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arrow",
"arrow-array",
@@ -4927,8 +4935,8 @@ dependencies = [
[[package]]
name = "lance-datagen"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arrow",
"arrow-array",
@@ -4945,8 +4953,8 @@ dependencies = [
[[package]]
name = "lance-derive"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"proc-macro2",
"quote",
@@ -4955,8 +4963,8 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4972,7 +4980,7 @@ dependencies = [
"futures",
"hex",
"hyperloglogplus",
"itertools 0.14.0",
"itertools 0.13.0",
"lance-arrow",
"lance-bitpacking",
"lance-core",
@@ -4991,8 +4999,8 @@ dependencies = [
[[package]]
name = "lance-file"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -5022,8 +5030,8 @@ dependencies = [
[[package]]
name = "lance-index"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arc-swap",
"arrow",
@@ -5035,6 +5043,7 @@ dependencies = [
"async-channel",
"async-recursion",
"async-trait",
"bitpacking",
"bitvec",
"bytes",
"chrono",
@@ -5047,12 +5056,11 @@ dependencies = [
"fst",
"futures",
"half",
"itertools 0.14.0",
"itertools 0.13.0",
"jieba-rs",
"jsonb",
"lance-arrow",
"lance-arrow-stats",
"lance-bitpacking",
"lance-core",
"lance-datafusion",
"lance-datagen",
@@ -5088,8 +5096,8 @@ dependencies = [
[[package]]
name = "lance-io"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arrow",
"arrow-arith",
@@ -5130,8 +5138,8 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -5142,13 +5150,12 @@ dependencies = [
"lance-core",
"num-traits",
"rand 0.9.4",
"rayon",
]
[[package]]
name = "lance-namespace"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arrow",
"async-trait",
@@ -5160,8 +5167,8 @@ dependencies = [
[[package]]
name = "lance-namespace-impls"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arrow",
"arrow-ipc",
@@ -5215,15 +5222,15 @@ dependencies = [
[[package]]
name = "lance-select"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-schema",
"byteorder",
"bytes",
"itertools 0.14.0",
"itertools 0.13.0",
"lance-core",
"roaring",
"tracing",
@@ -5231,8 +5238,8 @@ dependencies = [
[[package]]
name = "lance-table"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arrow",
"arrow-array",
@@ -5271,8 +5278,8 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -5285,8 +5292,8 @@ dependencies = [
[[package]]
name = "lance-tokenizer"
version = "9.0.0-beta.10"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.10#e25b71e74b89d10c57b412d111bde087117383f3"
version = "8.0.0-rc.3"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.3#2b4775ff3973c7f945f5bc104f2cbba5825f1beb"
dependencies = [
"icu_segmenter",
"jieba-rs",

View File

@@ -13,20 +13,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=9.0.0-beta.10", default-features = false, "tag" = "v9.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=9.0.0-beta.10", "tag" = "v9.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=9.0.0-beta.10", "tag" = "v9.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=9.0.0-beta.10", "tag" = "v9.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=9.0.0-beta.10", default-features = false, "tag" = "v9.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=9.0.0-beta.10", "tag" = "v9.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=9.0.0-beta.10", "tag" = "v9.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=9.0.0-beta.10", "tag" = "v9.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=9.0.0-beta.10", default-features = false, "tag" = "v9.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=9.0.0-beta.10", "tag" = "v9.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=9.0.0-beta.10", "tag" = "v9.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=9.0.0-beta.10", "tag" = "v9.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=9.0.0-beta.10", "tag" = "v9.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=9.0.0-beta.10", "tag" = "v9.0.0-beta.10", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=8.0.0-rc.3", default-features = false, "tag" = "v8.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=8.0.0-rc.3", "tag" = "v8.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=8.0.0-rc.3", "tag" = "v8.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=8.0.0-rc.3", "tag" = "v8.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=8.0.0-rc.3", default-features = false, "tag" = "v8.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=8.0.0-rc.3", "tag" = "v8.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=8.0.0-rc.3", "tag" = "v8.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=8.0.0-rc.3", "tag" = "v8.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=8.0.0-rc.3", default-features = false, "tag" = "v8.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=8.0.0-rc.3", "tag" = "v8.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=8.0.0-rc.3", "tag" = "v8.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=8.0.0-rc.3", "tag" = "v8.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=8.0.0-rc.3", "tag" = "v8.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=8.0.0-rc.3", "tag" = "v8.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "58.0.0", optional = false }

View File

@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>9.0.0-beta.10</lance-core.version>
<lance-core.version>8.0.0-rc.3</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -3,7 +3,7 @@
use std::time::Duration;
use lancedb::{ipc::ipc_file_to_batches, table::merge::MergeInsertBuilder};
use lancedb::{arrow::IntoArrow, ipc::ipc_file_to_batches, table::merge::MergeInsertBuilder};
use napi::bindgen_prelude::*;
use napi_derive::napi;
@@ -66,9 +66,11 @@ impl NativeMergeInsertBuilder {
#[napi(catch_unwind)]
pub async fn execute(&self, buf: Buffer) -> napi::Result<MergeResult> {
let data = ipc_file_to_batches(buf.to_vec()).map_err(|e| {
napi::Error::from_reason(format!("Failed to read IPC file: {}", convert_error(&e)))
})?;
let data = ipc_file_to_batches(buf.to_vec())
.and_then(IntoArrow::into_arrow)
.map_err(|e| {
napi::Error::from_reason(format!("Failed to read IPC file: {}", convert_error(&e)))
})?;
let this = self.clone();

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.34.0-beta.5"
current_version = "0.34.0-beta.4"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.34.0-beta.5"
version = "0.34.0-beta.4"
publish = false
edition.workspace = true
description = "Python bindings for LanceDB"

View File

@@ -282,23 +282,6 @@ async def connect(
namespace_client_properties: Optional[Dict[str, str]] = None,
oauth_config: Optional[Any] = None,
) -> Connection: ...
def connect_namespace(
namespace_client_impl: str,
namespace_client_properties: Dict[str, str],
read_consistency_interval: Optional[float] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
namespace_client_pushdown_operations: Optional[List[str]] = None,
) -> Connection: ...
def connect_namespace_client(
namespace_client: Any,
read_consistency_interval: Optional[float] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
namespace_client_pushdown_operations: Optional[List[str]] = None,
namespace_client_impl: Optional[str] = None,
namespace_client_properties: Optional[Dict[str, str]] = None,
) -> Connection: ...
class RecordBatchStream:
@property

View File

@@ -38,11 +38,8 @@ from lance_namespace_urllib3_client.models.query_table_request_vector import (
QueryTableRequestVector,
)
from lance_namespace_urllib3_client.models.string_fts_query import StringFtsQuery
from lance_namespace.errors import NamespaceNotEmptyError, TableNotFoundError
from lancedb._lancedb import (
connect_namespace as _connect_namespace,
connect_namespace_client as _connect_namespace_client,
)
from lance_namespace.errors import TableNotFoundError
from lancedb._lancedb import connect_namespace_client as _connect_namespace_client
from lancedb.background_loop import LOOP
from lancedb.db import AsyncConnection, DBConnection
from lancedb.namespace_utils import (
@@ -389,10 +386,6 @@ def _builds_namespace_natively(
return namespace_client_impl == "rest" and bool(namespace_client_properties)
def _supports_native_sync_namespace(namespace_client_impl: str) -> bool:
return namespace_client_impl in {"dir", "rest"}
class LanceNamespaceDBConnection(DBConnection):
"""
A LanceDB connection that uses a namespace for table management.
@@ -403,7 +396,7 @@ class LanceNamespaceDBConnection(DBConnection):
def __init__(
self,
namespace_client: Optional[LanceNamespace] = None,
namespace_client: LanceNamespace,
*,
read_consistency_interval: Optional[timedelta] = None,
storage_options: Optional[Dict[str, str]] = None,
@@ -411,8 +404,6 @@ class LanceNamespaceDBConnection(DBConnection):
namespace_client_pushdown_operations: Optional[List[str]] = None,
namespace_client_impl: Optional[str] = None,
namespace_client_properties: Optional[Dict[str, str]] = None,
_inner: Optional[AsyncConnection] = None,
_route_pushdown_to_rust: Optional[bool] = None,
):
"""
Initialize a namespace-based LanceDB connection.
@@ -458,36 +449,26 @@ class LanceNamespaceDBConnection(DBConnection):
# ``build_namespace_natively``), the underlying Rust table performs
# QueryTable pushdown through the read-freshness context provider, which
# the pure-Python ``query_table`` path bypasses.
self._route_pushdown_to_rust = (
_route_pushdown_to_rust
if _route_pushdown_to_rust is not None
else _builds_namespace_natively(
namespace_client_impl, namespace_client_properties
self._route_pushdown_to_rust = _builds_namespace_natively(
namespace_client_impl, namespace_client_properties
)
self._inner = AsyncConnection(
_connect_namespace_client(
namespace_client,
read_consistency_interval=(
read_consistency_interval.total_seconds()
if read_consistency_interval is not None
else None
),
storage_options=self.storage_options or None,
session=session,
namespace_client_pushdown_operations=(
list(self._namespace_client_pushdown_operations)
),
namespace_client_impl=namespace_client_impl,
namespace_client_properties=namespace_client_properties,
)
)
if _inner is not None:
self._inner = _inner
else:
if namespace_client is None:
raise ValueError("namespace_client is required without a native _inner")
self._inner = AsyncConnection(
_connect_namespace_client(
namespace_client,
read_consistency_interval=(
read_consistency_interval.total_seconds()
if read_consistency_interval is not None
else None
),
storage_options=self.storage_options or None,
session=session,
namespace_client_pushdown_operations=(
list(self._namespace_client_pushdown_operations)
),
namespace_client_impl=namespace_client_impl,
namespace_client_properties=namespace_client_properties,
)
)
self._uri = self._inner.uri
@override
def serialize(self) -> str:
@@ -533,11 +514,11 @@ class LanceNamespaceDBConnection(DBConnection):
)
if namespace_path is None:
namespace_path = []
return LOOP.run(
self._inner.table_names(
namespace_path=namespace_path, start_after=page_token, limit=limit
)
request = ListTablesRequest(
id=namespace_path, page_token=page_token, limit=limit
)
response = self._namespace_client.list_tables(request)
return response.tables if response.tables else []
@override
def create_table(
@@ -608,8 +589,8 @@ class LanceNamespaceDBConnection(DBConnection):
index_cache_size=index_cache_size,
)
)
except (RuntimeError, ValueError) as e:
if "Table not found" in str(e) or "was not found" in str(e):
except RuntimeError as e:
if "Table not found" in str(e):
table_id = namespace_path + [name]
raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}")
raise
@@ -631,9 +612,12 @@ class LanceNamespaceDBConnection(DBConnection):
@override
def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):
# Use namespace drop_table directly
if namespace_path is None:
namespace_path = []
LOOP.run(self._inner.drop_table(name, namespace_path=namespace_path))
table_id = namespace_path + [name]
request = DropTableRequest(id=table_id)
self._namespace_client.drop_table(request)
@override
def rename_table(
@@ -647,19 +631,14 @@ class LanceNamespaceDBConnection(DBConnection):
cur_namespace_path = []
if new_namespace_path is None:
new_namespace_path = []
try:
LOOP.run(
self._inner.rename_table(
cur_name,
new_name,
cur_namespace_path=cur_namespace_path,
new_namespace_path=new_namespace_path,
)
)
except RuntimeError as e:
if "rename_table not implemented" in str(e):
raise NotImplementedError("rename_table not implemented") from e
raise
cur_table_id = cur_namespace_path + [cur_name]
new_namespace_id = new_namespace_path if new_namespace_path else None
request = RenameTableRequest(
id=cur_table_id,
new_table_name=new_name,
new_namespace_id=new_namespace_id,
)
self._namespace_client.rename_table(request)
@override
def drop_database(self):
@@ -671,7 +650,8 @@ class LanceNamespaceDBConnection(DBConnection):
def drop_all_tables(self, namespace_path: Optional[List[str]] = None):
if namespace_path is None:
namespace_path = []
LOOP.run(self._inner.drop_all_tables(namespace_path=namespace_path))
for table_name in self.table_names(namespace_path=namespace_path):
self.drop_table(table_name, namespace_path=namespace_path)
@override
def list_namespaces(
@@ -701,10 +681,13 @@ class LanceNamespaceDBConnection(DBConnection):
"""
if namespace_path is None:
namespace_path = []
return LOOP.run(
self._inner.list_namespaces(
namespace_path=namespace_path, page_token=page_token, limit=limit
)
request = ListNamespacesRequest(
id=namespace_path, page_token=page_token, limit=limit
)
response = self._namespace_client.list_namespaces(request)
return ListNamespacesResponse(
namespaces=response.namespaces if response.namespaces else [],
page_token=response.page_token,
)
@override
@@ -732,12 +715,14 @@ class LanceNamespaceDBConnection(DBConnection):
CreateNamespaceResponse
Response containing the properties of the created namespace.
"""
return LOOP.run(
self._inner.create_namespace(
namespace_path=namespace_path,
mode=mode,
properties=properties,
)
request = CreateNamespaceRequest(
id=namespace_path,
mode=_normalize_create_namespace_mode(mode),
properties=properties,
)
response = self._namespace_client.create_namespace(request)
return CreateNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@override
@@ -765,18 +750,20 @@ class LanceNamespaceDBConnection(DBConnection):
DropNamespaceResponse
Response containing properties and transaction_id if applicable.
"""
try:
return LOOP.run(
self._inner.drop_namespace(
namespace_path=namespace_path,
mode=mode,
behavior=behavior,
)
)
except RuntimeError as e:
if "Namespace not empty" in str(e):
raise NamespaceNotEmptyError(str(e)) from e
raise
request = DropNamespaceRequest(
id=namespace_path,
mode=_normalize_drop_namespace_mode(mode),
behavior=_normalize_drop_namespace_behavior(behavior),
)
response = self._namespace_client.drop_namespace(request)
return DropNamespaceResponse(
properties=(
response.properties if hasattr(response, "properties") else None
),
transaction_id=(
response.transaction_id if hasattr(response, "transaction_id") else None
),
)
@override
def describe_namespace(
@@ -795,7 +782,11 @@ class LanceNamespaceDBConnection(DBConnection):
DescribeNamespaceResponse
Response containing the namespace properties.
"""
return LOOP.run(self._inner.describe_namespace(namespace_path))
request = DescribeNamespaceRequest(id=namespace_path)
response = self._namespace_client.describe_namespace(request)
return DescribeNamespaceResponse(
properties=response.properties if hasattr(response, "properties") else None
)
@override
def list_tables(
@@ -825,10 +816,13 @@ class LanceNamespaceDBConnection(DBConnection):
"""
if namespace_path is None:
namespace_path = []
return LOOP.run(
self._inner.list_tables(
namespace_path=namespace_path, page_token=page_token, limit=limit
)
request = ListTablesRequest(
id=namespace_path, page_token=page_token, limit=limit
)
response = self._namespace_client.list_tables(request)
return ListTablesResponse(
tables=response.tables if response.tables else [],
page_token=response.page_token,
)
def _lance_table_from_uri(
@@ -884,18 +878,6 @@ class LanceNamespaceDBConnection(DBConnection):
LanceNamespace
The namespace client for this connection.
"""
if self._namespace_client is None:
if (
self._namespace_client_impl is None
or self._namespace_client_properties is None
):
raise ValueError(
"Cannot construct a Python namespace client without "
"namespace implementation properties"
)
self._namespace_client = namespace_connect(
self._namespace_client_impl, self._namespace_client_properties
)
return self._namespace_client
@@ -1360,33 +1342,6 @@ def connect_namespace(
LanceNamespaceDBConnection
A namespace-based connection to LanceDB
"""
if _supports_native_sync_namespace(namespace_client_impl):
inner = AsyncConnection(
_connect_namespace(
namespace_client_impl,
namespace_client_properties,
read_consistency_interval=(
read_consistency_interval.total_seconds()
if read_consistency_interval is not None
else None
),
storage_options=storage_options,
session=session,
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
)
)
return LanceNamespaceDBConnection(
namespace_client=None,
read_consistency_interval=read_consistency_interval,
storage_options=storage_options,
session=session,
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
namespace_client_impl=namespace_client_impl,
namespace_client_properties=namespace_client_properties,
_inner=inner,
_route_pushdown_to_rust=True,
)
namespace_client = namespace_connect(
namespace_client_impl, namespace_client_properties
)

View File

@@ -2142,19 +2142,12 @@ class LanceTable(Table):
branch = self.current_branch()
version = None if branch is not None else self.version
namespace_client = self._namespace_client
if namespace_client is None:
conn_uri = getattr(self._conn, "uri", "")
if get_uri_scheme(conn_uri) == "namespace":
namespace_client = self._conn.namespace_client()
self._namespace_client = namespace_client
if namespace_client is not None:
if self._namespace_client is not None:
table_id = self._namespace_path + [self.name]
ds = lance.dataset(
version=version,
storage_options=self._conn.storage_options,
namespace_client=namespace_client,
namespace_client=self._namespace_client,
table_id=table_id,
**kwargs,
)

View File

@@ -5,7 +5,6 @@
import tempfile
import shutil
import importlib
import pytest
import pyarrow as pa
import lancedb
@@ -104,40 +103,6 @@ class TestNamespaceConnection:
assert isinstance(db, lancedb.LanceNamespaceDBConnection)
assert len(list(db.table_names())) == 0
def test_sync_builtin_namespace_uses_rust_without_python_client(self, monkeypatch):
"""Built-in sync namespace connections should not construct or call the
Python namespace client for normal namespace/table management."""
namespace_module = importlib.import_module("lancedb.namespace")
def fail_namespace_connect(*args, **kwargs):
raise AssertionError("Python namespace client should not be constructed")
monkeypatch.setattr(
namespace_module, "namespace_connect", fail_namespace_connect
)
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
assert isinstance(db, lancedb.LanceNamespaceDBConnection)
assert db._namespace_client is None
assert db._route_pushdown_to_rust is True
db.create_namespace(["test_ns"])
assert "test_ns" in db.list_namespaces().namespaces
schema = pa.schema([pa.field("id", pa.int64())])
table = db.create_table("test_table", schema=schema, namespace_path=["test_ns"])
assert table.namespace == ["test_ns"]
assert "test_table" in db.table_names(namespace_path=["test_ns"])
assert "test_table" in db.list_tables(namespace_path=["test_ns"]).tables
opened = db.open_table("test_table", namespace_path=["test_ns"])
assert opened.namespace == ["test_ns"]
db.drop_table("test_table", namespace_path=["test_ns"])
assert db.list_tables(namespace_path=["test_ns"]).tables == []
db.drop_namespace(["test_ns"])
assert "test_ns" not in db.list_namespaces().namespaces
def test_create_table_through_namespace(self):
"""Test creating a table through namespace."""
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
@@ -853,11 +818,10 @@ class TestPushdownOperations:
)
assert db._route_pushdown_to_rust is True
def test_route_pushdown_to_rust_for_native_dir(self):
"""The sync dir connection is natively built and defers QueryTable
pushdown to Rust."""
def test_route_pushdown_to_rust_false_for_dir(self):
"""A non-native (dir) connection keeps the Python pushdown path."""
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
assert db._route_pushdown_to_rust is True
assert db._route_pushdown_to_rust is False
def test_async_route_pushdown_to_rust_for_native_rest(self):
"""The async connection must not silently bypass the read-freshness fix:

View File

@@ -1137,16 +1137,6 @@ def test_namespace_open_table_with_branch_version(tmp_path):
assert db.open_table("t", namespace_path=["ns1"], branch="exp").count_rows() == 3
def test_namespace_root_table_to_lance_uses_namespace_client(tmp_path):
pytest.importorskip("lance") # "dir" impl is lance.namespace.DirectoryNamespace
db = lancedb.connect_namespace("dir", {"root": str(tmp_path)})
table = db.create_table("t", [{"i": 0}])
assert table._namespace_client is None
assert table.to_lance().count_rows() == 1
assert table._namespace_client is not None
@pytest.mark.asyncio
async def test_async_namespace_open_table_with_branch_version(tmp_path):
pytest.importorskip("lance") # "dir" impl is lance.namespace.DirectoryNamespace

View File

@@ -655,46 +655,6 @@ pub fn connect_namespace_client(
)))
}
#[pyfunction]
#[pyo3(signature = (
namespace_client_impl,
namespace_client_properties,
read_consistency_interval=None,
storage_options=None,
session=None,
namespace_client_pushdown_operations=None,
))]
#[allow(clippy::too_many_arguments)]
pub fn connect_namespace(
namespace_client_impl: String,
namespace_client_properties: HashMap<String, String>,
read_consistency_interval: Option<f64>,
storage_options: Option<HashMap<String, String>>,
session: Option<crate::session::Session>,
namespace_client_pushdown_operations: Option<Vec<String>>,
) -> PyResult<Connection> {
let read_consistency_interval = read_consistency_interval.map(Duration::from_secs_f64);
let namespace_client_pushdown_operations =
parse_namespace_client_pushdown_operations(namespace_client_pushdown_operations)?;
let mut builder =
lancedb::connect_namespace(&namespace_client_impl, namespace_client_properties)
.pushdown_operations(namespace_client_pushdown_operations);
if let Some(storage_options) = storage_options {
builder = builder.storage_options(storage_options);
}
if let Some(read_consistency_interval) = read_consistency_interval {
builder = builder.read_consistency_interval(read_consistency_interval);
}
if let Some(session) = session {
builder = builder.session(session.inner.clone());
}
Ok(Connection::new(
crate::runtime::block_on(builder.execute()).infer_error()?,
))
}
/// Whether to build the namespace natively (from impl + properties) instead of
/// wrapping a pre-built client. Native construction is required for the
/// read-freshness provider to be installed

View File

@@ -2,7 +2,7 @@
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use arrow::RecordBatchStream;
use connection::{Connection, connect, connect_namespace, connect_namespace_client};
use connection::{Connection, connect, connect_namespace_client};
use env_logger::Env;
use expr::{PyExpr, expr_col, expr_func, expr_lit};
use index::IndexConfig;
@@ -62,7 +62,6 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyPermutationReader>()?;
m.add_class::<PyExpr>()?;
m.add_function(wrap_pyfunction!(connect, m)?)?;
m.add_function(wrap_pyfunction!(connect_namespace, m)?)?;
m.add_function(wrap_pyfunction!(connect_namespace_client, m)?)?;
m.add_function(wrap_pyfunction!(permutation::async_permutation_builder, m)?)?;
m.add_function(wrap_pyfunction!(util::validate_table_name, m)?)?;

View File

@@ -166,10 +166,6 @@ required-features = ["bedrock"]
[[example]]
name = "simple"
[[example]]
name = "polars"
required-features = ["polars"]
[[example]]
name = "full_text_search"

View File

@@ -1,47 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! This example demonstrates ingesting a Polars DataFrame into LanceDB and
//! reading it back out as a Polars DataFrame.
use lancedb::arrow::IntoPolars;
use lancedb::query::ExecutableQuery;
use lancedb::{Result, connect};
use polars::prelude::{DataFrame, NamedFrom, Series};
fn make_dataframe() -> DataFrame {
let ids = Series::new("id", &[1i32, 2, 3, 4, 5]);
let names = Series::new("name", &["Alice", "Bob", "Carol", "Dave", "Eve"]);
let scores = Series::new("score", &[9.5f64, 8.1, 7.3, 9.0, 6.5]);
DataFrame::new(vec![ids, names, scores]).unwrap()
}
#[tokio::main]
async fn main() -> Result<()> {
let tmp = tempfile::tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
// Ingest a Polars DataFrame directly — DataFrame now implements Scannable.
let df = make_dataframe();
println!("Input DataFrame:\n{df}");
let table = db.create_table("people", df).execute().await?;
// Append more rows.
let more = DataFrame::new(vec![
Series::new("id", &[6i32, 7]),
Series::new("name", &["Frank", "Grace"]),
Series::new("score", &[7.8f64, 8.9]),
])
.unwrap();
table.add(more).execute().await?;
// Read back as a Polars DataFrame.
let result_df = table.query().execute().await?.into_polars().await?;
println!(
"\nRound-tripped DataFrame ({} rows):\n{result_df}",
result_df.height()
);
Ok(())
}

View File

@@ -112,14 +112,54 @@ impl<S: Stream<Item = Result<arrow_array::RecordBatch>>> RecordBatchStream
/// A trait for converting incoming data to Arrow
///
/// Integrations should implement this trait to allow data to be
/// imported directly from the integration. For example, implementing
/// this trait for `Vec<Vec<...>>` would allow the `Vec` to be directly
/// used in methods like [`crate::connection::Connection::create_table`]
/// or [`crate::table::Table::add`]
pub trait IntoArrow {
/// Convert the data into an iterator of Arrow batches
fn into_arrow(self) -> Result<Box<dyn arrow_array::RecordBatchReader + Send>>;
}
pub type BoxedRecordBatchReader = Box<dyn arrow_array::RecordBatchReader + Send>;
impl<T: arrow_array::RecordBatchReader + Send + 'static> IntoArrow for T {
fn into_arrow(self) -> Result<Box<dyn arrow_array::RecordBatchReader + Send>> {
Ok(Box::new(self))
}
}
/// A trait for converting incoming data to Arrow asynchronously
///
/// Serves the same purpose as [`IntoArrow`], but for asynchronous data.
///
/// Note: Arrow has no async equivalent to RecordBatchReader and so
pub trait IntoArrowStream {
/// Convert the data into a stream of Arrow batches
fn into_arrow(self) -> Result<SendableRecordBatchStream>;
}
impl<S: Stream<Item = Result<arrow_array::RecordBatch>>> SimpleRecordBatchStream<S> {
pub fn new(stream: S, schema: Arc<arrow_schema::Schema>) -> Self {
Self { schema, stream }
}
}
impl IntoArrowStream for SendableRecordBatchStream {
fn into_arrow(self) -> Result<SendableRecordBatchStream> {
Ok(self)
}
}
impl IntoArrowStream for datafusion_physical_plan::SendableRecordBatchStream {
fn into_arrow(self) -> Result<SendableRecordBatchStream> {
let schema = self.schema();
let stream = self.map_err(|df_err| df_err.into());
Ok(Box::pin(SimpleRecordBatchStream::new(stream, schema)))
}
}
pub trait LanceDbDatagenExt {
fn into_ldb_stream(
self,
@@ -224,7 +264,9 @@ impl IntoPolars for SendableRecordBatchStream {
#[cfg(all(test, feature = "polars"))]
mod tests {
use super::SendableRecordBatchStream;
use crate::arrow::{IntoPolars, PolarsDataFrameRecordBatchReader, SimpleRecordBatchStream};
use crate::arrow::{
IntoArrow, IntoPolars, PolarsDataFrameRecordBatchReader, SimpleRecordBatchStream,
};
use polars::prelude::{DataFrame, NamedFrom, Series};
fn get_record_batch_reader_from_polars() -> Box<dyn arrow_array::RecordBatchReader + Send> {
@@ -238,7 +280,10 @@ mod tests {
float_series = Series::new("float", &[2.0]);
let df2 = DataFrame::new(vec![string_series, int_series, float_series]).unwrap();
Box::new(PolarsDataFrameRecordBatchReader::new(df1.vstack(&df2).unwrap()).unwrap())
PolarsDataFrameRecordBatchReader::new(df1.vstack(&df2).unwrap())
.unwrap()
.into_arrow()
.unwrap()
}
#[test]

View File

@@ -185,43 +185,6 @@ impl Scannable for SendableRecordBatchStream {
}
}
#[cfg(feature = "polars")]
impl Scannable for polars::frame::DataFrame {
fn schema(&self) -> SchemaRef {
crate::polars_arrow_convertors::convert_polars_df_schema_to_arrow_rb_schema(
self.schema().clone(),
)
.expect("failed to convert Polars DataFrame schema to Arrow schema")
}
fn scan_as_stream(&mut self) -> SendableRecordBatchStream {
let schema = Scannable::schema(self);
let batches: crate::Result<Vec<RecordBatch>> =
match crate::arrow::PolarsDataFrameRecordBatchReader::new(self.clone()) {
Err(e) => Err(e),
Ok(reader) => reader.map(|b| b.map_err(Into::into)).collect(),
};
match batches {
Err(e) => Box::pin(SimpleRecordBatchStream {
schema,
stream: once(async move { Err(e) }),
}),
Ok(batches) => {
let stream = futures::stream::iter(batches.into_iter().map(Ok));
Box::pin(SimpleRecordBatchStream { schema, stream })
}
}
}
fn num_rows(&self) -> Option<usize> {
Some(self.height())
}
fn rescannable(&self) -> bool {
true
}
}
#[async_trait]
impl StreamingWriteSource for Box<dyn Scannable> {
fn arrow_schema(&self) -> SchemaRef {
@@ -1126,60 +1089,4 @@ mod tests {
);
}
}
#[cfg(feature = "polars")]
mod polars_tests {
use super::*;
use crate::arrow::IntoPolars;
use crate::query::ExecutableQuery;
use polars::prelude::{DataFrame, NamedFrom, Series};
fn make_df() -> DataFrame {
DataFrame::new(vec![
Series::new("id", &[1i32, 2, 3]),
Series::new("val", &[1.1f64, 2.2, 3.3]),
])
.unwrap()
}
#[tokio::test]
async fn test_dataframe_scannable_round_trip() {
let tmp = tempfile::tempdir().unwrap();
let db = crate::connect(tmp.path().to_str().unwrap())
.execute()
.await
.unwrap();
let df = make_df();
let table = db.create_table("t", df.clone()).execute().await.unwrap();
// Append the same rows again.
table.add(df.clone()).execute().await.unwrap();
let result = table
.query()
.execute()
.await
.unwrap()
.into_polars()
.await
.unwrap();
assert_eq!(result.height(), df.height() * 2);
assert_eq!(result.schema(), df.schema());
}
#[tokio::test]
async fn test_dataframe_scannable_rescannable() {
let mut df = make_df();
assert!(df.rescannable());
let batches1: Vec<RecordBatch> = df.scan_as_stream().try_collect().await.unwrap();
assert_eq!(batches1.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
// Can be scanned again.
let batches2: Vec<RecordBatch> = df.scan_as_stream().try_collect().await.unwrap();
assert_eq!(batches2.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
}
}
}

View File

@@ -70,29 +70,18 @@ use tokio::sync::RwLock;
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
const MIN_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-version");
const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp");
const MIN_READ_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-read-version");
const VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-version");
const METRIC_TYPE_KEY: &str = "metric_type";
const INDEX_TYPE_KEY: &str = "index_type";
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5);
/// Per-table state driving the freshness headers (`x-lancedb-min-version`,
/// `x-lancedb-min-timestamp`, and `x-lancedb-min-read-version`) sent on read
/// requests.
/// Per-table state driving the freshness headers (`x-lancedb-min-version` and
/// `x-lancedb-min-timestamp`) sent on read requests.
#[derive(Debug, Default, Clone, Copy)]
struct FreshnessState {
/// Provides read-your-write within a single handle: writes that return a
/// version update this, and reads send it as `x-lancedb-min-version`.
min_version: Option<u64>,
/// Highest dataset version observed in a *read* response on this handle.
/// Reads send it as `x-lancedb-min-read-version` so a load-balanced query
/// node whose cache is behind this version must refresh before serving,
/// giving monotonic reads across nodes regardless of which one the load
/// balancer routes to. Sourced only from reads (always committed dataset
/// versions), never from writes (which may return WAL entry ids), so it is
/// unaffected by the WAL/version mismatch that retired `min_version`.
min_read_version: Option<u64>,
/// Wall-clock time captured at the last [`BaseTable::checkout_latest`]
/// call. Subsequent reads send
/// `max(baseline, now - read_consistency_interval)` as
@@ -113,7 +102,6 @@ struct FreshnessState {
struct FreshnessHeaders {
min_version: Option<u64>,
min_timestamp: Option<SystemTime>,
min_read_version: Option<u64>,
}
impl FreshnessHeaders {
@@ -125,9 +113,6 @@ impl FreshnessHeaders {
let dt: chrono::DateTime<chrono::Utc> = ts.into();
request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339());
}
if let Some(v) = self.min_read_version {
request = request.header(MIN_READ_VERSION_HEADER, v.to_string());
}
request
}
}
@@ -899,7 +884,6 @@ impl<S: HttpSend> RemoteTable<S> {
self.client.read_consistency_interval,
SystemTime::now(),
),
min_read_version: state.min_read_version,
}
}
@@ -921,30 +905,6 @@ impl<S: HttpSend> RemoteTable<S> {
state.min_version = Some(state.min_version.map_or(version, |v| v.max(version)));
}
/// Record a dataset version observed in a *read* response so subsequent
/// reads request at least this version via `x-lancedb-min-read-version`,
/// giving monotonic reads across load-balanced query nodes. A returned `0`
/// (or absent header from an old server) is ignored.
fn track_read_version(&self, version: u64) {
if version == 0 {
return;
}
let mut state = self.freshness.lock().unwrap();
state.min_read_version = Some(state.min_read_version.map_or(version, |v| v.max(version)));
}
/// Parse the `x-lancedb-version` response header (the dataset version a read
/// reflects) and fold it into the read-version watermark.
fn track_read_version_from_headers(&self, headers: &reqwest::header::HeaderMap) {
if let Some(version) = headers
.get(&VERSION_HEADER)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<u64>().ok())
{
self.track_read_version(version);
}
}
async fn execute_query(
&self,
query: &AnyQuery,
@@ -968,7 +928,6 @@ impl<S: HttpSend> RemoteTable<S> {
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.send(req, true).await?;
self.track_read_version_from_headers(response.headers());
self.read_arrow_stream(&request_id, response).await
});
let streams = futures::future::try_join_all(futures);
@@ -1586,12 +1545,11 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
*write_guard = None;
drop(write_guard);
// Drop any per-handle read/write tracking; subsequent reads use the
// Drop any per-handle write tracking; subsequent reads use the
// baseline timestamp captured now to guarantee freshness.
*self.freshness.lock().unwrap() = FreshnessState {
min_version: None,
checkout_baseline: Some(SystemTime::now()),
min_read_version: None,
};
// Invalidate schema cache since we're switching versions
@@ -1847,7 +1805,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
};
self.track_read_version_from_headers(response.headers());
let body = response.text().await.err_to_http(request_id.clone())?;
serde_json::from_str(&body).map_err(|e| Error::Http {
@@ -7167,7 +7124,6 @@ mod tests {
let state = FreshnessState {
min_version: None,
checkout_baseline: Some(baseline),
min_read_version: None,
};
assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline));
@@ -7192,7 +7148,6 @@ mod tests {
let state = FreshnessState {
min_version: None,
checkout_baseline: Some(baseline),
min_read_version: None,
};
assert_eq!(
compute_min_timestamp(&state, Some(Duration::from_secs(10)), now),
@@ -7204,7 +7159,6 @@ mod tests {
let state = FreshnessState {
min_version: None,
checkout_baseline: Some(recent_baseline),
min_read_version: None,
};
assert_eq!(
compute_min_timestamp(&state, Some(Duration::from_secs(60)), now),
@@ -7349,106 +7303,6 @@ mod tests {
);
}
/// A handler that records every request's headers and answers each read with
/// an `x-lancedb-version` response header taken from `versions` (by call
/// index, saturating at the last entry). An empty string means "no header".
fn read_version_handler(
versions: &'static [&'static str],
) -> (
impl Fn(reqwest::Request) -> http::Response<String> + Clone + Send + Sync + 'static,
Arc<std::sync::Mutex<Vec<http::HeaderMap>>>,
) {
let requests = Arc::new(std::sync::Mutex::new(Vec::new()));
let requests_c = requests.clone();
let call = Arc::new(AtomicUsize::new(0));
let handler = move |request: reqwest::Request| {
requests_c.lock().unwrap().push(request.headers().clone());
let i = call.fetch_add(1, Ordering::SeqCst).min(versions.len() - 1);
let mut builder = http::Response::builder().status(200);
if !versions[i].is_empty() {
builder = builder.header("x-lancedb-version", versions[i]);
}
builder.body("42".to_string()).unwrap()
};
(handler, requests)
}
#[tokio::test]
async fn test_read_version_watermark_tracked_and_sent() {
let (handler, requests) = read_version_handler(&["100", "100"]);
let table = Table::new_with_handler("my_table", handler);
// First read has no watermark yet; the response advertises version 100,
// so the second read must floor the server at 100.
table.count_rows(None).await.unwrap();
table.count_rows(None).await.unwrap();
let reqs = requests.lock().unwrap();
assert!(!reqs[0].contains_key("x-lancedb-min-read-version"));
assert_eq!(
reqs[1]
.get("x-lancedb-min-read-version")
.unwrap()
.to_str()
.unwrap(),
"100"
);
}
#[tokio::test]
async fn test_read_version_watermark_keeps_max() {
// Server reports 100 then a stale 50; the watermark must not regress.
let (handler, requests) = read_version_handler(&["100", "50", "50"]);
let table = Table::new_with_handler("my_table", handler);
table.count_rows(None).await.unwrap();
table.count_rows(None).await.unwrap();
table.count_rows(None).await.unwrap();
let reqs = requests.lock().unwrap();
assert_eq!(
reqs[2]
.get("x-lancedb-min-read-version")
.unwrap()
.to_str()
.unwrap(),
"100"
);
}
#[tokio::test]
async fn test_read_version_absent_header_no_watermark() {
// An old server that doesn't return the version header leaves the
// watermark unset, preserving backward compatibility.
let (handler, requests) = read_version_handler(&[""]);
let table = Table::new_with_handler("my_table", handler);
table.count_rows(None).await.unwrap();
table.count_rows(None).await.unwrap();
let reqs = requests.lock().unwrap();
assert!(!reqs[1].contains_key("x-lancedb-min-read-version"));
}
#[tokio::test]
async fn test_read_version_watermark_reset_on_checkout_latest() {
let (handler, requests) = read_version_handler(&["100", "100"]);
let table = Table::new_with_handler("my_table", handler);
table.count_rows(None).await.unwrap();
table.checkout_latest().await.unwrap();
table.count_rows(None).await.unwrap();
// The read after checkout_latest starts from a clean slate.
let reqs = requests.lock().unwrap();
assert!(
!reqs
.last()
.unwrap()
.contains_key("x-lancedb-min-read-version")
);
}
/// Like `capturing_handler`, but keeps a per-path snapshot of the headers
/// from every request so tests can assert on a specific endpoint.
#[allow(clippy::type_complexity)]