Compare commits

..

6 Commits

Author SHA1 Message Date
Ghxst ☠️
394bb34fa2 fix(rust): report local write progress bytes from Lance (#3422)
Fixes #3360.

This updates native table writes so local write progress uses Lance
writer byte stats instead of Arrow in-memory batch size once write bytes
are available. The change wires the existing `WriteProgressTracker` into
`InsertExec` for native `add` writes, installs a Lance `WriteProgressFn`
only when no lower-level callback is already configured, and keeps the
existing public `InsertExec::new` signature unchanged.

Validation:
- `cargo test -p lancedb --features remote
table::write_progress::tests::test_progress_uses_lance_write_bytes_for_local_tables
-- --nocapture` passed: 1 passed, 0 failed.
- `cargo test -p lancedb --features remote table::write_progress::tests
-- --nocapture` passed: 7 passed, 0 failed.
- `cargo check --quiet --features remote --tests --examples` passed.
- `cargo fmt --all --check` passed.
- `git diff --check` passed.
- `git diff | gitleaks stdin --no-banner --redact --timeout 30` passed:
no leaks found.

I did not run the full `cargo test --quiet --features remote --tests`
suite.

Co-authored-by: Ghxst <200635707+GHX5T-SOL@users.noreply.github.com>
2026-06-17 12:05:59 -07:00
Armaan Sandhu
b2ae763254 fix(python): raise clear TypeError for bare List/Tuple in pydantic schema conversion (#3511)
Closes #3502

## Problem

A bare, unparameterised `typing.List` / `typing.Tuple` field crashes
`to_arrow_schema` with an opaque `AttributeError: __args__`:

```python
from typing import Tuple
from lancedb.pydantic import LanceModel

class Doc(LanceModel):
    items: Tuple

Doc.to_arrow_schema()  # AttributeError: __args__
```

In `_py_type_to_arrow_type`, the branch `elif getattr(py_type,
"__origin__", None) in (list, tuple)` is taken for a bare generic (its
`__origin__` is `list / tuple`), but the next line reads
`py_type.__args__[0]`, and a bare generic has no `__args__`. Other
unsupported types (e.g. `Dict[str, int]`) correctly raise a clear
`TypeError`, so this case is inconsistent.

Fix

Guard the element-type lookup with `getattr(py_type, "__args__", None)`
and raise a clear `TypeError` when it is missing, matching the existing
behavior for other unsupported types. Bare builtin list / tuple are
unaffected (their `__origin__` is `None`, so they already fall through
to the existing `TypeError`).

Testing

- Added `test_bare_generic_raises_type_error` covering both `List` and
`Tuple`.
- ruff format and ruff check clean.
2026-06-17 11:58:48 -07:00
Drew Gallardo
1bead6960c fix: pin mock clock in eventual consistency test (#3547)
This PR fixes a flaky test I hit on Windows test in #3528.

Looks like `test_eventual_consistency_background_refresh` was failing
with `v_cached` expected 1, got 2. There was a pr which swapped
`tokio::time::sleep(300ms)` for `clock::advance_by(300ms)`, which is
pretty much fine but the test necer pinned the clock so the first
`get()` locks the `cached_at` on wall time. Therefore, if our CI is
taking long enough the ttl expires before the value assertion in the
test.

So now we can add a `pin()` and call it first `get()`. After that we can
advance the clock manually with no problems.

Also, it's worth noting that I tried pinning in `BackgroundCache::new()`
first. That broke another test `test_reload_resets_consistency_timer`,
which uses real `tokio::time::sleep` and needs wall clock after
`clear_mock()`. So the pin stays in this test only. And this should
unblock us.

Failing instances:
-
https://github.com/lancedb/lancedb/actions/runs/27567527236/job/81495265474?pr=3528
-
https://github.com/lancedb/lancedb/actions/runs/27560366489/job/81470414928
2026-06-17 11:56:40 -07:00
Brendan Clement
0abf641733 feat: send read-freshness signal on the lance-namespace path (#3551)
### Description

`db://`-style connections that use the lance-namespace path
(`LanceNamespaceDatabase` → `NativeTable` + the lance-namespace REST
client) never sent a read-freshness signal. Against a server configured
to serve cached table metadata up to some staleness window, this allows
stale-read-after-write across handles and processes. The remote table
path already solved this (#3439). This brings the namespace path to
parity.

The namespace REST client doesn't let callers attach headers directly,
but it forwards a `DynamicContextProvider`'s `headers.*` context entries
as HTTP headers per request. So:

- A shared per-table baseline map is created before the namespace
client. I built and installed on the `ConnectBuilder` via a context
provider.
- On read operations the provider emits ·x-lancedb-min-timestamp =
max(baseline, now − read_consistency_interval)`
  (RFC3339), keyed by the operation's `object_id`.
- Each table handle bumps its baseline (monotonically) on
`checkout_latest()`, `restore()`, and every data/schema write.
`checkout_latest()` is the primary hook: consumers refresh a handle
there after writing elsewhere, then poll.

Read operations that carry the floor: `describe_table`,
`list_table_versions`, `query_table`, `list_tables`.
`list_table_versions` is what resolves "latest" for managed-versioning
tables (`get_latest_version`), so it's the op that makes
`checkout_latest()` actually observe a prior write.
`describe_table_version` is excluded (pinned to an immutable version).
This mirrors #3439 (timestamp baseline, `max(baseline, now − interval)`,
monotonic); no `min_version` and no body channel, since the namespace
path has no version-returning write responses.

### Testing

- Unit tests for `compute_min_timestamp` / `next_freshness_baseline` and
the provider (header at/after a bumped baseline; nothing for an empty
baseline + no interval; interval floor applies; non-read ops emit
nothing; `list_tables` uses only the interval floor).
- Verified end-to-end against a local server that honors the header:
reads carry `x-lancedb-min-timestamp`, writes don't, and read-your-write
holds.
2026-06-17 13:30:53 -04:00
Yang Cen
976edeb2ff feat(query): add approx mode to vector queries (#3549)
## Feature

### What is the new feature?

Adds Rust core API support for configuring vector query approximation
mode with `ApproxMode::{Fast, Normal, Accurate}`.

### Why do we need this feature?

Lance already exposes `lance_index::vector::ApproxMode` and scanner
support for controlling the speed/accuracy tradeoff for approximate
vector search. LanceDB Rust queries need to expose and pass this setting
through for local/native and remote vector searches.

### How does it work?

- Adds public `ApproxMode` in `rust/lancedb`, with lowercase serde,
`Default::Normal`, parse/display, and conversions to/from Lance's
`ApproxMode`.
- Adds `approx_mode: Option<ApproxMode>` to `VectorQueryRequest` and a
`VectorQuery::approx_mode(...)` builder.
- Applies the mode to native/local Lance scanners after `nearest(...)`
when explicitly set.
- Sends `approx_mode` in remote query JSON only when explicitly set;
default requests omit it.

## Validation

- `cargo fmt --all`
- `cargo test --quiet --features remote approx_mode`
- `cargo test --quiet --features remote
test_query_vector_default_values`
- `cargo check --quiet --features remote --tests --examples`
- `git diff --check`
2026-06-17 19:28:42 +08:00
Yang Cen
b46a44f873 feat(query): add approx mode to vector queries (#3549)
## Feature

### What is the new feature?

Adds Rust core API support for configuring vector query approximation
mode with `ApproxMode::{Fast, Normal, Accurate}`.

### Why do we need this feature?

Lance already exposes `lance_index::vector::ApproxMode` and scanner
support for controlling the speed/accuracy tradeoff for approximate
vector search. LanceDB Rust queries need to expose and pass this setting
through for local/native and remote vector searches.

### How does it work?

- Adds public `ApproxMode` in `rust/lancedb`, with lowercase serde,
`Default::Normal`, parse/display, and conversions to/from Lance's
`ApproxMode`.
- Adds `approx_mode: Option<ApproxMode>` to `VectorQueryRequest` and a
`VectorQuery::approx_mode(...)` builder.
- Applies the mode to native/local Lance scanners after `nearest(...)`
when explicitly set.
- Sends `approx_mode` in remote query JSON only when explicitly set;
default requests omit it.

## Validation

- `cargo fmt --all`
- `cargo test --quiet --features remote approx_mode`
- `cargo test --quiet --features remote
test_query_vector_default_values`
- `cargo check --quiet --features remote --tests --examples`
- `git diff --check`
2026-06-17 19:28:36 +08:00
13 changed files with 605 additions and 79 deletions

85
Cargo.lock generated
View File

@@ -3432,8 +3432,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arrow-array",
"rand 0.9.4",
@@ -4735,8 +4735,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a"
[[package]]
name = "lance"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arc-swap",
"arrow",
@@ -4810,8 +4810,8 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4832,7 +4832,7 @@ dependencies = [
[[package]]
name = "lance-arrow-scalar"
version = "58.0.0"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4846,7 +4846,7 @@ dependencies = [
[[package]]
name = "lance-arrow-stats"
version = "58.0.0"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -4855,8 +4855,8 @@ dependencies = [
[[package]]
name = "lance-bitpacking"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arrayref",
"paste",
@@ -4865,8 +4865,8 @@ dependencies = [
[[package]]
name = "lance-core"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4904,8 +4904,8 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arrow",
"arrow-array",
@@ -4935,8 +4935,8 @@ dependencies = [
[[package]]
name = "lance-datagen"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arrow",
"arrow-array",
@@ -4953,8 +4953,8 @@ dependencies = [
[[package]]
name = "lance-derive"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"proc-macro2",
"quote",
@@ -4963,8 +4963,8 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4999,8 +4999,8 @@ dependencies = [
[[package]]
name = "lance-file"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -5030,8 +5030,8 @@ dependencies = [
[[package]]
name = "lance-index"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arc-swap",
"arrow",
@@ -5083,7 +5083,6 @@ dependencies = [
"rand_distr 0.5.1",
"rangemap",
"rayon",
"regex-syntax",
"roaring",
"serde",
"serde_json",
@@ -5096,8 +5095,8 @@ dependencies = [
[[package]]
name = "lance-io"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arrow",
"arrow-arith",
@@ -5138,8 +5137,8 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -5154,8 +5153,8 @@ dependencies = [
[[package]]
name = "lance-namespace"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arrow",
"async-trait",
@@ -5167,8 +5166,8 @@ dependencies = [
[[package]]
name = "lance-namespace-impls"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arrow",
"arrow-ipc",
@@ -5208,9 +5207,9 @@ dependencies = [
[[package]]
name = "lance-namespace-reqwest-client"
version = "0.8.6"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3f0a235e3ed5f8805205649ccc7d7d0f3df23ce1294242c9265ad488d7f19d"
checksum = "0d287494559c22838ce34e51ea0fa29dc780d5be8283de5ab33e9395623000c8"
dependencies = [
"reqwest 0.12.28",
"serde",
@@ -5222,8 +5221,8 @@ dependencies = [
[[package]]
name = "lance-select"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -5238,8 +5237,8 @@ dependencies = [
[[package]]
name = "lance-table"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arrow",
"arrow-array",
@@ -5278,8 +5277,8 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -5292,8 +5291,8 @@ dependencies = [
[[package]]
name = "lance-tokenizer"
version = "8.0.0-beta.16"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
version = "8.0.0-beta.14"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
dependencies = [
"icu_segmenter",
"jieba-rs",

View File

@@ -13,20 +13,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=8.0.0-beta.16", default-features = false, "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=8.0.0-beta.16", default-features = false, "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=8.0.0-beta.16", default-features = false, "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=8.0.0-beta.14", default-features = false, "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=8.0.0-beta.14", default-features = false, "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=8.0.0-beta.14", default-features = false, "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "58.0.0", optional = false }

View File

@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>8.0.0-beta.16</lance-core.version>
<lance-core.version>8.0.0-beta.14</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -275,7 +275,18 @@ def _py_type_to_arrow_type(py_type: Type[Any], field: FieldInfo) -> pa.DataType:
tz = get_extras(field, "tz")
return pa.timestamp("us", tz=tz)
elif getattr(py_type, "__origin__", None) in (list, tuple):
child = py_type.__args__[0]
# A bare, unparameterised ``typing.List`` / ``typing.Tuple`` matches this
# branch (its ``__origin__`` is ``list`` / ``tuple``) but has no
# ``__args__``, so we cannot infer the element type. Raise a clear
# ``TypeError`` instead of crashing with an opaque ``AttributeError``.
args = getattr(py_type, "__args__", None)
if not args:
raise TypeError(
"Converting Pydantic type to Arrow Type: unsupported type "
f"{py_type}. Specify the element type, e.g. List[int] instead "
"of a bare List."
)
child = args[0]
return _pydantic_list_child_to_arrow(child, field)
raise TypeError(
f"Converting Pydantic type to Arrow Type: unsupported type {py_type}."

View File

@@ -188,6 +188,18 @@ def test_nested_struct_list():
assert schema == expect_schema
def test_bare_generic_raises_type_error():
# A bare, unparameterised List/Tuple has no element type to map to Arrow.
# It should raise a clear TypeError, not crash with AttributeError: __args__.
for bare in (List, Tuple):
class TestModel(pydantic.BaseModel):
items: bare
with pytest.raises(TypeError, match="unsupported type"):
pydantic_to_schema(TestModel)
def test_nested_struct_list_optional():
class SplitInfo(pydantic.BaseModel):
start_frame: int

View File

@@ -32,6 +32,7 @@ use crate::table::{BaseTable, WriteOptions};
pub mod listing;
pub mod namespace;
pub(crate) mod read_freshness;
pub trait DatabaseOptions {
fn serialize_into_map(&self, map: &mut HashMap<String, String>);

View File

@@ -4,7 +4,7 @@
//! Namespace-based database implementation that delegates table management to lance-namespace
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
@@ -29,6 +29,9 @@ use crate::database::listing::{
NewTableConfig, OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, OPT_NEW_TABLE_STORAGE_VERSION,
OPT_NEW_TABLE_V2_MANIFEST_PATHS,
};
use crate::database::read_freshness::{
FreshnessBaselines, ReadFreshnessContextProvider, TableFreshness,
};
use crate::error::{Error, Result};
use crate::table::{NativeTable, map_namespace_lance_error};
use lance::dataset::WriteMode;
@@ -51,6 +54,10 @@ fn is_table_already_exists_namespace_error(err: &lance::Error) -> bool {
false
}
/// Object-id delimiter default (matches `RestNamespaceBuilder`'s); overridable
/// via the `delimiter` property.
const DEFAULT_NAMESPACE_DELIMITER: &str = "$";
/// A database implementation that uses lance-namespace for table management
pub struct LanceNamespaceDatabase {
namespace: Arc<dyn LanceNamespace>,
@@ -70,6 +77,17 @@ pub struct LanceNamespaceDatabase {
ns_properties: HashMap<String, String>,
// Options for tables created by this connection
new_table_config: NewTableConfig,
// Per-table read-freshness baselines, shared with the context provider.
freshness_baselines: FreshnessBaselines,
// Delimiter for building freshness keys; see `table_freshness`.
delimiter: String,
}
fn resolve_delimiter(ns_properties: &HashMap<String, String>) -> String {
ns_properties
.get("delimiter")
.cloned()
.unwrap_or_else(|| DEFAULT_NAMESPACE_DELIMITER.to_string())
}
impl LanceNamespaceDatabase {
@@ -82,6 +100,9 @@ impl LanceNamespaceDatabase {
session: Option<Arc<lance::session::Session>>,
namespace_client_pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
) -> Self {
// Client is pre-built, so we can't install the freshness provider here;
// baselines are still tracked for a uniform bump path.
let delimiter = resolve_delimiter(&namespace_client_properties);
Self {
namespace: namespace_client,
storage_options,
@@ -92,6 +113,8 @@ impl LanceNamespaceDatabase {
ns_impl: namespace_client_impl,
ns_properties: namespace_client_properties,
new_table_config: NewTableConfig::default(),
freshness_baselines: Arc::new(Mutex::new(HashMap::new())),
delimiter,
}
}
@@ -136,10 +159,19 @@ impl LanceNamespaceDatabase {
if let Some(ref sess) = session {
builder = builder.session(sess.clone());
}
// Install the read-freshness provider before building the client.
let freshness_baselines: FreshnessBaselines = Arc::new(Mutex::new(HashMap::new()));
builder = builder.context_provider(Arc::new(ReadFreshnessContextProvider::new(
freshness_baselines.clone(),
read_consistency_interval,
)));
let namespace = builder.connect().await.map_err(|e| Error::InvalidInput {
message: format!("Failed to connect to namespace: {:?}", e),
})?;
let delimiter = resolve_delimiter(&ns_properties);
Ok(Self {
namespace,
storage_options,
@@ -150,9 +182,20 @@ impl LanceNamespaceDatabase {
ns_impl: ns_impl.to_string(),
ns_properties,
new_table_config,
freshness_baselines,
delimiter,
})
}
/// Build a table's freshness handle, keyed to match the `object_id` the
/// namespace client sends on reads (table-id parts joined by the delimiter).
fn table_freshness(&self, namespace_path: &[String], name: &str) -> TableFreshness {
let mut parts = namespace_path.to_vec();
parts.push(name.to_string());
let key = parts.join(&self.delimiter);
TableFreshness::new(self.freshness_baselines.clone(), key)
}
fn extract_storage_overrides(
&self,
request: &DbCreateTableRequest,
@@ -331,7 +374,8 @@ impl Database for LanceNamespaceDatabase {
self.pushdown_operations.clone(),
self.session.clone(),
)
.await?;
.await?
.with_freshness(self.table_freshness(&request.namespace_path, &request.name));
return Ok(Arc::new(native_table));
}
@@ -462,7 +506,8 @@ impl Database for LanceNamespaceDatabase {
self.pushdown_operations.clone(),
self.session.clone(),
)
.await?;
.await?
.with_freshness(self.table_freshness(&request.namespace_path, &request.name));
Ok(Arc::new(native_table))
}
@@ -478,7 +523,8 @@ impl Database for LanceNamespaceDatabase {
self.pushdown_operations.clone(),
self.session.clone(),
)
.await?;
.await?
.with_freshness(self.table_freshness(&request.namespace_path, &request.name));
Ok(Arc::new(native_table))
}

View File

@@ -0,0 +1,312 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Read-freshness signaling for the lance-namespace path.
//!
//! Against a server that serves cached table metadata up to some staleness
//! window, a handle that just wrote (or asked for the latest version via
//! `checkout_latest`) can still read a stale snapshot. To prevent that, reads
//! routed through the namespace client carry an `x-lancedb-min-timestamp`
//! header naming the oldest snapshot the caller will accept.
//!
//! This mirrors `remote::table`: a per-table baseline is bumped to "now" on
//! every write and on `checkout_latest()`, and reads send
//! `max(baseline, now - read_consistency_interval)`. Since the namespace client
//! takes no headers directly, a [`DynamicContextProvider`] injects it per request.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use lance_namespace_impls::{DynamicContextProvider, OperationInfo};
/// Provider context keys prefixed with `headers.` become HTTP headers (prefix
/// stripped), so this emits the `x-lancedb-min-timestamp` header.
const MIN_TIMESTAMP_CONTEXT_KEY: &str = "headers.x-lancedb-min-timestamp";
/// Per-table freshness baselines (keyed by namespace object id), shared between
/// the provider that reads them and the table handles that bump them.
pub type FreshnessBaselines = Arc<Mutex<HashMap<String, SystemTime>>>;
/// `max(baseline, now - interval)`, or `None` when neither constraint applies.
fn compute_min_timestamp(
baseline: Option<SystemTime>,
interval: Option<Duration>,
now: SystemTime,
) -> Option<SystemTime> {
let interval_based = match interval {
None => None,
Some(d) if d.is_zero() => Some(now),
Some(d) => Some(now.checked_sub(d).unwrap_or(now)),
};
match (interval_based, baseline) {
(None, None) => None,
(Some(t), None) | (None, Some(t)) => Some(t),
(Some(a), Some(b)) => Some(a.max(b)),
}
}
/// Advance the baseline to `now`, never backwards, so a concurrent handle's
/// write can't lower a floor another handle already set.
fn next_freshness_baseline(prev: Option<SystemTime>, now: SystemTime) -> SystemTime {
match prev {
Some(p) => p.max(now),
None => now,
}
}
/// A handle's view of the shared baseline map for a single table.
#[derive(Clone, Debug)]
pub struct TableFreshness {
baselines: FreshnessBaselines,
/// Namespace object id for this table (matches the read's `object_id`).
key: String,
}
impl TableFreshness {
pub fn new(baselines: FreshnessBaselines, key: String) -> Self {
Self { baselines, key }
}
pub fn bump(&self) {
let now = SystemTime::now();
let mut baselines = self.baselines.lock().unwrap();
let prev = baselines.get(&self.key).copied();
baselines.insert(self.key.clone(), next_freshness_baseline(prev, now));
}
}
/// Read ops that can be served stale and so carry the freshness floor.
/// `list_table_versions` resolves "latest" for managed-versioning tables, so it
/// is what makes `checkout_latest()` observe a prior write.
fn is_read_operation(operation: &str) -> bool {
matches!(
operation,
"describe_table" | "list_table_versions" | "query_table" | "list_tables"
)
}
/// Injects `x-lancedb-min-timestamp` on namespace reads, per addressed table.
#[derive(Debug)]
pub struct ReadFreshnessContextProvider {
baselines: FreshnessBaselines,
read_consistency_interval: Option<Duration>,
}
impl ReadFreshnessContextProvider {
pub fn new(baselines: FreshnessBaselines, read_consistency_interval: Option<Duration>) -> Self {
Self {
baselines,
read_consistency_interval,
}
}
}
impl DynamicContextProvider for ReadFreshnessContextProvider {
fn provide_context(&self, info: &OperationInfo) -> HashMap<String, String> {
if !is_read_operation(&info.operation) {
return HashMap::new();
}
let baseline = self.baselines.lock().unwrap().get(&info.object_id).copied();
match compute_min_timestamp(baseline, self.read_consistency_interval, SystemTime::now()) {
Some(ts) => {
let dt: chrono::DateTime<chrono::Utc> = ts.into();
HashMap::from([(MIN_TIMESTAMP_CONTEXT_KEY.to_string(), dt.to_rfc3339())])
}
None => HashMap::new(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
/// Allowed slop when comparing a header timestamp against a locally
/// captured wall-clock bound. Tests run fast enough that 1s is plenty.
const TOLERANCE: Duration = Duration::from_secs(1);
fn parse_header_ts(headers: &HashMap<String, String>) -> SystemTime {
let value = headers
.get(MIN_TIMESTAMP_CONTEXT_KEY)
.expect("expected min-timestamp context key");
chrono::DateTime::parse_from_rfc3339(value)
.unwrap()
.with_timezone(&chrono::Utc)
.into()
}
#[test]
fn test_compute_min_timestamp_combines_baseline_and_interval() {
let now = SystemTime::now();
let baseline = now - Duration::from_secs(60);
// No interval, no baseline -> no header.
assert_eq!(compute_min_timestamp(None, None, now), None);
// Baseline only -> baseline.
assert_eq!(
compute_min_timestamp(Some(baseline), None, now),
Some(baseline)
);
// ZERO interval, no baseline -> now (strong consistency).
assert_eq!(
compute_min_timestamp(None, Some(Duration::ZERO), now),
Some(now)
);
// Positive interval, no baseline -> now - interval.
assert_eq!(
compute_min_timestamp(None, Some(Duration::from_secs(10)), now),
Some(now - Duration::from_secs(10))
);
// Both: pick the more-recent (tighter) constraint.
// baseline = now-60, now-interval = now-10. now-10 is newer.
assert_eq!(
compute_min_timestamp(Some(baseline), Some(Duration::from_secs(10)), now),
Some(now - Duration::from_secs(10))
);
// Both, baseline newer: pick baseline.
let recent_baseline = now - Duration::from_secs(5);
assert_eq!(
compute_min_timestamp(Some(recent_baseline), Some(Duration::from_secs(60)), now),
Some(recent_baseline)
);
}
#[test]
fn test_next_freshness_baseline_is_monotonic() {
let now = SystemTime::now();
let earlier = now - Duration::from_secs(30);
let later = now + Duration::from_secs(30);
// No prior baseline -> now.
assert_eq!(next_freshness_baseline(None, now), now);
// Prior baseline older than now -> now.
assert_eq!(next_freshness_baseline(Some(earlier), now), now);
// Prior baseline newer than now -> keep the newer baseline.
assert_eq!(next_freshness_baseline(Some(later), now), later);
}
fn provider_with(
entries: &[(&str, SystemTime)],
interval: Option<Duration>,
) -> ReadFreshnessContextProvider {
let map: HashMap<String, SystemTime> =
entries.iter().map(|(k, v)| (k.to_string(), *v)).collect();
ReadFreshnessContextProvider::new(Arc::new(Mutex::new(map)), interval)
}
#[test]
fn test_provider_emits_header_at_or_after_bumped_baseline() {
// A baseline set "now" with no interval: every read op must carry a
// floor at or after that baseline. `list_table_versions` is the hook
// that makes managed-versioning `checkout_latest()` observe a write.
let baseline = SystemTime::now();
let provider = provider_with(&[("ns$tbl", baseline)], None);
// These ops are keyed by the table id, so they pick up the per-table
// baseline. (`list_tables` is keyed by the namespace, so it is covered
// separately by the interval-floor test.)
for op in ["describe_table", "list_table_versions", "query_table"] {
let ctx = provider.provide_context(&OperationInfo::new(op, "ns$tbl"));
let sent = parse_header_ts(&ctx);
assert!(
sent >= baseline - TOLERANCE && sent <= baseline + TOLERANCE,
"operation {op} should carry a floor at the bumped baseline"
);
}
}
#[test]
fn test_provider_list_tables_uses_interval_floor_not_table_baseline() {
// `list_tables` is addressed by the namespace id, which never matches a
// per-table baseline key, so a bumped table baseline must not leak onto
// it. With no interval it sends nothing; with one it sends now-interval.
let provider = provider_with(&[("ns$tbl", SystemTime::now())], None);
let ctx = provider.provide_context(&OperationInfo::new("list_tables", "ns"));
assert!(
ctx.is_empty(),
"list_tables must not inherit a per-table baseline"
);
let interval = Duration::from_secs(30);
let provider = provider_with(&[("ns$tbl", SystemTime::now())], Some(interval));
let before = SystemTime::now();
let ctx = provider.provide_context(&OperationInfo::new("list_tables", "ns"));
let after = SystemTime::now();
let sent = parse_header_ts(&ctx);
assert!(
sent >= before - interval - TOLERANCE && sent <= after - interval + TOLERANCE,
"list_tables should carry the interval floor"
);
}
#[test]
fn test_provider_no_header_for_empty_baseline_and_no_interval() {
// Manual consistency (no interval) on a table that was never bumped:
// no floor, so the server may serve from cache.
let provider = provider_with(&[], None);
let ctx = provider.provide_context(&OperationInfo::new("describe_table", "ns$tbl"));
assert!(ctx.is_empty());
}
#[test]
fn test_provider_interval_floor_applies_without_baseline() {
// With a consistency interval and no baseline, the floor is now-interval.
let interval = Duration::from_secs(30);
let provider = provider_with(&[], Some(interval));
let before = SystemTime::now();
let ctx = provider.provide_context(&OperationInfo::new("query_table", "ns$tbl"));
let after = SystemTime::now();
let sent = parse_header_ts(&ctx);
assert!(
sent >= before - interval - TOLERANCE && sent <= after - interval + TOLERANCE,
"expected floor at roughly now - interval"
);
}
#[test]
fn test_provider_non_read_ops_emit_nothing() {
// Even with a fresh baseline and a zero interval, a non-read operation
// (which establishes rather than consumes a baseline) sends no header.
let provider = provider_with(&[("ns$tbl", SystemTime::now())], Some(Duration::ZERO));
for op in [
"create_table",
"register_table",
"drop_table",
"rename_table",
// Pinned to an immutable version, so it cannot be served stale.
"describe_table_version",
] {
let ctx = provider.provide_context(&OperationInfo::new(op, "ns$tbl"));
assert!(
ctx.is_empty(),
"operation {op} must not send a freshness header"
);
}
}
#[test]
fn test_provider_uses_per_table_baseline() {
// The floor is looked up by object id, so an unrelated table's baseline
// does not leak onto another table's read.
let baseline = SystemTime::now();
let provider = provider_with(&[("ns$has_baseline", baseline)], None);
// The bumped table gets a header.
let hit =
provider.provide_context(&OperationInfo::new("describe_table", "ns$has_baseline"));
assert!(!hit.is_empty());
// A different table with no baseline (and no interval) gets nothing.
let miss = provider.provide_context(&OperationInfo::new("describe_table", "ns$other"));
assert!(miss.is_empty());
}
}

View File

@@ -43,6 +43,7 @@ use crate::connection::NamespaceClientPushdownOperation;
use crate::data::scannable::{PeekedScannable, Scannable, estimate_write_partitions};
use crate::database::Database;
use crate::database::read_freshness::TableFreshness;
use crate::embeddings::{EmbeddingDefinition, EmbeddingRegistry, MemoryRegistry};
use crate::error::{Error, Result};
use crate::index::IndexStatistics;
@@ -1763,6 +1764,8 @@ pub struct NativeTable {
// Operations to push down to the namespace server.
// pub(crate) so query.rs can access the field for server-side query execution.
pub(crate) pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
// Read-freshness baseline; `Some` only for namespace-backed tables.
freshness: Option<TableFreshness>,
}
impl std::fmt::Debug for NativeTable {
@@ -1923,6 +1926,7 @@ impl NativeTable {
read_consistency_interval,
namespace_client,
pushdown_operations,
freshness: None,
})
}
@@ -1934,6 +1938,12 @@ impl NativeTable {
self
}
/// Attach the read-freshness baseline handle (namespace connections only).
pub(crate) fn with_freshness(mut self, freshness: TableFreshness) -> Self {
self.freshness = Some(freshness);
self
}
/// Build a sibling `NativeTable` with the same identity but a different
/// (independent) dataset wrapper — used to hand out branch-scoped handles.
fn with_dataset(&self, dataset: dataset::DatasetConsistencyWrapper) -> Self {
@@ -1946,6 +1956,14 @@ impl NativeTable {
read_consistency_interval: self.read_consistency_interval,
namespace_client: self.namespace_client.clone(),
pushdown_operations: self.pushdown_operations.clone(),
freshness: self.freshness.clone(),
}
}
/// Bump the read-freshness baseline; no-op for non-namespace tables.
fn bump_freshness(&self) {
if let Some(freshness) = &self.freshness {
freshness.bump();
}
}
@@ -2045,6 +2063,7 @@ impl NativeTable {
read_consistency_interval,
namespace_client: stored_namespace_client,
pushdown_operations,
freshness: None,
})
}
@@ -2134,6 +2153,7 @@ impl NativeTable {
read_consistency_interval,
namespace_client,
pushdown_operations,
freshness: None,
})
}
@@ -2265,6 +2285,7 @@ impl NativeTable {
read_consistency_interval,
namespace_client: stored_namespace_client,
pushdown_operations,
freshness: None,
})
}
@@ -2424,6 +2445,8 @@ impl BaseTable for NativeTable {
}
async fn checkout_latest(&self) -> Result<()> {
// Bump before resolving "latest" so that request carries the floor.
self.bump_freshness();
self.dataset.as_latest().await?;
self.dataset.reload().await
}
@@ -2511,6 +2534,8 @@ impl BaseTable for NativeTable {
debug_assert_eq!(dataset.version().version, version);
dataset.restore().await?;
}
// Restore moves "latest", so bump before resolving it (as RemoteTable does).
self.bump_freshness();
self.dataset.as_latest().await?;
Ok(())
}
@@ -2591,7 +2616,13 @@ impl BaseTable for NativeTable {
output.plan
};
let insert_exec = Arc::new(InsertExec::new(ds_wrapper.clone(), ds, plan, lance_params));
let insert_exec = Arc::new(InsertExec::new_with_tracker(
ds_wrapper.clone(),
ds,
plan,
lance_params,
output.tracker.clone(),
));
let tracker_for_tasks = output.tracker.clone();
if let Some(ref t) = tracker_for_tasks {
@@ -2624,6 +2655,7 @@ impl BaseTable for NativeTable {
}
let version = ds_wrapper.get().await?.manifest().version;
self.bump_freshness();
Ok(AddResult { version })
}
@@ -2674,7 +2706,9 @@ impl BaseTable for NativeTable {
async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult> {
// Delegate to the submodule implementation
update::execute_update(self, update).await
let result = update::execute_update(self, update).await?;
self.bump_freshness();
Ok(result)
}
async fn create_plan(
@@ -2706,7 +2740,9 @@ impl BaseTable for NativeTable {
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<MergeResult> {
merge::execute_merge_insert(self, params, new_data).await
let result = merge::execute_merge_insert(self, params, new_data).await?;
self.bump_freshness();
Ok(result)
}
async fn set_unenforced_primary_key(&self, columns: &[&str]) -> Result<()> {
@@ -2727,7 +2763,9 @@ impl BaseTable for NativeTable {
/// Delete rows from the table
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
delete::execute_delete(self, predicate).await
let result = delete::execute_delete(self, predicate).await?;
self.bump_freshness();
Ok(result)
}
async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
@@ -2746,22 +2784,30 @@ impl BaseTable for NativeTable {
transforms: NewColumnTransform,
read_columns: Option<Vec<String>>,
) -> Result<AddColumnsResult> {
schema_evolution::execute_add_columns(self, transforms, read_columns).await
let result = schema_evolution::execute_add_columns(self, transforms, read_columns).await?;
self.bump_freshness();
Ok(result)
}
async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult> {
schema_evolution::execute_alter_columns(self, alterations).await
let result = schema_evolution::execute_alter_columns(self, alterations).await?;
self.bump_freshness();
Ok(result)
}
async fn update_field_metadata(
&self,
updates: &[FieldMetadataUpdate],
) -> Result<UpdateFieldMetadataResult> {
schema_evolution::execute_update_field_metadata(self, updates).await
let result = schema_evolution::execute_update_field_metadata(self, updates).await?;
self.bump_freshness();
Ok(result)
}
async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
schema_evolution::execute_drop_columns(self, columns).await
let result = schema_evolution::execute_drop_columns(self, columns).await?;
self.bump_freshness();
Ok(result)
}
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {

View File

@@ -4,6 +4,7 @@
//! DataFusion ExecutionPlan for inserting data into LanceDB tables.
use std::any::Any;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, LazyLock, Mutex};
use arrow_array::{RecordBatch, UInt64Array};
@@ -20,11 +21,12 @@ use datafusion_physical_plan::{
use futures::TryStreamExt;
use lance::Dataset;
use lance::dataset::transaction::{Operation, Transaction};
use lance::dataset::{CommitBuilder, InsertBuilder, WriteParams};
use lance::dataset::{CommitBuilder, InsertBuilder, WriteParams, WriteProgressFn};
use lance::io::exec::utils::InstrumentedRecordBatchStreamAdapter;
use lance_table::format::Fragment;
use crate::table::dataset::DatasetConsistencyWrapper;
use crate::table::write_progress::WriteProgressTracker;
pub(crate) static COUNT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(ArrowSchema::new(vec![Field::new(
@@ -81,6 +83,7 @@ pub struct InsertExec {
dataset: Arc<Dataset>,
input: Arc<dyn ExecutionPlan>,
write_params: WriteParams,
tracker: Option<Arc<WriteProgressTracker>>,
properties: Arc<PlanProperties>,
partial_transactions: Arc<Mutex<Vec<Transaction>>>,
metrics: ExecutionPlanMetricsSet,
@@ -92,6 +95,16 @@ impl InsertExec {
dataset: Arc<Dataset>,
input: Arc<dyn ExecutionPlan>,
write_params: WriteParams,
) -> Self {
Self::new_with_tracker(ds_wrapper, dataset, input, write_params, None)
}
pub(crate) fn new_with_tracker(
ds_wrapper: DatasetConsistencyWrapper,
dataset: Arc<Dataset>,
input: Arc<dyn ExecutionPlan>,
write_params: WriteParams,
tracker: Option<Arc<WriteProgressTracker>>,
) -> Self {
let schema = COUNT_SCHEMA.clone();
let num_partitions = input.output_partitioning().partition_count();
@@ -107,6 +120,7 @@ impl InsertExec {
dataset,
input,
write_params,
tracker,
properties: Arc::new(properties),
partial_transactions: Arc::new(Mutex::new(Vec::with_capacity(num_partitions))),
metrics: ExecutionPlanMetricsSet::new(),
@@ -161,11 +175,12 @@ impl ExecutionPlan for InsertExec {
"InsertExec requires exactly one child".to_string(),
));
}
Ok(Arc::new(Self::new(
Ok(Arc::new(Self::new_with_tracker(
self.ds_wrapper.clone(),
self.dataset.clone(),
children[0].clone(),
self.write_params.clone(),
self.tracker.clone(),
)))
}
@@ -176,10 +191,11 @@ impl ExecutionPlan for InsertExec {
) -> DataFusionResult<SendableRecordBatchStream> {
let input_stream = self.input.execute(partition, context)?;
let dataset = self.dataset.clone();
let write_params = self.write_params.clone();
let mut write_params = self.write_params.clone();
let partial_transactions = self.partial_transactions.clone();
let total_partitions = self.input.output_partitioning().partition_count();
let ds_wrapper = self.ds_wrapper.clone();
let tracker = self.tracker.clone();
let output_bytes = MetricBuilder::new(&self.metrics).output_bytes(partition);
let input_schema = input_stream.schema();
@@ -195,6 +211,20 @@ impl ExecutionPlan for InsertExec {
));
let stream = futures::stream::once(async move {
if let Some(tracker) = tracker
&& write_params.write_progress.is_none()
{
let last_bytes = Arc::new(AtomicU64::new(0));
write_params.write_progress = Some(WriteProgressFn::new(move |stats| {
let previous = last_bytes.swap(stats.bytes_written, Ordering::Relaxed);
if stats.bytes_written > previous {
let delta =
usize::try_from(stats.bytes_written - previous).unwrap_or(usize::MAX);
tracker.record_bytes(delta);
}
}));
}
let transaction = InsertBuilder::new(dataset.clone())
.with_params(&write_params)
.execute_uncommitted_stream(input_stream)

View File

@@ -518,6 +518,10 @@ mod tests {
let wrapper = DatasetConsistencyWrapper::new_latest(ds, Some(Duration::from_millis(200)));
// Freeze `cached_at` on the mock clock so a slow external write below can't
// expire the TTL before the explicit advance_by() does (flake on loaded CI).
clock::pin();
// Populate the cache
let v1 = wrapper.get().await.unwrap().version().version;
assert_eq!(v1, 1);

View File

@@ -142,11 +142,21 @@ impl WriteProgressTracker {
cb(&progress);
}
/// Record wire bytes from the insert layer (e.g. IPC-encoded bytes for
/// remote writes). When wire bytes are recorded, they take precedence over
/// the in-memory Arrow bytes tracked by [`record_batch`].
/// Record wire bytes from the insert layer.
///
/// These bytes may be IPC-encoded bytes for remote writes or bytes handed
/// to Lance's local writer. When wire bytes are recorded, they take
/// precedence over the in-memory Arrow bytes tracked by [`record_batch`].
pub fn record_bytes(&self, bytes: usize) {
self.wire_bytes.fetch_add(bytes, Ordering::Relaxed);
let mut cb = self.callback.lock().unwrap_or_else(|e| e.into_inner());
let guard = self
.rows_and_bytes
.lock()
.unwrap_or_else(|e| e.into_inner());
let progress = self.snapshot(guard.0, guard.1, false);
drop(guard);
cb(&progress);
}
/// Emit the final progress callback indicating the write is complete.
@@ -169,8 +179,6 @@ impl WriteProgressTracker {
let wire = self.wire_bytes.load(Ordering::Relaxed);
// Prefer wire bytes (actual I/O size) when the insert layer is
// tracking them; fall back to in-memory Arrow size otherwise.
// TODO: for local writes, track actual bytes written by Lance
// instead of using in-memory Arrow size as a proxy.
let output_bytes = if wire > 0 { wire } else { in_memory_bytes };
WriteProgress {
elapsed: self.start.elapsed(),
@@ -383,6 +391,54 @@ mod tests {
}
}
#[tokio::test]
async fn test_progress_uses_lance_write_bytes_for_local_tables() {
let dir = tempfile::tempdir().unwrap();
let db = connect(dir.path().to_str().unwrap())
.execute()
.await
.unwrap();
let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
let table = db
.create_table("local_write_bytes", batch)
.execute()
.await
.unwrap();
let new_data = record_batch!(("id", Int32, [4, 5, 6])).unwrap();
let in_memory_bytes = new_data.get_array_memory_size();
let final_bytes = Arc::new(AtomicUsize::new(0));
let seen_non_memory_bytes = Arc::new(std::sync::atomic::AtomicBool::new(false));
let final_bytes_cb = final_bytes.clone();
let seen_non_memory_bytes_cb = seen_non_memory_bytes.clone();
table
.add(new_data)
.write_parallelism(1)
.progress(move |p| {
if p.output_bytes() > 0 && p.output_bytes() != in_memory_bytes {
seen_non_memory_bytes_cb.store(true, Ordering::SeqCst);
}
if p.done() {
final_bytes_cb.store(p.output_bytes(), Ordering::SeqCst);
}
})
.execute()
.await
.unwrap();
assert!(
seen_non_memory_bytes.load(Ordering::SeqCst),
"progress should report Lance writer bytes, not only Arrow memory bytes"
);
assert_ne!(
final_bytes.load(Ordering::SeqCst),
in_memory_bytes,
"final progress bytes should come from Lance write stats"
);
}
#[test]
fn test_record_batch_recovers_from_poisoned_callback_lock() {
use super::{ProgressCallback, WriteProgressTracker};

View File

@@ -329,6 +329,15 @@ pub mod clock {
});
}
/// Start mock time at the current instant if not already pinned.
pub fn pin() {
MOCK_NOW.with(|mock| {
if mock.get().is_none() {
mock.set(Some(Instant::now()));
}
});
}
#[allow(dead_code)]
pub fn clear_mock() {
MOCK_NOW.with(|mock| mock.set(None));