Compare commits

...

11 Commits

Author SHA1 Message Date
LanceDB Robot
c1c19cd133 chore: update lance dependency to v8.0.0-beta.17 (#3552)
Updates the Lance Rust workspace dependencies and Java lance-core
dependency to v8.0.0-beta.17.

No LanceDB compatibility code changes were required; validation passed
with cargo clippy and cargo fmt. Triggering Lance tag:
https://github.com/lance-format/lance/releases/tag/v8.0.0-beta.17
2026-06-17 16:08:09 -07:00
Will Jones
ce5dadd386 fix(ci): allow shell pre-commit hooks in bumpversion configs (#3554)
The "Create release commit" workflow (`make-release-commit.yml`) has
failed on its last two runs; no release tags have been created since
June 4. Since this workflow creates the tag that the cargo/npm/pypi/java
publish workflows trigger off of, all recent releases are effectively
blocked.

The workflow installs `bump-my-version` unpinned. Version `1.4.0` added
a check that refuses to run `pre_commit_hooks` containing shell syntax
(pipes, `&&`, `if`, variable expansion) unless `allow_shell_hooks =
true` is set. Both bumpversion configs use such hooks:

- `python/.bumpversion.toml` — updates `Cargo.lock` after the bump
(fails first)
- `.bumpversion.toml` — runs `mvn versions:set` for the Java packages

The job dies at the version-bump step with:

> Hook '…' contains shell syntax (pipes, redirects, or variable
expansion). Set `allow_shell_hooks = true` in your configuration to
enable shell execution…

This sets `allow_shell_hooks = true` in both configs to restore the
previous behavior.

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 15:22:05 -07:00
Armaan Sandhu
1f8ebef3cd fix(rust): return typed errors instead of panicking in Bedrock embedding path (#3512)
Closes #3506

## Problem

The Bedrock embedding compute path
(`rust/lancedb/src/embeddings/bedrock.rs`) panics instead of returning a
typed error in several places:

- `serde_json::to_vec(&request_body).unwrap()`: request serialization.
- `block_in_place(...).unwrap()`: the AWS `invoke_model` send result;
any API error terminates the worker instead of propagating.
- `v.as_f64().unwrap() as f32`: panics on non-numeric values in the
returned embedding array.
- `Handle::current()` + `block_in_place` assume a multi-threaded Tokio
runtime and panic when that assumption does not hold (no runtime, or a
current-thread runtime).

Malformed payloads, non-numeric embedding values, or an incompatible
runtime should surface as typed errors and never panic.

## Fix

- Serialize the request body before the blocking section so a
serialization failure returns `Error::Runtime` via `?`.
- Map the `invoke_model` send error to `Error::Runtime` instead of
`unwrap`.
- Add a `json_array_to_f32` helper that converts the response array to
`Vec<f32>`, returning `Error::Runtime` for a missing/non-array field or
a non-numeric element (used by both the Titan and Cohere paths).
- Add `current_multi_thread_handle()` (`Handle::try_current()` + a
`RuntimeFlavor::CurrentThread` guard) so an absent or incompatible
runtime returns a typed error rather than panicking in `block_in_place`.

Scope note: the sibling `openai.rs` provider uses the same
`block_in_place` + `block_on` bridge, so the bridge pattern itself is
kept; this change only removes the panic paths that are specific to the
Bedrock provider.

## Testing

Added 6 unit tests (no AWS credentials required):

- `json_array_to_f32`: valid numbers, non-array payload, and non-numeric
element.
- `current_multi_thread_handle`: errors with no runtime, errors on a
current-thread runtime, and succeeds on a multi-threaded runtime.

All pass; `cargo fmt` and `cargo clippy` clean. Build/test with
`--features bedrock,lance/protoc`.
2026-06-17 15:06:44 -07:00
whitewooood
217fd8491d fix(python): clarify single dictionary input error (#3537)
## Summary
- clarify the Python error for passing a single dictionary to table
creation/add paths
- add a regression test for `create_table(..., data=dict)` so it points
users to a list of dictionaries

Fixes #409

## Testing
- `python -m pytest python/tests/test_table.py -q`
- `python -m ruff format python/lancedb/table.py
python/lancedb/scannable.py python/tests/test_table.py`
- `python -m ruff check python/lancedb/table.py
python/lancedb/scannable.py python/tests/test_table.py`
2026-06-17 12:55:55 -07:00
JSap0914
9128dbcd7a fix(util): escape single quotes in struct field names in value_to_sql (#3548)
### Bug

`value_to_sql({...})` builds a DataFusion `named_struct(...)` literal
but interpolates the struct field names directly as `f"'{k}'"`. A field
name that contains a single quote therefore produces invalid SQL:

```python
>>> from lancedb.util import value_to_sql
>>> value_to_sql({"it's": 1})
"named_struct('it's', 1)"        # invalid SQL — the quote terminates the literal
```

String *values* are already escaped (single quotes doubled) by the `str`
branch of `value_to_sql`, so keys and values were handled
inconsistently. This affects `Table.update(values={...})` /
`merge_insert` when a struct column has a field name containing `'`.

### Fix

Render the key through `value_to_sql(str(k))` so field names are escaped
exactly like string values:

```python
>>> value_to_sql({"it's": 1})
"named_struct('it''s', 1)"
```

Keys without special characters are unchanged (`'a'` stays `'a'`), so
existing behavior is preserved.

### Verification

```
$ pytest python/tests/test_util.py -k value_to_sql_dict
```

The new `test_value_to_sql_dict_key_escaping` covers quoted keys (incl.
nested structs) and fails on `main` (`named_struct('it's', 1)`), passes
with this change; the existing `test_value_to_sql_dict` still passes.

Co-authored-by: JSap0914 <JSap0914@users.noreply.github.com>
2026-06-17 12:55:43 -07:00
Ghxst ☠️
394bb34fa2 fix(rust): report local write progress bytes from Lance (#3422)
Fixes #3360.

This updates native table writes so local write progress uses Lance
writer byte stats instead of Arrow in-memory batch size once write bytes
are available. The change wires the existing `WriteProgressTracker` into
`InsertExec` for native `add` writes, installs a Lance `WriteProgressFn`
only when no lower-level callback is already configured, and keeps the
existing public `InsertExec::new` signature unchanged.

Validation:
- `cargo test -p lancedb --features remote
table::write_progress::tests::test_progress_uses_lance_write_bytes_for_local_tables
-- --nocapture` passed: 1 passed, 0 failed.
- `cargo test -p lancedb --features remote table::write_progress::tests
-- --nocapture` passed: 7 passed, 0 failed.
- `cargo check --quiet --features remote --tests --examples` passed.
- `cargo fmt --all --check` passed.
- `git diff --check` passed.
- `git diff | gitleaks stdin --no-banner --redact --timeout 30` passed:
no leaks found.

I did not run the full `cargo test --quiet --features remote --tests`
suite.

Co-authored-by: Ghxst <200635707+GHX5T-SOL@users.noreply.github.com>
2026-06-17 12:05:59 -07:00
Armaan Sandhu
b2ae763254 fix(python): raise clear TypeError for bare List/Tuple in pydantic schema conversion (#3511)
Closes #3502

## Problem

A bare, unparameterised `typing.List` / `typing.Tuple` field crashes
`to_arrow_schema` with an opaque `AttributeError: __args__`:

```python
from typing import Tuple
from lancedb.pydantic import LanceModel

class Doc(LanceModel):
    items: Tuple

Doc.to_arrow_schema()  # AttributeError: __args__
```

In `_py_type_to_arrow_type`, the branch `elif getattr(py_type,
"__origin__", None) in (list, tuple)` is taken for a bare generic (its
`__origin__` is `list / tuple`), but the next line reads
`py_type.__args__[0]`, and a bare generic has no `__args__`. Other
unsupported types (e.g. `Dict[str, int]`) correctly raise a clear
`TypeError`, so this case is inconsistent.

Fix

Guard the element-type lookup with `getattr(py_type, "__args__", None)`
and raise a clear `TypeError` when it is missing, matching the existing
behavior for other unsupported types. Bare builtin list / tuple are
unaffected (their `__origin__` is `None`, so they already fall through
to the existing `TypeError`).

Testing

- Added `test_bare_generic_raises_type_error` covering both `List` and
`Tuple`.
- ruff format and ruff check clean.
2026-06-17 11:58:48 -07:00
Drew Gallardo
1bead6960c fix: pin mock clock in eventual consistency test (#3547)
This PR fixes a flaky test I hit on Windows test in #3528.

Looks like `test_eventual_consistency_background_refresh` was failing
with `v_cached` expected 1, got 2. There was a pr which swapped
`tokio::time::sleep(300ms)` for `clock::advance_by(300ms)`, which is
pretty much fine but the test necer pinned the clock so the first
`get()` locks the `cached_at` on wall time. Therefore, if our CI is
taking long enough the ttl expires before the value assertion in the
test.

So now we can add a `pin()` and call it first `get()`. After that we can
advance the clock manually with no problems.

Also, it's worth noting that I tried pinning in `BackgroundCache::new()`
first. That broke another test `test_reload_resets_consistency_timer`,
which uses real `tokio::time::sleep` and needs wall clock after
`clear_mock()`. So the pin stays in this test only. And this should
unblock us.

Failing instances:
-
https://github.com/lancedb/lancedb/actions/runs/27567527236/job/81495265474?pr=3528
-
https://github.com/lancedb/lancedb/actions/runs/27560366489/job/81470414928
2026-06-17 11:56:40 -07:00
Brendan Clement
0abf641733 feat: send read-freshness signal on the lance-namespace path (#3551)
### Description

`db://`-style connections that use the lance-namespace path
(`LanceNamespaceDatabase` → `NativeTable` + the lance-namespace REST
client) never sent a read-freshness signal. Against a server configured
to serve cached table metadata up to some staleness window, this allows
stale-read-after-write across handles and processes. The remote table
path already solved this (#3439). This brings the namespace path to
parity.

The namespace REST client doesn't let callers attach headers directly,
but it forwards a `DynamicContextProvider`'s `headers.*` context entries
as HTTP headers per request. So:

- A shared per-table baseline map is created before the namespace
client. I built and installed on the `ConnectBuilder` via a context
provider.
- On read operations the provider emits ·x-lancedb-min-timestamp =
max(baseline, now − read_consistency_interval)`
  (RFC3339), keyed by the operation's `object_id`.
- Each table handle bumps its baseline (monotonically) on
`checkout_latest()`, `restore()`, and every data/schema write.
`checkout_latest()` is the primary hook: consumers refresh a handle
there after writing elsewhere, then poll.

Read operations that carry the floor: `describe_table`,
`list_table_versions`, `query_table`, `list_tables`.
`list_table_versions` is what resolves "latest" for managed-versioning
tables (`get_latest_version`), so it's the op that makes
`checkout_latest()` actually observe a prior write.
`describe_table_version` is excluded (pinned to an immutable version).
This mirrors #3439 (timestamp baseline, `max(baseline, now − interval)`,
monotonic); no `min_version` and no body channel, since the namespace
path has no version-returning write responses.

### Testing

- Unit tests for `compute_min_timestamp` / `next_freshness_baseline` and
the provider (header at/after a bumped baseline; nothing for an empty
baseline + no interval; interval floor applies; non-read ops emit
nothing; `list_tables` uses only the interval floor).
- Verified end-to-end against a local server that honors the header:
reads carry `x-lancedb-min-timestamp`, writes don't, and read-your-write
holds.
2026-06-17 13:30:53 -04:00
Yang Cen
976edeb2ff feat(query): add approx mode to vector queries (#3549)
## Feature

### What is the new feature?

Adds Rust core API support for configuring vector query approximation
mode with `ApproxMode::{Fast, Normal, Accurate}`.

### Why do we need this feature?

Lance already exposes `lance_index::vector::ApproxMode` and scanner
support for controlling the speed/accuracy tradeoff for approximate
vector search. LanceDB Rust queries need to expose and pass this setting
through for local/native and remote vector searches.

### How does it work?

- Adds public `ApproxMode` in `rust/lancedb`, with lowercase serde,
`Default::Normal`, parse/display, and conversions to/from Lance's
`ApproxMode`.
- Adds `approx_mode: Option<ApproxMode>` to `VectorQueryRequest` and a
`VectorQuery::approx_mode(...)` builder.
- Applies the mode to native/local Lance scanners after `nearest(...)`
when explicitly set.
- Sends `approx_mode` in remote query JSON only when explicitly set;
default requests omit it.

## Validation

- `cargo fmt --all`
- `cargo test --quiet --features remote approx_mode`
- `cargo test --quiet --features remote
test_query_vector_default_values`
- `cargo check --quiet --features remote --tests --examples`
- `git diff --check`
2026-06-17 19:28:42 +08:00
Yang Cen
b46a44f873 feat(query): add approx mode to vector queries (#3549)
## Feature

### What is the new feature?

Adds Rust core API support for configuring vector query approximation
mode with `ApproxMode::{Fast, Normal, Accurate}`.

### Why do we need this feature?

Lance already exposes `lance_index::vector::ApproxMode` and scanner
support for controlling the speed/accuracy tradeoff for approximate
vector search. LanceDB Rust queries need to expose and pass this setting
through for local/native and remote vector searches.

### How does it work?

- Adds public `ApproxMode` in `rust/lancedb`, with lowercase serde,
`Default::Normal`, parse/display, and conversions to/from Lance's
`ApproxMode`.
- Adds `approx_mode: Option<ApproxMode>` to `VectorQueryRequest` and a
`VectorQuery::approx_mode(...)` builder.
- Applies the mode to native/local Lance scanners after `nearest(...)`
when explicitly set.
- Sends `approx_mode` in remote query JSON only when explicitly set;
default requests omit it.

## Validation

- `cargo fmt --all`
- `cargo test --quiet --features remote approx_mode`
- `cargo test --quiet --features remote
test_query_vector_default_values`
- `cargo check --quiet --features remote --tests --examples`
- `git diff --check`
2026-06-17 19:28:36 +08:00
25 changed files with 1152 additions and 118 deletions

View File

@@ -23,6 +23,8 @@ allow_dirty = true
commit = true
message = "Bump version: {current_version} → {new_version}"
commit_args = ""
# bump-my-version >=1.4.0 rejects pre_commit_hooks containing shell syntax unless opted in.
allow_shell_hooks = true
# Java maven files
pre_commit_hooks = [

85
Cargo.lock generated
View File

@@ -3432,8 +3432,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arrow-array",
"rand 0.9.4",
@@ -4735,8 +4735,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a"
[[package]]
name = "lance"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arc-swap",
"arrow",
@@ -4810,8 +4810,8 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4832,7 +4832,7 @@ dependencies = [
[[package]]
name = "lance-arrow-scalar"
version = "58.0.0"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4846,7 +4846,7 @@ dependencies = [
[[package]]
name = "lance-arrow-stats"
version = "58.0.0"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -4855,8 +4855,8 @@ dependencies = [
[[package]]
name = "lance-bitpacking"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arrayref",
"paste",
@@ -4865,8 +4865,8 @@ dependencies = [
[[package]]
name = "lance-core"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4904,8 +4904,8 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arrow",
"arrow-array",
@@ -4935,8 +4935,8 @@ dependencies = [
[[package]]
name = "lance-datagen"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arrow",
"arrow-array",
@@ -4953,8 +4953,8 @@ dependencies = [
[[package]]
name = "lance-derive"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"proc-macro2",
"quote",
@@ -4963,8 +4963,8 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4999,8 +4999,8 @@ dependencies = [
[[package]]
name = "lance-file"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -5030,8 +5030,8 @@ dependencies = [
[[package]]
name = "lance-index"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arc-swap",
"arrow",
@@ -5083,6 +5083,7 @@ dependencies = [
"rand_distr 0.5.1",
"rangemap",
"rayon",
"regex-syntax",
"roaring",
"serde",
"serde_json",
@@ -5095,8 +5096,8 @@ dependencies = [
[[package]]
name = "lance-io"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arrow",
"arrow-arith",
@@ -5137,8 +5138,8 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -5153,8 +5154,8 @@ dependencies = [
[[package]]
name = "lance-namespace"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arrow",
"async-trait",
@@ -5166,8 +5167,8 @@ dependencies = [
[[package]]
name = "lance-namespace-impls"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arrow",
"arrow-ipc",
@@ -5207,9 +5208,9 @@ dependencies = [
[[package]]
name = "lance-namespace-reqwest-client"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d287494559c22838ce34e51ea0fa29dc780d5be8283de5ab33e9395623000c8"
checksum = "ba3f0a235e3ed5f8805205649ccc7d7d0f3df23ce1294242c9265ad488d7f19d"
dependencies = [
"reqwest 0.12.28",
"serde",
@@ -5221,8 +5222,8 @@ dependencies = [
[[package]]
name = "lance-select"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -5237,8 +5238,8 @@ dependencies = [
[[package]]
name = "lance-table"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arrow",
"arrow-array",
@@ -5277,8 +5278,8 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -5291,8 +5292,8 @@ dependencies = [
[[package]]
name = "lance-tokenizer"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
dependencies = [
"icu_segmenter",
"jieba-rs",

View File

@@ -13,20 +13,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=8.0.0-beta.14", default-features = false, "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=8.0.0-beta.14", default-features = false, "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=8.0.0-beta.14", default-features = false, "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=8.0.0-beta.17", default-features = false, "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=8.0.0-beta.17", default-features = false, "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=8.0.0-beta.17", default-features = false, "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "58.0.0", optional = false }

View File

@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>8.0.0-beta.14</lance-core.version>
<lance-core.version>8.0.0-beta.17</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -23,6 +23,8 @@ allow_dirty = true
commit = true
message = "Bump version: {current_version} → {new_version}"
commit_args = ""
# bump-my-version >=1.4.0 rejects pre_commit_hooks containing shell syntax unless opted in.
allow_shell_hooks = true
# Update Cargo.lock after version bump
pre_commit_hooks = [

View File

@@ -275,7 +275,18 @@ def _py_type_to_arrow_type(py_type: Type[Any], field: FieldInfo) -> pa.DataType:
tz = get_extras(field, "tz")
return pa.timestamp("us", tz=tz)
elif getattr(py_type, "__origin__", None) in (list, tuple):
child = py_type.__args__[0]
# A bare, unparameterised ``typing.List`` / ``typing.Tuple`` matches this
# branch (its ``__origin__`` is ``list`` / ``tuple``) but has no
# ``__args__``, so we cannot infer the element type. Raise a clear
# ``TypeError`` instead of crashing with an opaque ``AttributeError``.
args = getattr(py_type, "__args__", None)
if not args:
raise TypeError(
"Converting Pydantic type to Arrow Type: unsupported type "
f"{py_type}. Specify the element type, e.g. List[int] instead "
"of a bare List."
)
child = args[0]
return _pydantic_list_child_to_arrow(child, field)
raise TypeError(
f"Converting Pydantic type to Arrow Type: unsupported type {py_type}."

View File

@@ -86,7 +86,10 @@ def _from_list(data: list) -> Scannable:
@to_scannable.register(dict)
def _from_dict(data: dict) -> Scannable:
raise ValueError("Cannot add a single dictionary to a table. Use a list.")
raise ValueError(
"Cannot create or add rows from a single dictionary. "
"Use a list of dictionaries instead."
)
@to_scannable.register(LanceModel)

View File

@@ -243,7 +243,10 @@ def _into_pyarrow_reader(
raise ValueError("Cannot add a single LanceModel to a table. Use a list.")
if isinstance(data, dict):
raise ValueError("Cannot add a single dictionary to a table. Use a list.")
raise ValueError(
"Cannot create or add rows from a single dictionary. "
"Use a list of dictionaries instead."
)
if isinstance(data, list):
# Handle empty list case

View File

@@ -373,9 +373,15 @@ def _(value: list):
@value_to_sql.register(dict)
def _(value: dict):
# https://datafusion.apache.org/user-guide/sql/scalar_functions.html#named-struct
# Render the field name through value_to_sql(str(...)) as well so that keys
# containing characters meaningful in SQL (e.g. a single quote) are escaped
# the same way string values are. A bare f"'{k}'" would emit invalid SQL for
# a key like "it's".
return (
"named_struct("
+ ", ".join(f"'{k}', {value_to_sql(v)}" for k, v in value.items())
+ ", ".join(
f"{value_to_sql(str(k))}, {value_to_sql(v)}" for k, v in value.items()
)
+ ")"
)

View File

@@ -188,6 +188,18 @@ def test_nested_struct_list():
assert schema == expect_schema
def test_bare_generic_raises_type_error():
# A bare, unparameterised List/Tuple has no element type to map to Arrow.
# It should raise a clear TypeError, not crash with AttributeError: __args__.
for bare in (List, Tuple):
class TestModel(pydantic.BaseModel):
items: bare
with pytest.raises(TypeError, match="unsupported type"):
pydantic_to_schema(TestModel)
def test_nested_struct_list_optional():
class SplitInfo(pydantic.BaseModel):
start_frame: int

View File

@@ -301,6 +301,16 @@ def test_create_table(mem_db: DBConnection):
assert expected == tbl
def test_create_table_rejects_single_dictionary(mem_db: DBConnection):
data = {"vector": [3.1, 4.1], "item": "foo", "price": 10.0}
with pytest.raises(ValueError) as excep_info:
mem_db.create_table("test", data=data)
assert (
str(excep_info.value) == "Cannot create or add rows from a single dictionary. "
"Use a list of dictionaries instead."
)
def test_empty_table(mem_db: DBConnection):
schema = pa.schema(
[
@@ -330,8 +340,8 @@ def test_add_dictionary(mem_db: DBConnection):
with pytest.raises(ValueError) as excep_info:
tbl.add(data=data)
assert (
str(excep_info.value)
== "Cannot add a single dictionary to a table. Use a list."
str(excep_info.value) == "Cannot create or add rows from a single dictionary. "
"Use a list of dictionaries instead."
)

View File

@@ -149,6 +149,21 @@ def test_value_to_sql_dict():
assert value_to_sql({}) == "named_struct()"
def test_value_to_sql_dict_key_escaping():
# Struct field names that contain a single quote must be escaped (doubled)
# the same way string values are, otherwise value_to_sql emits invalid SQL
# such as named_struct('it's', 1).
assert value_to_sql({"it's": 1}) == "named_struct('it''s', 1)"
assert (
value_to_sql({"o'brien": "d'angelo"}) == "named_struct('o''brien', 'd''angelo')"
)
# Escaping also applies to keys of nested structs.
assert (
value_to_sql({"outer": {"in'r": 1}})
== "named_struct('outer', named_struct('in''r', 1))"
)
def test_value_to_sql_numpy_scalars():
# numpy scalars (e.g. pulled from an ndarray or a pandas column) must
# convert the same way as their native Python counterparts. np.float64

View File

@@ -32,6 +32,7 @@ use crate::table::{BaseTable, WriteOptions};
pub mod listing;
pub mod namespace;
pub(crate) mod read_freshness;
pub trait DatabaseOptions {
fn serialize_into_map(&self, map: &mut HashMap<String, String>);

View File

@@ -4,7 +4,7 @@
//! Namespace-based database implementation that delegates table management to lance-namespace
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
@@ -29,6 +29,9 @@ use crate::database::listing::{
NewTableConfig, OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, OPT_NEW_TABLE_STORAGE_VERSION,
OPT_NEW_TABLE_V2_MANIFEST_PATHS,
};
use crate::database::read_freshness::{
FreshnessBaselines, ReadFreshnessContextProvider, TableFreshness,
};
use crate::error::{Error, Result};
use crate::table::{NativeTable, map_namespace_lance_error};
use lance::dataset::WriteMode;
@@ -51,6 +54,10 @@ fn is_table_already_exists_namespace_error(err: &lance::Error) -> bool {
false
}
/// Object-id delimiter default (matches `RestNamespaceBuilder`'s); overridable
/// via the `delimiter` property.
const DEFAULT_NAMESPACE_DELIMITER: &str = "$";
/// A database implementation that uses lance-namespace for table management
pub struct LanceNamespaceDatabase {
namespace: Arc<dyn LanceNamespace>,
@@ -70,6 +77,17 @@ pub struct LanceNamespaceDatabase {
ns_properties: HashMap<String, String>,
// Options for tables created by this connection
new_table_config: NewTableConfig,
// Per-table read-freshness baselines, shared with the context provider.
freshness_baselines: FreshnessBaselines,
// Delimiter for building freshness keys; see `table_freshness`.
delimiter: String,
}
fn resolve_delimiter(ns_properties: &HashMap<String, String>) -> String {
ns_properties
.get("delimiter")
.cloned()
.unwrap_or_else(|| DEFAULT_NAMESPACE_DELIMITER.to_string())
}
impl LanceNamespaceDatabase {
@@ -82,6 +100,9 @@ impl LanceNamespaceDatabase {
session: Option<Arc<lance::session::Session>>,
namespace_client_pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
) -> Self {
// Client is pre-built, so we can't install the freshness provider here;
// baselines are still tracked for a uniform bump path.
let delimiter = resolve_delimiter(&namespace_client_properties);
Self {
namespace: namespace_client,
storage_options,
@@ -92,6 +113,8 @@ impl LanceNamespaceDatabase {
ns_impl: namespace_client_impl,
ns_properties: namespace_client_properties,
new_table_config: NewTableConfig::default(),
freshness_baselines: Arc::new(Mutex::new(HashMap::new())),
delimiter,
}
}
@@ -136,10 +159,19 @@ impl LanceNamespaceDatabase {
if let Some(ref sess) = session {
builder = builder.session(sess.clone());
}
// Install the read-freshness provider before building the client.
let freshness_baselines: FreshnessBaselines = Arc::new(Mutex::new(HashMap::new()));
builder = builder.context_provider(Arc::new(ReadFreshnessContextProvider::new(
freshness_baselines.clone(),
read_consistency_interval,
)));
let namespace = builder.connect().await.map_err(|e| Error::InvalidInput {
message: format!("Failed to connect to namespace: {:?}", e),
})?;
let delimiter = resolve_delimiter(&ns_properties);
Ok(Self {
namespace,
storage_options,
@@ -150,9 +182,20 @@ impl LanceNamespaceDatabase {
ns_impl: ns_impl.to_string(),
ns_properties,
new_table_config,
freshness_baselines,
delimiter,
})
}
/// Build a table's freshness handle, keyed to match the `object_id` the
/// namespace client sends on reads (table-id parts joined by the delimiter).
fn table_freshness(&self, namespace_path: &[String], name: &str) -> TableFreshness {
let mut parts = namespace_path.to_vec();
parts.push(name.to_string());
let key = parts.join(&self.delimiter);
TableFreshness::new(self.freshness_baselines.clone(), key)
}
fn extract_storage_overrides(
&self,
request: &DbCreateTableRequest,
@@ -331,7 +374,8 @@ impl Database for LanceNamespaceDatabase {
self.pushdown_operations.clone(),
self.session.clone(),
)
.await?;
.await?
.with_freshness(self.table_freshness(&request.namespace_path, &request.name));
return Ok(Arc::new(native_table));
}
@@ -462,7 +506,8 @@ impl Database for LanceNamespaceDatabase {
self.pushdown_operations.clone(),
self.session.clone(),
)
.await?;
.await?
.with_freshness(self.table_freshness(&request.namespace_path, &request.name));
Ok(Arc::new(native_table))
}
@@ -478,7 +523,8 @@ impl Database for LanceNamespaceDatabase {
self.pushdown_operations.clone(),
self.session.clone(),
)
.await?;
.await?
.with_freshness(self.table_freshness(&request.namespace_path, &request.name));
Ok(Arc::new(native_table))
}

View File

@@ -0,0 +1,312 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Read-freshness signaling for the lance-namespace path.
//!
//! Against a server that serves cached table metadata up to some staleness
//! window, a handle that just wrote (or asked for the latest version via
//! `checkout_latest`) can still read a stale snapshot. To prevent that, reads
//! routed through the namespace client carry an `x-lancedb-min-timestamp`
//! header naming the oldest snapshot the caller will accept.
//!
//! This mirrors `remote::table`: a per-table baseline is bumped to "now" on
//! every write and on `checkout_latest()`, and reads send
//! `max(baseline, now - read_consistency_interval)`. Since the namespace client
//! takes no headers directly, a [`DynamicContextProvider`] injects it per request.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use lance_namespace_impls::{DynamicContextProvider, OperationInfo};
/// Provider context keys prefixed with `headers.` become HTTP headers (prefix
/// stripped), so this emits the `x-lancedb-min-timestamp` header.
const MIN_TIMESTAMP_CONTEXT_KEY: &str = "headers.x-lancedb-min-timestamp";
/// Per-table freshness baselines (keyed by namespace object id), shared between
/// the provider that reads them and the table handles that bump them.
pub type FreshnessBaselines = Arc<Mutex<HashMap<String, SystemTime>>>;
/// `max(baseline, now - interval)`, or `None` when neither constraint applies.
fn compute_min_timestamp(
baseline: Option<SystemTime>,
interval: Option<Duration>,
now: SystemTime,
) -> Option<SystemTime> {
let interval_based = match interval {
None => None,
Some(d) if d.is_zero() => Some(now),
Some(d) => Some(now.checked_sub(d).unwrap_or(now)),
};
match (interval_based, baseline) {
(None, None) => None,
(Some(t), None) | (None, Some(t)) => Some(t),
(Some(a), Some(b)) => Some(a.max(b)),
}
}
/// Advance the baseline to `now`, never backwards, so a concurrent handle's
/// write can't lower a floor another handle already set.
fn next_freshness_baseline(prev: Option<SystemTime>, now: SystemTime) -> SystemTime {
match prev {
Some(p) => p.max(now),
None => now,
}
}
/// A handle's view of the shared baseline map for a single table.
#[derive(Clone, Debug)]
pub struct TableFreshness {
baselines: FreshnessBaselines,
/// Namespace object id for this table (matches the read's `object_id`).
key: String,
}
impl TableFreshness {
pub fn new(baselines: FreshnessBaselines, key: String) -> Self {
Self { baselines, key }
}
pub fn bump(&self) {
let now = SystemTime::now();
let mut baselines = self.baselines.lock().unwrap();
let prev = baselines.get(&self.key).copied();
baselines.insert(self.key.clone(), next_freshness_baseline(prev, now));
}
}
/// Read ops that can be served stale and so carry the freshness floor.
/// `list_table_versions` resolves "latest" for managed-versioning tables, so it
/// is what makes `checkout_latest()` observe a prior write.
fn is_read_operation(operation: &str) -> bool {
matches!(
operation,
"describe_table" | "list_table_versions" | "query_table" | "list_tables"
)
}
/// Injects `x-lancedb-min-timestamp` on namespace reads, per addressed table.
#[derive(Debug)]
pub struct ReadFreshnessContextProvider {
baselines: FreshnessBaselines,
read_consistency_interval: Option<Duration>,
}
impl ReadFreshnessContextProvider {
pub fn new(baselines: FreshnessBaselines, read_consistency_interval: Option<Duration>) -> Self {
Self {
baselines,
read_consistency_interval,
}
}
}
impl DynamicContextProvider for ReadFreshnessContextProvider {
fn provide_context(&self, info: &OperationInfo) -> HashMap<String, String> {
if !is_read_operation(&info.operation) {
return HashMap::new();
}
let baseline = self.baselines.lock().unwrap().get(&info.object_id).copied();
match compute_min_timestamp(baseline, self.read_consistency_interval, SystemTime::now()) {
Some(ts) => {
let dt: chrono::DateTime<chrono::Utc> = ts.into();
HashMap::from([(MIN_TIMESTAMP_CONTEXT_KEY.to_string(), dt.to_rfc3339())])
}
None => HashMap::new(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
/// Allowed slop when comparing a header timestamp against a locally
/// captured wall-clock bound. Tests run fast enough that 1s is plenty.
const TOLERANCE: Duration = Duration::from_secs(1);
fn parse_header_ts(headers: &HashMap<String, String>) -> SystemTime {
let value = headers
.get(MIN_TIMESTAMP_CONTEXT_KEY)
.expect("expected min-timestamp context key");
chrono::DateTime::parse_from_rfc3339(value)
.unwrap()
.with_timezone(&chrono::Utc)
.into()
}
#[test]
fn test_compute_min_timestamp_combines_baseline_and_interval() {
let now = SystemTime::now();
let baseline = now - Duration::from_secs(60);
// No interval, no baseline -> no header.
assert_eq!(compute_min_timestamp(None, None, now), None);
// Baseline only -> baseline.
assert_eq!(
compute_min_timestamp(Some(baseline), None, now),
Some(baseline)
);
// ZERO interval, no baseline -> now (strong consistency).
assert_eq!(
compute_min_timestamp(None, Some(Duration::ZERO), now),
Some(now)
);
// Positive interval, no baseline -> now - interval.
assert_eq!(
compute_min_timestamp(None, Some(Duration::from_secs(10)), now),
Some(now - Duration::from_secs(10))
);
// Both: pick the more-recent (tighter) constraint.
// baseline = now-60, now-interval = now-10. now-10 is newer.
assert_eq!(
compute_min_timestamp(Some(baseline), Some(Duration::from_secs(10)), now),
Some(now - Duration::from_secs(10))
);
// Both, baseline newer: pick baseline.
let recent_baseline = now - Duration::from_secs(5);
assert_eq!(
compute_min_timestamp(Some(recent_baseline), Some(Duration::from_secs(60)), now),
Some(recent_baseline)
);
}
#[test]
fn test_next_freshness_baseline_is_monotonic() {
let now = SystemTime::now();
let earlier = now - Duration::from_secs(30);
let later = now + Duration::from_secs(30);
// No prior baseline -> now.
assert_eq!(next_freshness_baseline(None, now), now);
// Prior baseline older than now -> now.
assert_eq!(next_freshness_baseline(Some(earlier), now), now);
// Prior baseline newer than now -> keep the newer baseline.
assert_eq!(next_freshness_baseline(Some(later), now), later);
}
fn provider_with(
entries: &[(&str, SystemTime)],
interval: Option<Duration>,
) -> ReadFreshnessContextProvider {
let map: HashMap<String, SystemTime> =
entries.iter().map(|(k, v)| (k.to_string(), *v)).collect();
ReadFreshnessContextProvider::new(Arc::new(Mutex::new(map)), interval)
}
#[test]
fn test_provider_emits_header_at_or_after_bumped_baseline() {
// A baseline set "now" with no interval: every read op must carry a
// floor at or after that baseline. `list_table_versions` is the hook
// that makes managed-versioning `checkout_latest()` observe a write.
let baseline = SystemTime::now();
let provider = provider_with(&[("ns$tbl", baseline)], None);
// These ops are keyed by the table id, so they pick up the per-table
// baseline. (`list_tables` is keyed by the namespace, so it is covered
// separately by the interval-floor test.)
for op in ["describe_table", "list_table_versions", "query_table"] {
let ctx = provider.provide_context(&OperationInfo::new(op, "ns$tbl"));
let sent = parse_header_ts(&ctx);
assert!(
sent >= baseline - TOLERANCE && sent <= baseline + TOLERANCE,
"operation {op} should carry a floor at the bumped baseline"
);
}
}
#[test]
fn test_provider_list_tables_uses_interval_floor_not_table_baseline() {
// `list_tables` is addressed by the namespace id, which never matches a
// per-table baseline key, so a bumped table baseline must not leak onto
// it. With no interval it sends nothing; with one it sends now-interval.
let provider = provider_with(&[("ns$tbl", SystemTime::now())], None);
let ctx = provider.provide_context(&OperationInfo::new("list_tables", "ns"));
assert!(
ctx.is_empty(),
"list_tables must not inherit a per-table baseline"
);
let interval = Duration::from_secs(30);
let provider = provider_with(&[("ns$tbl", SystemTime::now())], Some(interval));
let before = SystemTime::now();
let ctx = provider.provide_context(&OperationInfo::new("list_tables", "ns"));
let after = SystemTime::now();
let sent = parse_header_ts(&ctx);
assert!(
sent >= before - interval - TOLERANCE && sent <= after - interval + TOLERANCE,
"list_tables should carry the interval floor"
);
}
#[test]
fn test_provider_no_header_for_empty_baseline_and_no_interval() {
// Manual consistency (no interval) on a table that was never bumped:
// no floor, so the server may serve from cache.
let provider = provider_with(&[], None);
let ctx = provider.provide_context(&OperationInfo::new("describe_table", "ns$tbl"));
assert!(ctx.is_empty());
}
#[test]
fn test_provider_interval_floor_applies_without_baseline() {
// With a consistency interval and no baseline, the floor is now-interval.
let interval = Duration::from_secs(30);
let provider = provider_with(&[], Some(interval));
let before = SystemTime::now();
let ctx = provider.provide_context(&OperationInfo::new("query_table", "ns$tbl"));
let after = SystemTime::now();
let sent = parse_header_ts(&ctx);
assert!(
sent >= before - interval - TOLERANCE && sent <= after - interval + TOLERANCE,
"expected floor at roughly now - interval"
);
}
#[test]
fn test_provider_non_read_ops_emit_nothing() {
// Even with a fresh baseline and a zero interval, a non-read operation
// (which establishes rather than consumes a baseline) sends no header.
let provider = provider_with(&[("ns$tbl", SystemTime::now())], Some(Duration::ZERO));
for op in [
"create_table",
"register_table",
"drop_table",
"rename_table",
// Pinned to an immutable version, so it cannot be served stale.
"describe_table_version",
] {
let ctx = provider.provide_context(&OperationInfo::new(op, "ns$tbl"));
assert!(
ctx.is_empty(),
"operation {op} must not send a freshness header"
);
}
}
#[test]
fn test_provider_uses_per_table_baseline() {
// The floor is looked up by object id, so an unrelated table's baseline
// does not leak onto another table's read.
let baseline = SystemTime::now();
let provider = provider_with(&[("ns$has_baseline", baseline)], None);
// The bumped table gets a header.
let hit =
provider.provide_context(&OperationInfo::new("describe_table", "ns$has_baseline"));
assert!(!hit.is_empty());
// A different table with no baseline (and no interval) gets nothing.
let miss = provider.provide_context(&OperationInfo::new("describe_table", "ns$other"));
assert!(miss.is_empty());
}
}

View File

@@ -13,7 +13,7 @@ use serde_json::{Value, json};
use super::EmbeddingFunction;
use crate::{Error, Result};
use tokio::runtime::Handle;
use tokio::runtime::{Handle, RuntimeFlavor};
use tokio::task::block_in_place;
#[derive(Debug)]
@@ -148,6 +148,12 @@ impl BedrockEmbeddingFunction {
_ => unreachable!(),
};
// Bedrock's SDK is async but this trait method is synchronous, so we
// bridge with `block_in_place` + `block_on`. That requires a
// multi-threaded Tokio runtime; return a typed error instead of
// panicking when no compatible runtime is available.
let handle = current_multi_thread_handle()?;
for text in texts {
let request_body = match self.model {
BedrockEmbeddingModel::TitanEmbedding => {
@@ -163,24 +169,28 @@ impl BedrockEmbeddingFunction {
}
};
// Serialize before entering the blocking section so a serialization
// failure surfaces as a typed error rather than an `unwrap` panic.
let body = serde_json::to_vec(&request_body).map_err(|e| Error::Runtime {
message: format!("Failed to serialize Bedrock request: {e}"),
})?;
let client = self.client.clone();
let model_id = self.model.model_id().to_string();
let request_body = request_body.clone();
let response = block_in_place(move || {
Handle::current().block_on(async move {
let response = block_in_place(|| {
handle.block_on(async move {
client
.invoke_model()
.model_id(model_id)
.body(aws_sdk_bedrockruntime::primitives::Blob::new(
serde_json::to_vec(&request_body).unwrap(),
))
.body(aws_sdk_bedrockruntime::primitives::Blob::new(body))
.send()
.await
.map_err(Box::new)
.map_err(|e| Error::Runtime {
message: format!("Bedrock invoke_model request failed: {e}"),
})
})
})
.unwrap();
})?;
let response_json: Value =
serde_json::from_slice(response.body.as_ref()).map_err(|e| Error::Runtime {
@@ -188,22 +198,12 @@ impl BedrockEmbeddingFunction {
})?;
let embedding = match self.model {
BedrockEmbeddingModel::TitanEmbedding => response_json["embedding"]
.as_array()
.ok_or_else(|| Error::Runtime {
message: "Missing embedding in response".to_string(),
})?
.iter()
.map(|v| v.as_f64().unwrap() as f32)
.collect::<Vec<f32>>(),
BedrockEmbeddingModel::CohereLarge => response_json["embeddings"][0]
.as_array()
.ok_or_else(|| Error::Runtime {
message: "Missing embeddings in response".to_string(),
})?
.iter()
.map(|v| v.as_f64().unwrap() as f32)
.collect::<Vec<f32>>(),
BedrockEmbeddingModel::TitanEmbedding => {
json_array_to_f32(&response_json["embedding"], "embedding")?
}
BedrockEmbeddingModel::CohereLarge => {
json_array_to_f32(&response_json["embeddings"][0], "embeddings")?
}
};
builder.append_slice(&embedding);
@@ -212,3 +212,86 @@ impl BedrockEmbeddingFunction {
Ok(builder.finish())
}
}
/// Returns a handle to the current multi-threaded Tokio runtime, or a typed
/// [`Error::Runtime`] when called outside a runtime or on the current-thread
/// runtime. This keeps the synchronous-over-async bridge in
/// [`BedrockEmbeddingFunction::compute_inner`] from panicking on runtime
/// configurations that cannot support `block_in_place`.
fn current_multi_thread_handle() -> Result<Handle> {
let handle = Handle::try_current().map_err(|e| Error::Runtime {
message: format!("Bedrock embedding must be called from within a Tokio runtime: {e}"),
})?;
if handle.runtime_flavor() == RuntimeFlavor::CurrentThread {
return Err(Error::Runtime {
message: "Bedrock embedding requires a multi-threaded Tokio runtime; the \
current-thread runtime cannot use `block_in_place`"
.to_string(),
});
}
Ok(handle)
}
/// Converts a JSON value expected to be an array of numbers into `Vec<f32>`.
///
/// Returns a typed [`Error::Runtime`] (rather than panicking) when the value is
/// not an array or contains a non-numeric element, so malformed provider
/// responses degrade gracefully.
fn json_array_to_f32(value: &Value, field: &str) -> Result<Vec<f32>> {
let arr = value.as_array().ok_or_else(|| Error::Runtime {
message: format!("Missing or non-array '{field}' field in Bedrock response"),
})?;
arr.iter()
.map(|v| {
v.as_f64().map(|f| f as f32).ok_or_else(|| Error::Runtime {
message: format!("Non-numeric value in Bedrock '{field}' embedding: {v}"),
})
})
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn json_array_to_f32_parses_numbers() {
let v = json!([1.0, 2, -3.5]);
let out = json_array_to_f32(&v, "embedding").unwrap();
assert_eq!(out, vec![1.0_f32, 2.0, -3.5]);
}
#[test]
fn json_array_to_f32_rejects_non_array() {
// Missing field indexes to `Value::Null`; a malformed payload should be
// a typed error, not a panic.
let v = json!({"unexpected": "shape"});
let err = json_array_to_f32(&v["embedding"], "embedding").unwrap_err();
assert!(matches!(err, Error::Runtime { .. }), "got {err:?}");
}
#[test]
fn json_array_to_f32_rejects_non_numeric_element() {
let v = json!([1.0, "not-a-number", 3.0]);
let err = json_array_to_f32(&v, "embedding").unwrap_err();
assert!(matches!(err, Error::Runtime { .. }), "got {err:?}");
}
#[test]
fn handle_errors_without_runtime() {
// No Tokio runtime in scope -> typed error instead of a panic.
let err = current_multi_thread_handle().unwrap_err();
assert!(matches!(err, Error::Runtime { .. }), "got {err:?}");
}
#[tokio::test(flavor = "current_thread")]
async fn handle_errors_on_current_thread_runtime() {
let err = current_multi_thread_handle().unwrap_err();
assert!(matches!(err, Error::Runtime { .. }), "got {err:?}");
}
#[tokio::test(flavor = "multi_thread")]
async fn handle_ok_on_multi_thread_runtime() {
current_multi_thread_handle().expect("multi-threaded runtime should be accepted");
}
}

View File

@@ -184,12 +184,13 @@ pub mod table;
pub mod test_utils;
pub mod utils;
use std::fmt::Display;
use std::{fmt::Display, str::FromStr};
use serde::{Deserialize, Serialize};
pub use connection::{ConnectNamespaceBuilder, Connection};
pub use error::{Error, Result};
use lance_index::vector::ApproxMode as LanceApproxMode;
use lance_linalg::distance::DistanceType as LanceDistanceType;
pub use table::Table;
@@ -258,6 +259,79 @@ impl Display for DistanceType {
}
}
/// Controls the speed / accuracy tradeoff for approximate vector search.
///
/// This currently only affects RQ-quantized vector indexes, such as IVF_RQ.
/// Other index types ignore this setting.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[non_exhaustive]
#[serde(rename_all = "lowercase")]
pub enum ApproxMode {
/// Prefer lower query latency, which can reduce recall.
Fast,
/// Use the default balance between query latency and recall.
#[default]
Normal,
/// Prefer higher recall, which can increase query latency.
Accurate,
}
impl From<ApproxMode> for LanceApproxMode {
fn from(value: ApproxMode) -> Self {
match value {
ApproxMode::Fast => Self::Fast,
ApproxMode::Normal => Self::Normal,
ApproxMode::Accurate => Self::Accurate,
}
}
}
impl From<LanceApproxMode> for ApproxMode {
fn from(value: LanceApproxMode) -> Self {
match value {
LanceApproxMode::Fast => Self::Fast,
LanceApproxMode::Normal => Self::Normal,
LanceApproxMode::Accurate => Self::Accurate,
}
}
}
impl TryFrom<&str> for ApproxMode {
type Error = Error;
fn try_from(value: &str) -> std::prelude::v1::Result<Self, Self::Error> {
Self::from_str(value)
}
}
impl FromStr for ApproxMode {
type Err = Error;
fn from_str(value: &str) -> std::prelude::v1::Result<Self, Self::Err> {
match value.to_ascii_lowercase().as_str() {
"fast" => Ok(Self::Fast),
"normal" => Ok(Self::Normal),
"accurate" => Ok(Self::Accurate),
_ => Err(Error::InvalidInput {
message: format!(
"approx_mode must be one of 'fast', 'normal', or 'accurate', got '{}'",
value
),
}),
}
}
}
impl Display for ApproxMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Fast => write!(f, "fast"),
Self::Normal => write!(f, "normal"),
Self::Accurate => write!(f, "accurate"),
}
}
}
/// Connect to a database
pub use connection::connect;
/// Connect to a namespace-backed database

View File

@@ -20,12 +20,12 @@ use lance_index::scalar::FullTextSearchQuery;
use lance_index::scalar::inverted::SCORE_COL;
use lance_index::vector::DIST_COL;
use crate::DistanceType;
use crate::error::{Error, Result};
use crate::rerankers::rrf::RRFReranker;
use crate::rerankers::{NormalizeMethod, Reranker, check_reranker_result};
use crate::table::BaseTable;
use crate::utils::{MaxBatchLengthStream, TimeoutStream};
use crate::{ApproxMode, DistanceType};
use crate::{
arrow::{SendableRecordBatchStream, SimpleRecordBatchStream},
table::AnyQuery,
@@ -935,6 +935,8 @@ pub struct VectorQueryRequest {
pub refine_factor: Option<u32>,
/// The distance type to use for the search
pub distance_type: Option<DistanceType>,
/// The speed / accuracy tradeoff to use for approximate vector search
pub approx_mode: Option<ApproxMode>,
/// Default is true. Set to false to enforce a brute force search.
pub use_index: bool,
}
@@ -952,6 +954,7 @@ impl Default for VectorQueryRequest {
ef: None,
refine_factor: None,
distance_type: None,
approx_mode: None,
use_index: true,
}
}
@@ -1192,6 +1195,15 @@ impl VectorQuery {
self
}
/// Set the speed / accuracy tradeoff for approximate vector search.
///
/// This setting is currently only used by RQ-quantized indexes, such as
/// IVF_RQ. Other index types ignore this setting.
pub fn approx_mode(mut self, approx_mode: ApproxMode) -> Self {
self.request.approx_mode = Some(approx_mode);
self
}
/// If this is called then any vector index is skipped
///
/// An exhaustive (flat) search will be performed. The query vector will
@@ -1546,6 +1558,7 @@ mod tests {
.nprobes(1000)
.postfilter()
.distance_type(DistanceType::Cosine)
.approx_mode(ApproxMode::Accurate)
.refine_factor(999);
assert_eq!(
@@ -1564,9 +1577,49 @@ mod tests {
assert_eq!(query.request.maximum_nprobes, Some(1000));
assert!(query.request.use_index);
assert_eq!(query.request.distance_type, Some(DistanceType::Cosine));
assert_eq!(query.request.approx_mode, Some(ApproxMode::Accurate));
assert_eq!(query.request.refine_factor, Some(999));
}
#[test]
fn test_approx_mode_serde_parse_default_and_display() {
assert_eq!(ApproxMode::default(), ApproxMode::Normal);
assert_eq!(
serde_json::to_string(&ApproxMode::Fast).unwrap(),
"\"fast\""
);
assert_eq!(
serde_json::from_str::<ApproxMode>("\"accurate\"").unwrap(),
ApproxMode::Accurate
);
assert_eq!("normal".parse::<ApproxMode>().unwrap(), ApproxMode::Normal);
assert_eq!(ApproxMode::try_from("FAST").unwrap(), ApproxMode::Fast);
assert_eq!(ApproxMode::Accurate.to_string(), "accurate");
assert!(ApproxMode::try_from("invalid").is_err());
}
#[tokio::test]
async fn test_vector_query_approx_mode_builder() {
let tmp_dir = tempdir().unwrap();
let dataset_path = tmp_dir.path().join("test.lance");
let uri = dataset_path.to_str().unwrap();
let conn = connect(uri).execute().await.unwrap();
let table = conn
.create_table("my_table", make_test_batches())
.execute()
.await
.unwrap();
let query = table
.query()
.nearest_to(&[0.1, 0.2])
.unwrap()
.approx_mode(ApproxMode::Fast);
assert_eq!(query.request.approx_mode, Some(ApproxMode::Fast));
}
#[tokio::test]
async fn test_execute() {
// TODO: Switch back to memory://foo after https://github.com/lancedb/lancedb/issues/1051

View File

@@ -706,6 +706,9 @@ impl<S: HttpSend> RemoteTable<S> {
if let Some(distance_type) = query.distance_type {
body["distance_type"] = serde_json::json!(distance_type);
}
if let Some(approx_mode) = query.approx_mode {
body["approx_mode"] = serde_json::json!(approx_mode);
}
// In 0.23.1 we migrated from `nprobes` to `minimum_nprobes` and `maximum_nprobes`.
// Old client / new server: since minimum_nprobes is missing, fallback to nprobes
// New client / old server: old server will only see nprobes, make sure to set both
@@ -3610,6 +3613,61 @@ mod tests {
assert_eq!(data[0].as_ref().unwrap(), &expected_data);
}
#[tokio::test]
async fn test_query_vector_approx_mode_sent_when_set() {
let expected_data = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let expected_data_ref = expected_data.clone();
let table = Table::new_with_handler("my_table", move |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/query/");
assert_eq!(
request.headers().get("Content-Type").unwrap(),
JSON_CONTENT_TYPE
);
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
let mut expected_body = serde_json::json!({
"prefilter": true,
"nprobes": 20,
"minimum_nprobes": 20,
"maximum_nprobes": 20,
"approx_mode": "accurate",
"lower_bound": Option::<f32>::None,
"upper_bound": Option::<f32>::None,
"k": 10,
"ef": Option::<usize>::None,
"refine_factor": null,
"version": null,
});
expected_body["vector"] = vec![0.1f32, 0.2, 0.3].into();
assert_eq!(body, expected_body);
let response_body = write_ipc_file(&expected_data_ref);
http::Response::builder()
.status(200)
.header(CONTENT_TYPE, ARROW_FILE_CONTENT_TYPE)
.body(response_body)
.unwrap()
});
let data = table
.query()
.nearest_to(vec![0.1, 0.2, 0.3])
.unwrap()
.approx_mode(crate::ApproxMode::Accurate)
.execute()
.await;
let data = data.unwrap().collect::<Vec<_>>().await;
assert_eq!(data.len(), 1);
assert_eq!(data[0].as_ref().unwrap(), &expected_data);
}
#[tokio::test]
async fn test_query_fts_default_values() {
let expected_data = RecordBatch::try_new(

View File

@@ -43,6 +43,7 @@ use crate::connection::NamespaceClientPushdownOperation;
use crate::data::scannable::{PeekedScannable, Scannable, estimate_write_partitions};
use crate::database::Database;
use crate::database::read_freshness::TableFreshness;
use crate::embeddings::{EmbeddingDefinition, EmbeddingRegistry, MemoryRegistry};
use crate::error::{Error, Result};
use crate::index::IndexStatistics;
@@ -1763,6 +1764,8 @@ pub struct NativeTable {
// Operations to push down to the namespace server.
// pub(crate) so query.rs can access the field for server-side query execution.
pub(crate) pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
// Read-freshness baseline; `Some` only for namespace-backed tables.
freshness: Option<TableFreshness>,
}
impl std::fmt::Debug for NativeTable {
@@ -1923,6 +1926,7 @@ impl NativeTable {
read_consistency_interval,
namespace_client,
pushdown_operations,
freshness: None,
})
}
@@ -1934,6 +1938,12 @@ impl NativeTable {
self
}
/// Attach the read-freshness baseline handle (namespace connections only).
pub(crate) fn with_freshness(mut self, freshness: TableFreshness) -> Self {
self.freshness = Some(freshness);
self
}
/// Build a sibling `NativeTable` with the same identity but a different
/// (independent) dataset wrapper — used to hand out branch-scoped handles.
fn with_dataset(&self, dataset: dataset::DatasetConsistencyWrapper) -> Self {
@@ -1946,6 +1956,14 @@ impl NativeTable {
read_consistency_interval: self.read_consistency_interval,
namespace_client: self.namespace_client.clone(),
pushdown_operations: self.pushdown_operations.clone(),
freshness: self.freshness.clone(),
}
}
/// Bump the read-freshness baseline; no-op for non-namespace tables.
fn bump_freshness(&self) {
if let Some(freshness) = &self.freshness {
freshness.bump();
}
}
@@ -2045,6 +2063,7 @@ impl NativeTable {
read_consistency_interval,
namespace_client: stored_namespace_client,
pushdown_operations,
freshness: None,
})
}
@@ -2134,6 +2153,7 @@ impl NativeTable {
read_consistency_interval,
namespace_client,
pushdown_operations,
freshness: None,
})
}
@@ -2265,6 +2285,7 @@ impl NativeTable {
read_consistency_interval,
namespace_client: stored_namespace_client,
pushdown_operations,
freshness: None,
})
}
@@ -2424,6 +2445,8 @@ impl BaseTable for NativeTable {
}
async fn checkout_latest(&self) -> Result<()> {
// Bump before resolving "latest" so that request carries the floor.
self.bump_freshness();
self.dataset.as_latest().await?;
self.dataset.reload().await
}
@@ -2511,6 +2534,8 @@ impl BaseTable for NativeTable {
debug_assert_eq!(dataset.version().version, version);
dataset.restore().await?;
}
// Restore moves "latest", so bump before resolving it (as RemoteTable does).
self.bump_freshness();
self.dataset.as_latest().await?;
Ok(())
}
@@ -2591,7 +2616,13 @@ impl BaseTable for NativeTable {
output.plan
};
let insert_exec = Arc::new(InsertExec::new(ds_wrapper.clone(), ds, plan, lance_params));
let insert_exec = Arc::new(InsertExec::new_with_tracker(
ds_wrapper.clone(),
ds,
plan,
lance_params,
output.tracker.clone(),
));
let tracker_for_tasks = output.tracker.clone();
if let Some(ref t) = tracker_for_tasks {
@@ -2624,6 +2655,7 @@ impl BaseTable for NativeTable {
}
let version = ds_wrapper.get().await?.manifest().version;
self.bump_freshness();
Ok(AddResult { version })
}
@@ -2674,7 +2706,9 @@ impl BaseTable for NativeTable {
async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult> {
// Delegate to the submodule implementation
update::execute_update(self, update).await
let result = update::execute_update(self, update).await?;
self.bump_freshness();
Ok(result)
}
async fn create_plan(
@@ -2706,7 +2740,9 @@ impl BaseTable for NativeTable {
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<MergeResult> {
merge::execute_merge_insert(self, params, new_data).await
let result = merge::execute_merge_insert(self, params, new_data).await?;
self.bump_freshness();
Ok(result)
}
async fn set_unenforced_primary_key(&self, columns: &[&str]) -> Result<()> {
@@ -2727,7 +2763,9 @@ impl BaseTable for NativeTable {
/// Delete rows from the table
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
delete::execute_delete(self, predicate).await
let result = delete::execute_delete(self, predicate).await?;
self.bump_freshness();
Ok(result)
}
async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
@@ -2746,22 +2784,30 @@ impl BaseTable for NativeTable {
transforms: NewColumnTransform,
read_columns: Option<Vec<String>>,
) -> Result<AddColumnsResult> {
schema_evolution::execute_add_columns(self, transforms, read_columns).await
let result = schema_evolution::execute_add_columns(self, transforms, read_columns).await?;
self.bump_freshness();
Ok(result)
}
async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult> {
schema_evolution::execute_alter_columns(self, alterations).await
let result = schema_evolution::execute_alter_columns(self, alterations).await?;
self.bump_freshness();
Ok(result)
}
async fn update_field_metadata(
&self,
updates: &[FieldMetadataUpdate],
) -> Result<UpdateFieldMetadataResult> {
schema_evolution::execute_update_field_metadata(self, updates).await
let result = schema_evolution::execute_update_field_metadata(self, updates).await?;
self.bump_freshness();
Ok(result)
}
async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
schema_evolution::execute_drop_columns(self, columns).await
let result = schema_evolution::execute_drop_columns(self, columns).await?;
self.bump_freshness();
Ok(result)
}
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {

View File

@@ -4,6 +4,7 @@
//! DataFusion ExecutionPlan for inserting data into LanceDB tables.
use std::any::Any;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, LazyLock, Mutex};
use arrow_array::{RecordBatch, UInt64Array};
@@ -20,11 +21,12 @@ use datafusion_physical_plan::{
use futures::TryStreamExt;
use lance::Dataset;
use lance::dataset::transaction::{Operation, Transaction};
use lance::dataset::{CommitBuilder, InsertBuilder, WriteParams};
use lance::dataset::{CommitBuilder, InsertBuilder, WriteParams, WriteProgressFn};
use lance::io::exec::utils::InstrumentedRecordBatchStreamAdapter;
use lance_table::format::Fragment;
use crate::table::dataset::DatasetConsistencyWrapper;
use crate::table::write_progress::WriteProgressTracker;
pub(crate) static COUNT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(ArrowSchema::new(vec![Field::new(
@@ -81,6 +83,7 @@ pub struct InsertExec {
dataset: Arc<Dataset>,
input: Arc<dyn ExecutionPlan>,
write_params: WriteParams,
tracker: Option<Arc<WriteProgressTracker>>,
properties: Arc<PlanProperties>,
partial_transactions: Arc<Mutex<Vec<Transaction>>>,
metrics: ExecutionPlanMetricsSet,
@@ -92,6 +95,16 @@ impl InsertExec {
dataset: Arc<Dataset>,
input: Arc<dyn ExecutionPlan>,
write_params: WriteParams,
) -> Self {
Self::new_with_tracker(ds_wrapper, dataset, input, write_params, None)
}
pub(crate) fn new_with_tracker(
ds_wrapper: DatasetConsistencyWrapper,
dataset: Arc<Dataset>,
input: Arc<dyn ExecutionPlan>,
write_params: WriteParams,
tracker: Option<Arc<WriteProgressTracker>>,
) -> Self {
let schema = COUNT_SCHEMA.clone();
let num_partitions = input.output_partitioning().partition_count();
@@ -107,6 +120,7 @@ impl InsertExec {
dataset,
input,
write_params,
tracker,
properties: Arc::new(properties),
partial_transactions: Arc::new(Mutex::new(Vec::with_capacity(num_partitions))),
metrics: ExecutionPlanMetricsSet::new(),
@@ -161,11 +175,12 @@ impl ExecutionPlan for InsertExec {
"InsertExec requires exactly one child".to_string(),
));
}
Ok(Arc::new(Self::new(
Ok(Arc::new(Self::new_with_tracker(
self.ds_wrapper.clone(),
self.dataset.clone(),
children[0].clone(),
self.write_params.clone(),
self.tracker.clone(),
)))
}
@@ -176,10 +191,11 @@ impl ExecutionPlan for InsertExec {
) -> DataFusionResult<SendableRecordBatchStream> {
let input_stream = self.input.execute(partition, context)?;
let dataset = self.dataset.clone();
let write_params = self.write_params.clone();
let mut write_params = self.write_params.clone();
let partial_transactions = self.partial_transactions.clone();
let total_partitions = self.input.output_partitioning().partition_count();
let ds_wrapper = self.ds_wrapper.clone();
let tracker = self.tracker.clone();
let output_bytes = MetricBuilder::new(&self.metrics).output_bytes(partition);
let input_schema = input_stream.schema();
@@ -195,6 +211,20 @@ impl ExecutionPlan for InsertExec {
));
let stream = futures::stream::once(async move {
if let Some(tracker) = tracker
&& write_params.write_progress.is_none()
{
let last_bytes = Arc::new(AtomicU64::new(0));
write_params.write_progress = Some(WriteProgressFn::new(move |stats| {
let previous = last_bytes.swap(stats.bytes_written, Ordering::Relaxed);
if stats.bytes_written > previous {
let delta =
usize::try_from(stats.bytes_written - previous).unwrap_or(usize::MAX);
tracker.record_bytes(delta);
}
}));
}
let transaction = InsertBuilder::new(dataset.clone())
.with_params(&write_params)
.execute_uncommitted_stream(input_stream)

View File

@@ -518,6 +518,10 @@ mod tests {
let wrapper = DatasetConsistencyWrapper::new_latest(ds, Some(Duration::from_millis(200)));
// Freeze `cached_at` on the mock clock so a slow external write below can't
// expire the TTL before the explicit advance_by() does (flake on loaded CI).
clock::pin();
// Populate the cache
let v1 = wrapper.get().await.unwrap().version().version;
assert_eq!(v1, 1);

View File

@@ -44,17 +44,35 @@ pub async fn execute_query(
// QueryTable pushdown runs the query server-side, but only on the main
// branch: the namespace request carries no branch yet, so a branch handle
// must fall through to local execution.
if table
.pushdown_operations
.contains(&NamespaceClientPushdownOperation::QueryTable)
if can_execute_namespace_query(table, query)
&& let Some(ref namespace_client) = table.namespace_client
&& table.dataset.current_branch().is_none()
{
return execute_namespace_query(table, namespace_client.clone(), query, options).await;
}
execute_generic_query(table, query, options).await
}
fn can_execute_namespace_query(table: &NativeTable, query: &AnyQuery) -> bool {
table
.pushdown_operations
.contains(&NamespaceClientPushdownOperation::QueryTable)
&& table.namespace_client.is_some()
&& table.dataset.current_branch().is_none()
&& !requires_local_namespace_execution(query)
}
fn requires_local_namespace_execution(query: &AnyQuery) -> bool {
// The namespace QueryTable request has no approx_mode field yet, so
// pushing this query down would silently ignore the user's setting.
matches!(
query,
AnyQuery::VectorQuery(VectorQueryRequest {
approx_mode: Some(_),
..
})
)
}
pub async fn analyze_query_plan(
table: &NativeTable,
query: &AnyQuery,
@@ -167,6 +185,10 @@ pub async fn create_plan(
scanner.nearest(&column, query_vector.as_ref(), top_k)?;
}
if let Some(approx_mode) = query.approx_mode {
scanner.approx_mode(approx_mode.into());
}
scanner.minimum_nprobes(query.minimum_nprobes);
if let Some(maximum_nprobes) = query.maximum_nprobes {
scanner.maximum_nprobes(maximum_nprobes);
@@ -587,12 +609,20 @@ async fn parse_arrow_ipc_response(bytes: bytes::Bytes) -> Result<DatasetRecordBa
#[cfg(test)]
#[allow(deprecated)]
mod tests {
use arrow_array::Float32Array;
use arrow_array::{ArrayRef, FixedSizeListArray, Float32Array};
use futures::TryStreamExt;
use std::sync::Arc;
use lance_arrow::FixedSizeListArrayExt;
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
use super::*;
use crate::query::QueryExecutionOptions;
use crate::query::{QueryExecutionOptions, QueryRequest};
fn fixed_size_list_array(values: Vec<f32>, dimension: i32) -> FixedSizeListArray {
FixedSizeListArray::try_new_from_values(Float32Array::from(values), dimension).unwrap()
}
#[test]
fn test_convert_to_namespace_query_vector() {
@@ -715,6 +745,80 @@ mod tests {
assert_eq!(count, 2); // 4 and 5
}
#[derive(Debug, Default)]
struct CountingNamespaceClient {
query_table_calls: AtomicUsize,
}
#[async_trait::async_trait]
impl LanceNamespace for CountingNamespaceClient {
fn namespace_id(&self) -> String {
"counting".to_string()
}
async fn query_table(&self, _request: NsQueryTableRequest) -> lance::Result<bytes::Bytes> {
self.query_table_calls.fetch_add(1, Ordering::SeqCst);
panic!("approx_mode queries must not be pushed down to namespace query_table");
}
}
#[tokio::test]
async fn test_execute_query_approx_mode_with_namespace_pushdown_runs_locally() {
use crate::connect;
use crate::table::query::execute_query;
use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
let conn = connect("memory://").execute().await.unwrap();
let vectors = Arc::new(fixed_size_list_array(
vec![0.0, 0.0, 10.0, 10.0, 20.0, 20.0],
2,
));
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("vector", vectors.data_type().clone(), false),
]));
let batch = RecordBatch::try_new(
schema,
vec![Arc::new(Int32Array::from(vec![1, 2, 3])), vectors],
)
.unwrap();
let table = conn
.create_table("test_approx_mode_namespace_fallback", batch)
.execute()
.await
.unwrap();
let namespace_client = Arc::new(CountingNamespaceClient::default());
let mut native_table = table.as_native().unwrap().clone();
native_table.namespace_client = Some(namespace_client.clone());
native_table
.pushdown_operations
.insert(NamespaceClientPushdownOperation::QueryTable);
let query_vector = Arc::new(Float32Array::from(vec![0.0, 0.0]));
let query = AnyQuery::VectorQuery(VectorQueryRequest {
base: QueryRequest {
limit: Some(1),
..Default::default()
},
column: Some("vector".to_string()),
query_vector: vec![query_vector as ArrayRef],
approx_mode: Some(crate::ApproxMode::Accurate),
..Default::default()
});
let stream = execute_query(&native_table, &query, QueryExecutionOptions::default())
.await
.unwrap();
let batches = stream.try_collect::<Vec<_>>().await.unwrap();
let count: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(count, 1);
assert_eq!(namespace_client.query_table_calls.load(Ordering::SeqCst), 0);
}
#[tokio::test]
async fn test_create_plan_multivector_structure() {
use arrow_array::{Float32Array, RecordBatch};
@@ -779,4 +883,97 @@ mod tests {
"Plan should add query_index column"
);
}
#[tokio::test]
async fn test_create_plan_applies_approx_mode_to_ann_query() {
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use datafusion_physical_plan::ExecutionPlan;
use lance::io::exec::{ANNIvfPartitionExec, ANNIvfSubIndexExec};
use lance_index::vector::ApproxMode;
use crate::connect;
use crate::index::{Index, vector::IvfRqIndexBuilder};
use crate::table::query::create_plan;
fn find_ann_approx_mode(plan: &dyn ExecutionPlan) -> Option<ApproxMode> {
if let Some(ann) = plan.as_any().downcast_ref::<ANNIvfSubIndexExec>() {
return Some(ann.query().approx_mode);
}
if let Some(ann) = plan.as_any().downcast_ref::<ANNIvfPartitionExec>() {
return Some(ann.query.approx_mode);
}
plan.children()
.into_iter()
.find_map(|child| find_ann_approx_mode(child.as_ref()))
}
let conn = connect("memory://").execute().await.unwrap();
let dimension = 8;
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new(
"vector",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
dimension,
),
false,
),
]));
let vectors = Arc::new(fixed_size_list_array(
(0..512 * dimension)
.map(|value| value as f32 / dimension as f32)
.collect(),
dimension,
));
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(arrow_array::Int32Array::from_iter_values(0..512)),
vectors,
],
)
.unwrap();
let table = conn
.create_table("test_approx_mode_plan", vec![batch])
.execute()
.await
.unwrap();
table
.create_index(
&["vector"],
Index::IvfRq(
IvfRqIndexBuilder::default()
.num_partitions(1)
.sample_rate(1)
.max_iterations(1)
.num_bits(1),
),
)
.execute()
.await
.unwrap();
let native_table = table.as_native().unwrap();
let query_vector = Arc::new(Float32Array::from(vec![0.0; dimension as usize]));
let query = AnyQuery::VectorQuery(VectorQueryRequest {
column: Some("vector".to_string()),
query_vector: vec![query_vector as ArrayRef],
base: QueryRequest {
limit: Some(1),
..Default::default()
},
approx_mode: Some(crate::ApproxMode::Accurate),
..Default::default()
});
let plan = create_plan(native_table, &query, QueryExecutionOptions::default())
.await
.unwrap();
assert_eq!(
find_ann_approx_mode(plan.as_ref()),
Some(ApproxMode::Accurate)
);
}
}

View File

@@ -142,11 +142,21 @@ impl WriteProgressTracker {
cb(&progress);
}
/// Record wire bytes from the insert layer (e.g. IPC-encoded bytes for
/// remote writes). When wire bytes are recorded, they take precedence over
/// the in-memory Arrow bytes tracked by [`record_batch`].
/// Record wire bytes from the insert layer.
///
/// These bytes may be IPC-encoded bytes for remote writes or bytes handed
/// to Lance's local writer. When wire bytes are recorded, they take
/// precedence over the in-memory Arrow bytes tracked by [`record_batch`].
pub fn record_bytes(&self, bytes: usize) {
self.wire_bytes.fetch_add(bytes, Ordering::Relaxed);
let mut cb = self.callback.lock().unwrap_or_else(|e| e.into_inner());
let guard = self
.rows_and_bytes
.lock()
.unwrap_or_else(|e| e.into_inner());
let progress = self.snapshot(guard.0, guard.1, false);
drop(guard);
cb(&progress);
}
/// Emit the final progress callback indicating the write is complete.
@@ -169,8 +179,6 @@ impl WriteProgressTracker {
let wire = self.wire_bytes.load(Ordering::Relaxed);
// Prefer wire bytes (actual I/O size) when the insert layer is
// tracking them; fall back to in-memory Arrow size otherwise.
// TODO: for local writes, track actual bytes written by Lance
// instead of using in-memory Arrow size as a proxy.
let output_bytes = if wire > 0 { wire } else { in_memory_bytes };
WriteProgress {
elapsed: self.start.elapsed(),
@@ -383,6 +391,54 @@ mod tests {
}
}
#[tokio::test]
async fn test_progress_uses_lance_write_bytes_for_local_tables() {
let dir = tempfile::tempdir().unwrap();
let db = connect(dir.path().to_str().unwrap())
.execute()
.await
.unwrap();
let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
let table = db
.create_table("local_write_bytes", batch)
.execute()
.await
.unwrap();
let new_data = record_batch!(("id", Int32, [4, 5, 6])).unwrap();
let in_memory_bytes = new_data.get_array_memory_size();
let final_bytes = Arc::new(AtomicUsize::new(0));
let seen_non_memory_bytes = Arc::new(std::sync::atomic::AtomicBool::new(false));
let final_bytes_cb = final_bytes.clone();
let seen_non_memory_bytes_cb = seen_non_memory_bytes.clone();
table
.add(new_data)
.write_parallelism(1)
.progress(move |p| {
if p.output_bytes() > 0 && p.output_bytes() != in_memory_bytes {
seen_non_memory_bytes_cb.store(true, Ordering::SeqCst);
}
if p.done() {
final_bytes_cb.store(p.output_bytes(), Ordering::SeqCst);
}
})
.execute()
.await
.unwrap();
assert!(
seen_non_memory_bytes.load(Ordering::SeqCst),
"progress should report Lance writer bytes, not only Arrow memory bytes"
);
assert_ne!(
final_bytes.load(Ordering::SeqCst),
in_memory_bytes,
"final progress bytes should come from Lance write stats"
);
}
#[test]
fn test_record_batch_recovers_from_poisoned_callback_lock() {
use super::{ProgressCallback, WriteProgressTracker};

View File

@@ -329,6 +329,15 @@ pub mod clock {
});
}
/// Start mock time at the current instant if not already pinned.
pub fn pin() {
MOCK_NOW.with(|mock| {
if mock.get().is_none() {
mock.set(Some(Instant::now()));
}
});
}
#[allow(dead_code)]
pub fn clear_mock() {
MOCK_NOW.with(|mock| mock.set(None));