Compare commits

..

12 Commits

Author SHA1 Message Date
Lance Release
26481a4b74 Bump version: 0.34.0-beta.1 → 0.34.0-beta.2 2026-06-23 16:21:52 +00:00
dependabot[bot]
08596f1644 chore(deps): bump the rust-minor-patch group with 2 updates (#3565)
Bumps the rust-minor-patch group with 2 updates:
[bytes](https://github.com/tokio-rs/bytes) and
[napi](https://github.com/napi-rs/napi-rs).

Updates `bytes` from 1.11.1 to 1.12.0
<details>
<summary>Release notes</summary>
<p><em>Sourced from <a
href="https://github.com/tokio-rs/bytes/releases">bytes's
releases</a>.</em></p>
<blockquote>
<h2>Bytes v1.12.0</h2>
<h1>1.12.0 (June 18th, 2026)</h1>
<h3>Added</h3>
<ul>
<li>Add <code>BytesMut::extend_from_within()</code> (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/818">#818</a>)</li>
<li>Add <code>BytesMut::try_unsplit()</code> (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/746">#746</a>)</li>
</ul>
<h3>Fixed</h3>
<ul>
<li>Fix panic in <code>get_int</code> if <code>nbytes</code> is zero (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/806">#806</a>)</li>
</ul>
<h3>Changed</h3>
<ul>
<li>Pass vtable data by value (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/826">#826</a>)</li>
<li>Exclude development scripts from published package (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/810">#810</a>)</li>
</ul>
<h3>Documented</h3>
<ul>
<li>Document that <code>BytesMut::{reserve,try_reserve}</code> doesn't
preserve unused capacity (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/808">#808</a>)</li>
</ul>
</blockquote>
</details>
<details>
<summary>Changelog</summary>
<p><em>Sourced from <a
href="https://github.com/tokio-rs/bytes/blob/master/CHANGELOG.md">bytes's
changelog</a>.</em></p>
<blockquote>
<h1>1.12.0 (June 18th, 2026)</h1>
<h3>Added</h3>
<ul>
<li>Add <code>BytesMut::extend_from_within()</code> (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/818">#818</a>)</li>
<li>Add <code>BytesMut::try_unsplit()</code> (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/746">#746</a>)</li>
</ul>
<h3>Fixed</h3>
<ul>
<li>Fix panic in <code>get_int</code> if <code>nbytes</code> is zero (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/806">#806</a>)</li>
</ul>
<h3>Changed</h3>
<ul>
<li>Pass vtable data by value (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/826">#826</a>)</li>
<li>Exclude development scripts from published package (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/810">#810</a>)</li>
</ul>
<h3>Documented</h3>
<ul>
<li>Document that <code>BytesMut::{reserve,try_reserve}</code> doesn't
preserve unused capacity (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/808">#808</a>)</li>
</ul>
</blockquote>
</details>
<details>
<summary>Commits</summary>
<ul>
<li><a
href="91402cee60"><code>91402ce</code></a>
Release bytes v1.12.0 (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/831">#831</a>)</li>
<li><a
href="2256e6dc3e"><code>2256e6d</code></a>
chore: add safety comments on unsafe blocks (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/827">#827</a>)</li>
<li><a
href="245adff079"><code>245adff</code></a>
Pass vtable data by value (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/826">#826</a>)</li>
<li><a
href="00cc5ff2bd"><code>00cc5ff</code></a>
Implement <code>BytesMut::extend_from_within</code> (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/818">#818</a>)</li>
<li><a
href="5b79d316c9"><code>5b79d31</code></a>
Merge tag 'v1.11.1'</li>
<li><a
href="804ee6d039"><code>804ee6d</code></a>
Make try_unsplit method public (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/746">#746</a>)</li>
<li><a
href="fd426ca084"><code>fd426ca</code></a>
Exclude development scripts from published package (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/810">#810</a>)</li>
<li><a
href="b4ed70daee"><code>b4ed70d</code></a>
Add test for copy_to_bytes() -&gt; BytesMut avoiding clone (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/809">#809</a>)</li>
<li><a
href="94e42915a9"><code>94e4291</code></a>
Document that <code>BytesMut::{reserve,try_reserve}</code> doesn't
preserve unused capac...</li>
<li><a
href="acd1e0ffb8"><code>acd1e0f</code></a>
Fix <code>get_int</code> if <code>nbytes</code> is zero (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/806">#806</a>)</li>
<li>See full diff in <a
href="https://github.com/tokio-rs/bytes/compare/v1.11.1...v1.12.0">compare
view</a></li>
</ul>
</details>
<br />

Updates `napi` from 3.9.1 to 3.9.3
<details>
<summary>Release notes</summary>
<p><em>Sourced from <a
href="https://github.com/napi-rs/napi-rs/releases">napi's
releases</a>.</em></p>
<blockquote>
<h2>napi-v3.9.3</h2>
<h3>Fixed</h3>
<ul>
<li><em>(napi)</em> sync referred flag when creating a weak
ThreadsafeFunction (<a
href="https://redirect.github.com/napi-rs/napi-rs/pull/3337">#3337</a>)</li>
</ul>
<h3>Other</h3>
<ul>
<li><em>(napi)</em> outline non-generic core of
ThreadsafeFunction::create (<a
href="https://redirect.github.com/napi-rs/napi-rs/pull/3334">#3334</a>)</li>
</ul>
<h2>napi-v3.9.2</h2>
<h3>Fixed</h3>
<ul>
<li><em>(napi)</em> ReadableStream Reader loses chunks and aborts on
errored streams (<a
href="https://redirect.github.com/napi-rs/napi-rs/pull/3328">#3328</a>)</li>
</ul>
</blockquote>
</details>
<details>
<summary>Commits</summary>
<ul>
<li><a
href="ee58383da4"><code>ee58383</code></a>
chore(napi): release v3.9.3 (<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3335">#3335</a>)</li>
<li><a
href="c78727667b"><code>c787276</code></a>
fix(napi): sync referred flag when creating a weak ThreadsafeFunction
(<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3337">#3337</a>)</li>
<li><a
href="d4276ca315"><code>d4276ca</code></a>
chore(deps): update dependency oxc-parser to ^0.137.0 (<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3336">#3336</a>)</li>
<li><a
href="a0b1831ce5"><code>a0b1831</code></a>
perf(napi): outline non-generic core of ThreadsafeFunction::create (<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3334">#3334</a>)</li>
<li><a
href="3759d7b485"><code>3759d7b</code></a>
chore(deps): update rust-lang/crates-io-auth-action action to v1.0.5 (<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3333">#3333</a>)</li>
<li><a
href="dd41eeb921"><code>dd41eeb</code></a>
build(deps): bump protobufjs from 7.6.2 to 7.6.4 (<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3332">#3332</a>)</li>
<li><a
href="cdd48b3873"><code>cdd48b3</code></a>
chore(deps): update dependency oxc-parser to ^0.136.0 (<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3314">#3314</a>)</li>
<li><a
href="e98762de2c"><code>e98762d</code></a>
chore(deps): update yarn monorepo to v4.17.0 (<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3330">#3330</a>)</li>
<li><a
href="529a78d15c"><code>529a78d</code></a>
chore(napi): release v3.9.2 (<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3329">#3329</a>)</li>
<li><a
href="88f4b97030"><code>88f4b97</code></a>
fix(napi): ReadableStream Reader loses chunks and aborts on errored
streams (...</li>
<li>Additional commits viewable in <a
href="https://github.com/napi-rs/napi-rs/compare/napi-v3.9.1...napi-v3.9.3">compare
view</a></li>
</ul>
</details>
<br />


Dependabot will resolve any conflicts with this PR as long as you don't
alter it yourself. You can also trigger a rebase manually by commenting
`@dependabot rebase`.

[//]: # (dependabot-automerge-start)
[//]: # (dependabot-automerge-end)

---

<details>
<summary>Dependabot commands and options</summary>
<br />

You can trigger Dependabot actions by commenting on this PR:
- `@dependabot rebase` will rebase this PR
- `@dependabot recreate` will recreate this PR, overwriting any edits
that have been made to it
- `@dependabot show <dependency name> ignore conditions` will show all
of the ignore conditions of the specified dependency
- `@dependabot ignore <dependency name> major version` will close this
group update PR and stop Dependabot creating any more for the specific
dependency's major version (unless you unignore this specific
dependency's major version or upgrade to it yourself)
- `@dependabot ignore <dependency name> minor version` will close this
group update PR and stop Dependabot creating any more for the specific
dependency's minor version (unless you unignore this specific
dependency's minor version or upgrade to it yourself)
- `@dependabot ignore <dependency name>` will close this group update PR
and stop Dependabot creating any more for the specific dependency
(unless you unignore this specific dependency or upgrade to it yourself)
- `@dependabot unignore <dependency name>` will remove all of the ignore
conditions of the specified dependency
- `@dependabot unignore <dependency name> <ignore condition>` will
remove the ignore condition of the specified dependency and ignore
conditions


</details>

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-06-23 09:21:05 -07:00
LanceDB Robot
f16da19b78 chore: update lance dependency to v9.0.0-beta.2 (#3569)
Updates LanceDB's Lance dependencies to v9.0.0-beta.2 across the Rust
workspace and Java lance-core dependency.\n\nNo compatibility fixes were
required; clippy and formatting pass after installing the missing
toolchain components on the runner. Triggering Lance tag:
https://github.com/lance-format/lance/releases/tag/v9.0.0-beta.2
2026-06-23 09:20:13 -07:00
Drew Gallardo
41ac32a344 feat(rust): add blob read and materialization APIs (#3562)
This PR is for the Read path against blob v2. #3528 handles declare +
write, and this this adds materialization on local tables.

- blob_columns()
- fetch_blobs(column, row_ids) → bytes
- fetch_blob_files(column, row_ids) → lazy handles
- Pass _rowid from query().with_row_id(). Remote returns NotSupported.
(for now)

### Use cases

search, grab row ids, materialize images:

```rust
let row_ids = /* _rowid from hits */;
let images = table.fetch_blobs("image", &row_ids).await?;
```

Large blobs: open handles, read only what you need:

```rust
let handles = table.fetch_blob_files("image", &row_ids).await?;
let bytes = handles[0].as_ref().unwrap().read().await?;
```

Filter then batch fetch: collect ids from a filter, one call.
Multiple blob columns: image and thumbnail independently.
Row ids from before compact: still resolve.

### Alignment note
Lance `read_blobs` drops null rows. We descriptor-take first, read
non-null ids, re-expand to match input order. Null and zero-length blobs
come back null/None. Bytes path sets `preserve_order(true)`. So I added:

```
TODO(lance): expose selection_index or an aligned execute so we can drop the pre-read.
```

### Tests
`cargo test -p lancedb --test blob_integration`
- 30 tests covering nulls, reorder, dups, cross-fragment bytes + files,
compact, delete, legacy v1 errors.

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-23 06:58:26 -07:00
Drew Gallardo
ba1ef34481 feat(rust): add blob v2 schema declaration and write path (#3528)
First Rust PR for #3231. Lance already stores blob v2. This adds the
LanceDB write side.

```rust
let schema = Schema::new(vec![
    Field::new("id", DataType::Int64, false),
    lancedb::blob("image", true),
]);

let table = db.create_table("photos", schema).execute().await?;

table.add(batch_with_large_binary_image_column).execute().await?;
```

Read/materialize and Python are follow-up PRs.

### Testing

- cargo test -p lancedb --test blob_integration
- cargo test -p lancedb blob:: datafusion::blob_coerce
- cargo test -p lancedb (591 passed)
- cargo clippy --features remote --tests

---------

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
2026-06-19 12:33:15 -07:00
Will Jones
85d870b397 fix: parse RFC 3339 created_at and improve IndexConfig repr (#3558)
The server now serializes an index's `created_at` as an RFC 3339 string
(e.g. `"2026-06-18T21:37:36.637Z"`), but the client deserializer only
accepted a unix timestamp in milliseconds. This caused `list_indices` to
fail with:

```
Failed to parse list_indices response: invalid type: string "2026-06-18T21:37:36.637Z", expected a unix timestamp in milliseconds
```

This PR replaces the fixed millisecond deserializer with a custom one
that accepts both an RFC 3339 string (current server) and a
unix-millisecond integer (legacy deployments), so the client works
against any server version.

It also improves the `IndexConfig` repr in the Python bindings.
Previously it printed only three fields (`Index(FTS, columns=["text"],
name="text_idx")`), hiding the metadata that `list_indices` returns. It
now renders every populated field, omitting any that are `None`. Each
value is valid Python — integer counts use `_` thousands separators and
`created_at` uses the `datetime` repr — so values round-trip. The real
repr is a single line; it's wrapped here for readability:

```python
>>> table.list_indices()
[IndexConfig(
    name="text_idx",
    index_type="FTS",
    columns=["text"],
    index_uuid="aefd3e00-2f95-4bdc-92ac-06de84442bf1",
    type_url="/lance.table.InvertedIndexDetails",
    created_at=datetime.datetime(2026, 6, 18, 21, 37, 36, 637000, tzinfo=datetime.timezone.utc),
    num_indexed_rows=2,
    size_bytes=3_669,
    num_segments=1,
    index_version=1,
    index_details={
        'lance_tokenizer': None,
        'base_tokenizer': 'simple',
        'language': 'English',
        'with_position': False,
        'max_token_length': 40,
        'lower_case': True,
        'stem': True,
        'remove_stop_words': True,
        'custom_stop_words': None,
        'ascii_folding': True,
        'min_ngram_length': 3,
        'max_ngram_length': 3,
        'prefix_only': False,
    },
)]
```

Fixes #3556

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-19 10:40:56 -07:00
LanceDB Robot
c46d59d2ee chore: update lance dependency to v8.0.0-rc.1 (#3557)
Updates LanceDB Lance dependencies to Lance v8.0.0-rc.1.

This includes the Rust workspace Lance crates, Cargo.lock, and Java
lance-core version. Triggering tag:
https://github.com/lance-format/lance/releases/tag/v8.0.0-rc.1
2026-06-19 11:40:38 -05:00
Lance Release
113f187c2d Bump version: 0.31.0-beta.0 → 0.31.0-beta.1 2026-06-19 16:00:59 +00:00
Lance Release
3b279f5705 Bump version: 0.34.0-beta.0 → 0.34.0-beta.1 2026-06-19 15:59:43 +00:00
Ryan Green
e1334954d7 fix: overflow using sys.maxsize for k in query with namespace connection (#3561) 2026-06-19 12:57:10 -02:30
LanceDB Robot
2f65a233fe chore: update lance dependency to v8.0.0-beta.19 (#3555)
Updates LanceDB's Lance dependencies from v8.0.0-beta.17 to
v8.0.0-beta.19.

This includes the Rust workspace Lance crates, Cargo.lock refresh, and
Java lance-core version bump. Triggering Lance tag:
https://github.com/lance-format/lance/releases/tag/v8.0.0-beta.19
2026-06-18 14:16:57 -05:00
Lance Release
e81356089a Bump version: 0.30.1-beta.2 → 0.31.0-beta.0 2026-06-18 18:43:22 +00:00
34 changed files with 2297 additions and 105 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.31.0-beta.0"
current_version = "0.31.0-beta.1"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

105
Cargo.lock generated
View File

@@ -1472,9 +1472,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.11.1"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
checksum = "8ae3f5d315924270530207e2a68396c3cc547f6dca3fbdca317cfb1a51edb593"
[[package]]
name = "bytes-utils"
@@ -3432,8 +3432,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-array",
"rand 0.9.4",
@@ -4735,8 +4735,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a"
[[package]]
name = "lance"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arc-swap",
"arrow",
@@ -4810,8 +4810,8 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4832,7 +4832,7 @@ dependencies = [
[[package]]
name = "lance-arrow-scalar"
version = "58.0.0"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4846,7 +4846,7 @@ dependencies = [
[[package]]
name = "lance-arrow-stats"
version = "58.0.0"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -4855,8 +4855,8 @@ dependencies = [
[[package]]
name = "lance-bitpacking"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrayref",
"paste",
@@ -4865,8 +4865,8 @@ dependencies = [
[[package]]
name = "lance-core"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4904,8 +4904,8 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow",
"arrow-array",
@@ -4935,8 +4935,8 @@ dependencies = [
[[package]]
name = "lance-datagen"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow",
"arrow-array",
@@ -4953,8 +4953,8 @@ dependencies = [
[[package]]
name = "lance-derive"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"proc-macro2",
"quote",
@@ -4963,8 +4963,8 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4999,8 +4999,8 @@ dependencies = [
[[package]]
name = "lance-file"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -5030,8 +5030,8 @@ dependencies = [
[[package]]
name = "lance-index"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arc-swap",
"arrow",
@@ -5096,8 +5096,8 @@ dependencies = [
[[package]]
name = "lance-io"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow",
"arrow-arith",
@@ -5138,8 +5138,8 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -5150,12 +5150,13 @@ dependencies = [
"lance-core",
"num-traits",
"rand 0.9.4",
"rayon",
]
[[package]]
name = "lance-namespace"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow",
"async-trait",
@@ -5167,8 +5168,8 @@ dependencies = [
[[package]]
name = "lance-namespace-impls"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow",
"arrow-ipc",
@@ -5222,8 +5223,8 @@ dependencies = [
[[package]]
name = "lance-select"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -5238,8 +5239,8 @@ dependencies = [
[[package]]
name = "lance-table"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow",
"arrow-array",
@@ -5278,8 +5279,8 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -5292,20 +5293,21 @@ dependencies = [
[[package]]
name = "lance-tokenizer"
version = "8.0.0-beta.17"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"icu_segmenter",
"jieba-rs",
"lindera",
"rust-stemmers",
"serde",
"stop-words",
"unicode-normalization",
]
[[package]]
name = "lancedb"
version = "0.30.1-beta.2"
version = "0.31.0-beta.1"
dependencies = [
"ahash",
"anyhow",
@@ -5388,7 +5390,7 @@ dependencies = [
[[package]]
name = "lancedb-nodejs"
version = "0.30.1-beta.2"
version = "0.31.0-beta.1"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -5413,7 +5415,7 @@ dependencies = [
[[package]]
name = "lancedb-python"
version = "0.33.1-beta.2"
version = "0.34.0-beta.1"
dependencies = [
"arrow",
"async-trait",
@@ -5956,9 +5958,9 @@ dependencies = [
[[package]]
name = "napi"
version = "3.9.1"
version = "3.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad513ff22558f1830b595ea6eb4091da48145d09a222ce157e781896f78be0b9"
checksum = "fbd9f9295f3ff5921e78a71222c3361a8216f7760b1a99a6ad4e8441de18bbb9"
dependencies = [
"bitflags 2.11.1",
"chrono",
@@ -9205,6 +9207,15 @@ version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51f1e89f093f99e7432c491c382b88a6860a5adbe6bf02574bf0a08efff1978"
[[package]]
name = "stop-words"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d68df56303396bcfb639455b3c166804aeb7994005010aab5e9e8a1277b8871d"
dependencies = [
"serde_json",
]
[[package]]
name = "str_stack"
version = "0.1.1"

View File

@@ -13,20 +13,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=8.0.0-beta.17", default-features = false, "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=8.0.0-beta.17", default-features = false, "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=8.0.0-beta.17", default-features = false, "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=9.0.0-beta.2", default-features = false, "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=9.0.0-beta.2", default-features = false, "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=9.0.0-beta.2", default-features = false, "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "58.0.0", optional = false }

View File

@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
<dependency>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-core</artifactId>
<version>0.31.0-beta.0</version>
<version>0.31.0-beta.1</version>
</dependency>
```

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.31.0-beta.0</version>
<version>0.31.0-beta.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.31.0-beta.0</version>
<version>0.31.0-beta.1</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>LanceDB Java SDK Parent POM</description>
@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>8.0.0-beta.17</lance-core.version>
<lance-core.version>9.0.0-beta.2</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.31.0-beta.0"
version = "0.31.0-beta.1"
publish = false
license.workspace = true
description.workspace = true

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.31.0-beta.0",
"version": "0.31.0-beta.1",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.31.0-beta.0",
"version": "0.31.0-beta.1",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.31.0-beta.0",
"version": "0.31.0-beta.1",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.31.0-beta.0",
"version": "0.31.0-beta.1",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.31.0-beta.0",
"version": "0.31.0-beta.1",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.31.0-beta.0",
"version": "0.31.0-beta.1",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.31.0-beta.0",
"version": "0.31.0-beta.1",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.30.1-beta.2",
"version": "0.31.0-beta.1",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.30.1-beta.2",
"version": "0.31.0-beta.1",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.31.0-beta.0",
"version": "0.31.0-beta.1",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.34.0-beta.0"
current_version = "0.34.0-beta.2"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.34.0-beta.0"
version = "0.34.0-beta.2"
publish = false
edition.workspace = true
description = "Python bindings for LanceDB"

View File

@@ -71,6 +71,9 @@ from lancedb.embeddings import EmbeddingFunctionConfig
from ._lancedb import Session
_MAX_QUERY_K = 2**31 - 1
def _query_to_namespace_request(
table_id: List[str],
query: "Query",
@@ -148,7 +151,8 @@ def _query_to_namespace_request(
if query.limit is not None:
k = query.limit
elif query.vector is None and query.full_text_query is None:
k = sys.maxsize
# limit k to max i32 value to avoid client overflows
k = _MAX_QUERY_K
else:
k = 10

View File

@@ -91,7 +91,9 @@ async def test_create_scalar_index(some_table: AsyncTable):
# Can recreate if replace=True
await some_table.create_index("id", replace=True)
indices = await some_table.list_indices()
assert str(indices) == '[Index(BTree, columns=["id"], name="id_idx")]'
assert str(indices).startswith(
'[IndexConfig(name="id_idx", index_type="BTree", columns=["id"]'
)
assert len(indices) == 1
assert indices[0].index_type == "BTree"
assert indices[0].columns == ["id"]
@@ -106,6 +108,27 @@ async def test_create_scalar_index(some_table: AsyncTable):
assert len(indices) == 0
@pytest.mark.asyncio
async def test_index_config_repr(db_async):
# Use >= 1000 rows so the thousands separator in the repr is exercised.
nrows = 1500
table = await db_async.create_table(
"repr_table", pa.Table.from_pydict({"id": list(range(nrows))})
)
await table.create_index("id", config=BTree())
indices = await table.list_indices()
assert len(indices) == 1
r = repr(indices[0])
assert r.startswith('IndexConfig(name="id_idx", index_type="BTree", columns=["id"]')
# Integer counts use `_` thousands separators (valid Python int syntax).
assert "num_indexed_rows=1_500" in r
assert "num_unindexed_rows=0" in r
# created_at renders as a datetime so the value round-trips.
assert "created_at=datetime.datetime(" in r
assert r.endswith(")")
@pytest.mark.asyncio
async def test_create_nested_scalar_index_lists_canonical_paths(db_async):
metadata_type = pa.struct(
@@ -198,7 +221,9 @@ async def test_create_nested_scalar_index_lists_canonical_paths(db_async):
async def test_create_fixed_size_binary_index(some_table: AsyncTable):
await some_table.create_index("fsb", config=BTree())
indices = await some_table.list_indices()
assert str(indices) == '[Index(BTree, columns=["fsb"], name="fsb_idx")]'
assert str(indices).startswith(
'[IndexConfig(name="fsb_idx", index_type="BTree", columns=["fsb"]'
)
assert len(indices) == 1
assert indices[0].index_type == "BTree"
assert indices[0].columns == ["fsb"]
@@ -247,7 +272,9 @@ async def test_create_bitmap_index(some_table: AsyncTable):
async def test_create_label_list_index(some_table: AsyncTable):
await some_table.create_index("tags", config=LabelList())
indices = await some_table.list_indices()
assert str(indices) == '[Index(LabelList, columns=["tags"], name="tags_idx")]'
assert str(indices).startswith(
'[IndexConfig(name="tags_idx", index_type="LabelList", columns=["tags"]'
)
plan = await some_table.query().where("array_has(tags, 'tag0')").explain_plan()
assert "ScalarIndexQuery" in plan
@@ -262,7 +289,9 @@ async def test_create_large_list_label_list_index(db_async):
await table.create_index("tags", config=LabelList())
indices = await table.list_indices()
assert str(indices) == '[Index(LabelList, columns=["tags"], name="tags_idx")]'
assert str(indices).startswith(
'[IndexConfig(name="tags_idx", index_type="LabelList", columns=["tags"]'
)
plan = await table.query().where("array_has(tags, 'shared')").explain_plan()
assert "ScalarIndexQuery" in plan
@@ -299,7 +328,9 @@ async def test_create_label_list_index_rejects_list_struct(db_async):
async def test_full_text_search_index(some_table: AsyncTable):
await some_table.create_index("tags", config=FTS(with_position=False))
indices = await some_table.list_indices()
assert str(indices) == '[Index(FTS, columns=["tags"], name="tags_idx")]'
assert str(indices).startswith(
'[IndexConfig(name="tags_idx", index_type="FTS", columns=["tags"]'
)
await some_table.prewarm_index("tags_idx")

View File

@@ -5,11 +5,11 @@
import tempfile
import shutil
import sys
import pytest
import pyarrow as pa
import lancedb
from lance_namespace.errors import NamespaceNotEmptyError, TableNotFoundError
from lancedb.namespace import _MAX_QUERY_K
from lancedb.table import AsyncTable, LanceTable
@@ -816,10 +816,13 @@ class TestPushdownOperations:
["geneva", "hist"],
["geneva", "hist"],
]
# Unlimited reads cap k at i32::MAX (the namespace query_table `k`
# field is i32); sys.maxsize would overflow the Rust binding.
assert [request.k for request in namespace_client.requests] == [
sys.maxsize,
sys.maxsize,
_MAX_QUERY_K,
_MAX_QUERY_K,
]
assert all(r.k <= 2**31 - 1 for r in namespace_client.requests)
@pytest.mark.asyncio
@@ -874,10 +877,13 @@ class TestAsyncPushdownOperations:
["geneva", "hist"],
["geneva", "hist"],
]
# Unlimited reads cap k at i32::MAX (the namespace query_table `k`
# field is i32); sys.maxsize would overflow the Rust binding.
assert [request.k for request in namespace_client.requests] == [
sys.maxsize,
sys.maxsize,
_MAX_QUERY_K,
_MAX_QUERY_K,
]
assert all(r.k <= 2**31 - 1 for r in namespace_client.requests)
def test_local_table_to_arrow_and_to_pandas_are_unchanged(tmp_path):

View File

@@ -319,11 +319,53 @@ pub struct IndexConfig {
#[pymethods]
impl IndexConfig {
pub fn __repr__(&self) -> String {
format!(
"Index({}, columns={:?}, name=\"{}\")",
self.index_type, self.columns, self.name
)
pub fn __repr__(&self, py: Python<'_>) -> String {
let mut fields = vec![
format!("name={:?}", self.name),
format!("index_type={:?}", self.index_type),
format!("columns={:?}", self.columns),
];
if let Some(v) = &self.index_uuid {
fields.push(format!("index_uuid={:?}", v));
}
if let Some(v) = &self.type_url {
fields.push(format!("type_url={:?}", v));
}
if let Some(v) = self.created_at {
// Render the datetime's own Python repr so the value round-trips,
// falling back to RFC 3339 if the conversion ever fails.
let rendered = v
.into_pyobject(py)
.ok()
.and_then(|obj| obj.into_any().repr().ok())
.map(|r| r.to_string())
.unwrap_or_else(|| v.to_rfc3339());
fields.push(format!("created_at={}", rendered));
}
if let Some(v) = self.num_indexed_rows {
fields.push(format!("num_indexed_rows={}", fmt_thousands(v)));
}
if let Some(v) = self.num_unindexed_rows {
fields.push(format!("num_unindexed_rows={}", fmt_thousands(v)));
}
if let Some(v) = self.size_bytes {
fields.push(format!("size_bytes={}", fmt_thousands(v)));
}
if let Some(v) = self.num_segments {
fields.push(format!("num_segments={}", v));
}
if let Some(v) = self.index_version {
fields.push(format!("index_version={}", v));
}
if let Some(v) = &self.index_details {
let details = v
.bind(py)
.repr()
.map(|r| r.to_string())
.unwrap_or_else(|_| "<unavailable>".to_string());
fields.push(format!("index_details={}", details));
}
format!("IndexConfig({})", fields.join(", "))
}
// For backwards-compatibility with the old sync SDK, we also support getting
@@ -352,6 +394,23 @@ impl IndexConfig {
}
}
/// Format an integer with `_` thousands separators, e.g. `24_500_213`.
///
/// Underscores are valid Python int-literal syntax, so the repr stays
/// copy-pasteable and machine-parseable while remaining readable.
fn fmt_thousands(n: u64) -> String {
let digits = n.to_string();
let bytes = digits.as_bytes();
let mut out = String::with_capacity(digits.len() + digits.len() / 3);
for (i, b) in bytes.iter().enumerate() {
if i > 0 && (bytes.len() - i).is_multiple_of(3) {
out.push('_');
}
out.push(*b as char);
}
out
}
fn parse_index_details(py: Python<'_>, s: String) -> Py<PyAny> {
let json = py.import("json").expect("json module is always available");
match json.call_method1("loads", (s.as_str(),)) {

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.31.0-beta.0"
version = "0.31.0-beta.1"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true

435
rust/lancedb/src/blob.rs Normal file
View File

@@ -0,0 +1,435 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Lance blob v2 columns store large binary payloads out of line.
//!
//! Declare a column with [`blob`]. On write, [`crate::table::Table::add`] coerces
//! raw `Binary` / `LargeBinary` into the blob struct layout. Queries return
//! small descriptors, not bytes.
//!
//! Blob tables require Lance file format >= 2.2 and stable row ids at create.
use std::sync::Arc;
use arrow_array::builder::LargeBinaryBuilder;
use arrow_array::{Array, LargeBinaryArray, RecordBatch, StructArray, UInt8Array, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use lance::dataset::{Dataset, WriteParams};
use lance_arrow::FieldExt;
use lance_core::datatypes::parse_field_path;
use lance_encoding::version::LanceFileVersion;
use crate::error::{Error, Result};
pub use lance::dataset::BlobFile;
/// Creates an Arrow field for a Lance blob v2 column.
///
/// `Struct<data, uri>` with the `lance.blob.v2` marker. Same layout Lance
/// expects on write.
///
/// A blob column may be top-level or nested inside a struct or list. Nested
/// blobs are addressed by a dotted path (e.g. `info.blob`) in the read APIs.
///
/// ```
/// use arrow_schema::{DataType, Field, Schema};
///
/// let schema = Schema::new(vec![
/// Field::new("id", DataType::Int64, false),
/// lancedb::blob("image", true),
/// ]);
/// ```
pub fn blob(name: impl AsRef<str>, nullable: bool) -> Field {
lance::blob::blob_field(name.as_ref(), nullable)
}
/// Returns true if `field` is a blob v2 column.
///
/// ```
/// let field = lancedb::blob("image", true);
/// assert!(lancedb::blob::is_blob(&field));
/// ```
pub fn is_blob(field: &Field) -> bool {
field.is_blob_v2()
}
/// Returns true if `field`, or any field nested under it, is a blob v2 column.
fn field_tree_has_blob_v2(field: &Field) -> bool {
if field.is_blob_v2() {
return true;
}
match field.data_type() {
DataType::Struct(children) => children.iter().any(|c| field_tree_has_blob_v2(c)),
DataType::List(child) | DataType::LargeList(child) | DataType::FixedSizeList(child, _) => {
field_tree_has_blob_v2(child)
}
_ => false,
}
}
/// Collects the dotted paths of blob v2 columns under `field`, into `paths`.
fn collect_blob_paths(field: &Field, prefix: &str, paths: &mut Vec<String>) {
let path = if prefix.is_empty() {
field.name().clone()
} else {
format!("{prefix}.{}", field.name())
};
if field.is_blob_v2() {
paths.push(path);
return;
}
match field.data_type() {
DataType::Struct(children) => {
for child in children {
collect_blob_paths(child, &path, paths);
}
}
DataType::List(child) | DataType::LargeList(child) | DataType::FixedSizeList(child, _) => {
collect_blob_paths(child, &path, paths)
}
_ => {}
}
}
/// Returns true if `schema` declares any blob v2 column, including nested ones.
pub(crate) fn has_blob_columns(schema: &Schema) -> bool {
schema.fields().iter().any(|f| field_tree_has_blob_v2(f))
}
/// Blob v2 column paths in `schema`, declaration order preserved. Nested blobs
/// are dotted paths (e.g. `info.blob`).
pub(crate) fn blob_column_names(schema: &Schema) -> Vec<String> {
let mut paths = Vec::new();
for field in schema.fields() {
collect_blob_paths(field, "", &mut paths);
}
paths
}
/// Bumps storage format to at least [`LanceFileVersion::V2_2`] for blob schemas.
pub(crate) fn ensure_blob_storage_version(schema: &Schema, params: &mut WriteParams) {
if !has_blob_columns(schema) {
return;
}
let resolved = params
.data_storage_version
.unwrap_or(LanceFileVersion::Stable)
.resolve();
if resolved < LanceFileVersion::V2_2 {
params.data_storage_version = Some(LanceFileVersion::V2_2);
}
}
/// Validate that `column` exists and is a blob v2 column.
///
/// Legacy v1 columns (`lance-encoding:blob`) error with a migration hint.
pub(crate) fn ensure_blob_v2_column(
schema: &lance_core::datatypes::Schema,
column: &str,
) -> Result<()> {
match schema.field(column) {
Some(field) if field.is_blob_v2() => Ok(()),
Some(field) if field.is_blob() => Err(Error::InvalidInput {
message: format!(
"column '{column}' is a legacy blob column; blob APIs require blob v2 columns \
(ARROW:extension:name = \"lance.blob.v2\")"
),
}),
Some(_) => Err(Error::InvalidInput {
message: format!("column '{column}' is not a blob column"),
}),
None => Err(Error::InvalidInput {
message: format!("no column named '{column}' in this table"),
}),
}
}
/// Returns the leaf descriptor `StructArray` for `column` in a descriptor batch.
fn leaf_descriptor_struct<'a>(batch: &'a RecordBatch, column: &str) -> Result<&'a StructArray> {
let path = parse_field_path(column).map_err(|e| Error::InvalidInput {
message: format!("invalid blob column path '{column}': {e}"),
})?;
let not_struct = || Error::Runtime {
message: format!("blob column '{column}' did not read back as a descriptor struct"),
};
let mut current = batch
.column_by_name(&path[0])
.and_then(|c| c.as_any().downcast_ref::<StructArray>())
.ok_or_else(not_struct)?;
for segment in &path[1..] {
current = current
.column_by_name(segment)
.and_then(|c| c.as_any().downcast_ref::<StructArray>())
.ok_or_else(not_struct)?;
}
Ok(current)
}
/// Null rows in `row_ids`, from a descriptor take.
///
/// Lance `read_blobs` / `take_blobs` skip null rows (`kind == 0 && position == 0 && size == 0`).
/// TODO(lance): aligned read API would drop this pass.
async fn blob_null_mask(
dataset: &Arc<Dataset>,
column: &str,
row_ids: &[u64],
) -> Result<Vec<bool>> {
let projection = dataset.schema().project(&[column])?;
let descriptors = dataset.take_builder(row_ids, projection)?.execute().await?;
if descriptors.num_rows() != row_ids.len() {
return Err(Error::InvalidInput {
message: format!(
"blob take for column '{column}' requested {} row ids but only {} exist in the \
table; pass row ids collected from this table",
row_ids.len(),
descriptors.num_rows()
),
});
}
let descriptor_struct = leaf_descriptor_struct(&descriptors, column)?;
let child = |name: &str| {
descriptor_struct
.column_by_name(name)
.ok_or_else(|| Error::Runtime {
message: format!("blob descriptor for '{column}' is missing the '{name}' field"),
})
};
let kinds = child("kind")?
.as_any()
.downcast_ref::<UInt8Array>()
.ok_or_else(|| Error::Runtime {
message: format!("blob descriptor 'kind' for '{column}' is not a UInt8 array"),
})?;
let positions = child("position")?
.as_any()
.downcast_ref::<UInt64Array>()
.ok_or_else(|| Error::Runtime {
message: format!("blob descriptor 'position' for '{column}' is not a UInt64 array"),
})?;
let sizes = child("size")?
.as_any()
.downcast_ref::<UInt64Array>()
.ok_or_else(|| Error::Runtime {
message: format!("blob descriptor 'size' for '{column}' is not a UInt64 array"),
})?;
// Match Lance `collect_blob_entries_v2` skip condition (`BlobKind::Inline` == 0).
Ok((0..descriptor_struct.len())
.map(|i| {
descriptor_struct.is_null(i)
|| kinds.is_null(i)
|| (kinds.value(i) == 0 && positions.value(i) == 0 && sizes.value(i) == 0)
})
.collect())
}
fn non_null_row_ids(row_ids: &[u64], null_mask: &[bool]) -> Vec<u64> {
row_ids
.iter()
.zip(null_mask)
.filter_map(|(row_id, is_null)| (!is_null).then_some(*row_id))
.collect()
}
/// Materialize blob bytes for `row_ids` (same length and order, nulls preserved).
pub(crate) async fn take_blobs_aligned(
dataset: &Arc<Dataset>,
column: &str,
row_ids: &[u64],
) -> Result<LargeBinaryArray> {
ensure_blob_v2_column(dataset.schema(), column)?;
if row_ids.is_empty() {
return Ok(LargeBinaryBuilder::new().finish());
}
let null_mask = blob_null_mask(dataset, column, row_ids).await?;
let non_null_row_ids = non_null_row_ids(row_ids, &null_mask);
let non_null_count = non_null_row_ids.len();
let payloads = if non_null_count == 0 {
Vec::new()
} else {
dataset
.read_blobs(column)?
.with_row_ids(non_null_row_ids)
.preserve_order(true)
.execute()
.await?
};
if payloads.len() != non_null_count {
return Err(Error::Runtime {
message: format!(
"blob read for column '{column}' returned {} payloads for {} non-null rows",
payloads.len(),
non_null_count
),
});
}
let mut builder = LargeBinaryBuilder::new();
let mut payload_idx = 0;
for is_null in &null_mask {
if *is_null {
builder.append_null();
} else {
builder.append_value(payloads[payload_idx].data.as_ref());
payload_idx += 1;
}
}
Ok(builder.finish())
}
/// Open lazy [`BlobFile`] handles for `row_ids` (same length and order, nulls as `None`).
pub(crate) async fn take_blob_files_aligned(
dataset: &Arc<Dataset>,
column: &str,
row_ids: &[u64],
) -> Result<Vec<Option<BlobFile>>> {
ensure_blob_v2_column(dataset.schema(), column)?;
if row_ids.is_empty() {
return Ok(Vec::new());
}
let null_mask = blob_null_mask(dataset, column, row_ids).await?;
let non_null_row_ids = non_null_row_ids(row_ids, &null_mask);
let handles = if non_null_row_ids.is_empty() {
Vec::new()
} else {
dataset.take_blobs(&non_null_row_ids, column).await?
};
if handles.len() != non_null_row_ids.len() {
return Err(Error::Runtime {
message: format!(
"blob take for column '{column}' returned {} handles for {} non-null rows",
handles.len(),
non_null_row_ids.len()
),
});
}
let mut handles = handles.into_iter();
Ok(null_mask
.iter()
.map(|is_null| {
if *is_null {
None
} else {
Some(handles.next().unwrap())
}
})
.collect())
}
#[cfg(test)]
mod tests {
use super::*;
use arrow_schema::DataType;
use lance_arrow::ARROW_EXT_NAME_KEY;
fn blob_schema() -> Schema {
Schema::new(vec![
Field::new("id", DataType::Int64, false),
blob("image", true),
])
}
#[test]
fn blob_field_carries_v2_extension_marker() {
let field = blob("image", true);
assert_eq!(
field.metadata().get(ARROW_EXT_NAME_KEY).map(String::as_str),
Some("lance.blob.v2")
);
assert!(matches!(field.data_type(), DataType::Struct(_)));
}
#[test]
fn has_blob_columns_detects_blob_fields() {
assert!(has_blob_columns(&blob_schema()));
let plain = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
assert!(!has_blob_columns(&plain));
}
#[test]
fn storage_version_bumps_to_v2_2() {
let mut params = WriteParams::default();
ensure_blob_storage_version(&blob_schema(), &mut params);
assert_eq!(
params.data_storage_version.unwrap().resolve(),
LanceFileVersion::V2_2
);
}
#[test]
fn storage_version_overrides_lower_explicit_version() {
let mut params = WriteParams {
data_storage_version: Some(LanceFileVersion::V2_0),
..Default::default()
};
ensure_blob_storage_version(&blob_schema(), &mut params);
assert_eq!(
params.data_storage_version.unwrap().resolve(),
LanceFileVersion::V2_2
);
}
#[test]
fn storage_version_keeps_higher_explicit_version() {
let mut params = WriteParams {
data_storage_version: Some(LanceFileVersion::V2_3),
..Default::default()
};
ensure_blob_storage_version(&blob_schema(), &mut params);
assert_eq!(params.data_storage_version.unwrap(), LanceFileVersion::V2_3);
}
#[test]
fn legacy_v1_blob_column_is_rejected_with_migration_hint() {
let legacy = Field::new("image", DataType::LargeBinary, true).with_metadata(
std::collections::HashMap::from([(
"lance-encoding:blob".to_string(),
"true".to_string(),
)]),
);
let arrow_schema = Schema::new(vec![legacy]);
let lance_schema = lance_core::datatypes::Schema::try_from(&arrow_schema).unwrap();
let err = ensure_blob_v2_column(&lance_schema, "image").unwrap_err();
assert!(matches!(err, Error::InvalidInput { .. }));
assert!(err.to_string().contains("legacy blob column"));
assert!(err.to_string().contains("lance.blob.v2"));
}
#[test]
fn non_blob_and_unknown_columns_are_rejected_by_name() {
let arrow_schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
let lance_schema = lance_core::datatypes::Schema::try_from(&arrow_schema).unwrap();
let err = ensure_blob_v2_column(&lance_schema, "id").unwrap_err();
assert!(err.to_string().contains("'id' is not a blob column"));
let err = ensure_blob_v2_column(&lance_schema, "missing").unwrap_err();
assert!(err.to_string().contains("no column named 'missing'"));
}
#[test]
fn blob_column_names_includes_nested_path() {
let blob_field = blob("blob", true);
let info = Field::new(
"info",
DataType::Struct(vec![Field::new("name", DataType::Utf8, false), blob_field].into()),
true,
);
let schema = Schema::new(vec![Field::new("id", DataType::Int64, false), info]);
assert_eq!(blob_column_names(&schema), vec!["info.blob"]);
}
#[test]
fn storage_version_noop_without_blob_columns() {
let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
let mut params = WriteParams::default();
ensure_blob_storage_version(&schema, &mut params);
assert!(params.data_storage_version.is_none());
}
}

View File

@@ -18,6 +18,7 @@ use lance_table::io::commit::commit_handler_from_url;
use object_store::local::LocalFileSystem;
use snafu::ResultExt;
use crate::blob::{ensure_blob_storage_version, has_blob_columns};
use crate::connection::ConnectRequest;
use crate::database::ReadConsistency;
use crate::database::namespace::LanceNamespaceDatabase;
@@ -838,13 +839,16 @@ impl ListingDatabase {
write_params.enable_v2_manifest_paths = enable_v2_manifest_paths;
}
// Apply enable_stable_row_ids: table-level override takes precedence over connection config
if let Some(enable_stable_row_ids) =
stable_row_ids_override.or(self.new_table_config.enable_stable_row_ids)
let data_schema = request.data.arrow_schema();
if let Some(enable_stable_row_ids) = stable_row_ids_override
.or(self.new_table_config.enable_stable_row_ids)
.or(has_blob_columns(&data_schema).then_some(true))
{
write_params.enable_stable_row_ids = enable_stable_row_ids;
}
ensure_blob_storage_version(&data_schema, &mut write_params);
if matches!(&request.mode, CreateTableMode::Overwrite) {
write_params.mode = WriteMode::Overwrite;
}

View File

@@ -23,6 +23,7 @@ use lance_namespace_impls::ConnectBuilder;
use lance_table::io::commit::CommitHandler;
use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
use crate::blob::{ensure_blob_storage_version, has_blob_columns};
use crate::connection::NamespaceClientPushdownOperation;
use crate::database::ReadConsistency;
use crate::database::listing::{
@@ -257,12 +258,16 @@ impl LanceNamespaceDatabase {
params.enable_v2_manifest_paths = enable_v2_manifest_paths;
}
if let Some(enable_stable_row_ids) =
stable_row_ids_override.or(self.new_table_config.enable_stable_row_ids)
let data_schema = request.data.schema();
if let Some(enable_stable_row_ids) = stable_row_ids_override
.or(self.new_table_config.enable_stable_row_ids)
.or(has_blob_columns(data_schema.as_ref()).then_some(true))
{
params.enable_stable_row_ids = enable_stable_row_ids;
}
ensure_blob_storage_version(data_schema.as_ref(), params);
Ok(())
}
}

View File

@@ -163,6 +163,7 @@
//! ```
pub mod arrow;
pub mod blob;
pub mod connection;
pub mod data;
pub mod database;
@@ -188,6 +189,7 @@ use std::{fmt::Display, str::FromStr};
use serde::{Deserialize, Serialize};
pub use blob::{blob, is_blob};
pub use connection::{ConnectNamespaceBuilder, Connection};
pub use error::{Error, Result};
use lance_index::vector::ApproxMode as LanceApproxMode;

View File

@@ -1352,6 +1352,35 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
}
}
/// Deserialize an index's `created_at` field.
///
/// The server returns this as an RFC 3339 string (e.g. `"2026-06-18T21:37:36.637Z"`),
/// but older deployments sent a unix timestamp in milliseconds. Accept both so the
/// client works against any server version.
fn deserialize_created_at<'de, D>(
deserializer: D,
) -> std::result::Result<Option<DateTime<Utc>>, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error as _;
#[derive(Deserialize)]
#[serde(untagged)]
enum CreatedAt {
Rfc3339(String),
Millis(i64),
}
match Option::<CreatedAt>::deserialize(deserializer)? {
None => Ok(None),
Some(CreatedAt::Rfc3339(s)) => DateTime::parse_from_rfc3339(&s)
.map(|dt| Some(dt.with_timezone(&Utc)))
.map_err(D::Error::custom),
Some(CreatedAt::Millis(ms)) => Ok(DateTime::from_timestamp_millis(ms)),
}
}
impl<S: HttpSend + 'static> RemoteTable<S> {
/// Parse the response from `/index/list/` into `IndexConfig` entries.
///
@@ -1380,7 +1409,7 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
// Used as the sentinel to decide whether to skip the stats call.
index_type: Option<IndexType>,
index_uuid: Option<String>,
#[serde(default, with = "chrono::serde::ts_milliseconds_option")]
#[serde(default, deserialize_with = "deserialize_created_at")]
created_at: Option<DateTime<Utc>>,
num_indexed_rows: Option<u64>,
num_unindexed_rows: Option<u64>,
@@ -4678,7 +4707,7 @@ mod tests {
"num_segments": 2,
"index_version": 1,
"index_details": "{\"num_partitions\":16}",
"created_at": 1700000000000i64,
"created_at": "2026-06-18T21:37:36.637Z",
"type_url": "type.googleapis.com/lance.index.vector.IvfPq",
},
{
@@ -4728,7 +4757,10 @@ mod tests {
vec_idx.type_url,
Some("type.googleapis.com/lance.index.vector.IvfPq".to_string())
);
assert!(vec_idx.created_at.is_some());
assert_eq!(
vec_idx.created_at,
Some("2026-06-18T21:37:36.637Z".parse::<DateTime<Utc>>().unwrap())
);
let text_idx = &indices[1];
assert_eq!(text_idx.name, "text_idx");
@@ -4749,6 +4781,36 @@ mod tests {
assert_eq!(text_idx.created_at, None);
}
#[test]
fn test_deserialize_created_at() {
#[derive(Deserialize)]
struct Wrapper {
#[serde(default, deserialize_with = "deserialize_created_at")]
created_at: Option<DateTime<Utc>>,
}
// RFC 3339 string (current server format).
let w: Wrapper =
serde_json::from_str(r#"{"created_at": "2026-06-18T21:37:36.637Z"}"#).unwrap();
assert_eq!(
w.created_at,
Some("2026-06-18T21:37:36.637Z".parse::<DateTime<Utc>>().unwrap())
);
// Unix milliseconds (legacy server format).
let w: Wrapper = serde_json::from_str(r#"{"created_at": 1700000000000}"#).unwrap();
assert_eq!(w.created_at, DateTime::from_timestamp_millis(1700000000000));
// Null and missing both yield None.
let w: Wrapper = serde_json::from_str(r#"{"created_at": null}"#).unwrap();
assert_eq!(w.created_at, None);
let w: Wrapper = serde_json::from_str(r#"{}"#).unwrap();
assert_eq!(w.created_at, None);
// A malformed string is rejected rather than silently dropped to None.
assert!(serde_json::from_str::<Wrapper>(r#"{"created_at": "not-a-date"}"#).is_err());
}
#[tokio::test]
async fn test_list_versions() {
let table = Table::new_with_handler("my_table", |request| {

View File

@@ -3,7 +3,7 @@
//! LanceDB Table APIs
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_array::{LargeBinaryArray, RecordBatch, RecordBatchReader};
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_execution::TaskContext;
@@ -12,6 +12,7 @@ use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::display::DisplayableExecutionPlan;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use lance::dataset::BlobFile;
pub use lance::dataset::ColumnAlteration;
pub use lance::dataset::NewColumnTransform;
pub use lance::dataset::ReadParams;
@@ -587,6 +588,28 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
async fn close_lsm_writers(&self) -> Result<()> {
Ok(())
}
/// Names of the blob v2 columns in this table, in declaration order.
async fn blob_columns(&self) -> Result<Vec<String>> {
Err(Error::NotSupported {
message: "blob_columns is not supported on this table type".into(),
})
}
/// Materialize blob bytes for the given row ids. See [`Table::fetch_blobs`].
async fn fetch_blobs(&self, _column: &str, _row_ids: &[u64]) -> Result<LargeBinaryArray> {
Err(Error::NotSupported {
message: "fetch_blobs is not supported on this table type".into(),
})
}
/// Open lazy blob handles for the given row ids. See [`Table::fetch_blob_files`].
async fn fetch_blob_files(
&self,
_column: &str,
_row_ids: &[u64],
) -> Result<Vec<Option<BlobFile>>> {
Err(Error::NotSupported {
message: "fetch_blob_files is not supported on this table type".into(),
})
}
/// Gets the table tag manager.
async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
/// Optimize the dataset.
@@ -927,6 +950,76 @@ impl Table {
self.inner.count_rows(filter.map(Filter::Sql)).await
}
/// Names of the blob v2 columns in this table, in declaration order.
///
/// Nested blobs use dotted paths (e.g. `info.blob`). Returns
/// [`Error::NotSupported`] on table types without blob support.
pub async fn blob_columns(&self) -> Result<Vec<String>> {
self.inner.blob_columns().await
}
/// Materialize blob bytes for the given row ids.
///
/// Output matches `row_ids` in length and order. Null and zero-length rows
/// are null. Prefer [`Self::fetch_blob_files`] for large selections.
///
/// ```
/// use arrow_array::UInt64Array;
/// use futures::TryStreamExt;
/// use lancedb::query::{ExecutableQuery, QueryBase};
///
/// # use lancedb::Table;
/// # async fn materialize(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
/// let mut stream = table.query().with_row_id().limit(10).execute().await?;
/// while let Some(batch) = stream.try_next().await? {
/// let row_ids = batch
/// .column_by_name("_rowid")
/// .unwrap()
/// .as_any()
/// .downcast_ref::<UInt64Array>()
/// .unwrap();
/// let images = table.fetch_blobs("image", row_ids.values()).await?;
/// let _ = images;
/// }
/// # Ok(())
/// # }
/// ```
///
/// Returns [`Error::InvalidInput`] when the column does not exist or is
/// not a blob v2 column, and [`Error::NotSupported`] on table types
/// without blob support.
pub async fn fetch_blobs(
&self,
column: impl AsRef<str>,
row_ids: &[u64],
) -> Result<LargeBinaryArray> {
self.inner.fetch_blobs(column.as_ref(), row_ids).await
}
/// Open lazy [`BlobFile`] handles for the given row ids.
///
/// Same length and order as `row_ids`. Null rows are `None`. Bytes are not
/// read from disk until a call to [`BlobFile::read`].
///
/// ```
/// # use lancedb::Table;
/// # async fn lazy_read(table: &Table, row_ids: &[u64]) -> Result<(), Box<dyn std::error::Error>> {
/// let handles = table.fetch_blob_files("image", row_ids).await?;
/// if let Some(Some(first)) = handles.first() {
/// let bytes = first.read().await?;
/// println!("first blob is {} bytes", bytes.len());
/// }
/// # Ok(())
/// # }
/// ```
pub async fn fetch_blob_files(
&self,
column: impl AsRef<str>,
row_ids: &[u64],
) -> Result<Vec<Option<BlobFile>>> {
self.inner.fetch_blob_files(column.as_ref(), row_ids).await
}
/// Insert new records into this Table
///
/// # Arguments
@@ -2761,6 +2854,25 @@ impl BaseTable for NativeTable {
merge::lsm::close_lsm_writers(self).await
}
async fn blob_columns(&self) -> Result<Vec<String>> {
let schema = self.schema().await?;
Ok(crate::blob::blob_column_names(schema.as_ref()))
}
async fn fetch_blobs(&self, column: &str, row_ids: &[u64]) -> Result<LargeBinaryArray> {
let dataset = self.dataset.get().await?;
crate::blob::take_blobs_aligned(&dataset, column, row_ids).await
}
async fn fetch_blob_files(
&self,
column: &str,
row_ids: &[u64],
) -> Result<Vec<Option<BlobFile>>> {
let dataset = self.dataset.get().await?;
crate::blob::take_blob_files_aligned(&dataset, column, row_ids).await
}
/// Delete rows from the table
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
let result = delete::execute_delete(self, predicate).await?;

View File

@@ -26,6 +26,9 @@ pub enum AddDataMode {
#[default]
Append,
/// The existing table will be overwritten with the new data
///
/// On overwrite, raw binary is not coerced into a blob struct. The input
/// must declare blob v2 for the column to stay a blob column.
Overwrite,
}

View File

@@ -3,6 +3,7 @@
//! This module contains adapters to allow LanceDB tables to be used as DataFusion table providers.
mod blob_coerce;
pub mod cast;
pub mod insert;
pub mod reject_nan;

View File

@@ -0,0 +1,495 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Coerces write-path input into blob v2 struct columns.
//!
//! [`super::cast::cast_to_table_schema`] calls [`coerce_blob_expr`].
use std::sync::Arc;
use arrow_schema::{DataType, Field, FieldRef};
use datafusion::functions::core::{get_field, named_struct};
use datafusion_common::ScalarValue;
use datafusion_common::config::ConfigOptions;
use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_expr::expressions::{CastExpr, Literal};
use datafusion_physical_plan::PhysicalExpr;
use crate::error::{Error, Result};
/// Build a projection expression coercing `input_expr` into the blob struct
/// declared by `table_field`, composing `named_struct` / `get_field` / `cast`.
pub(super) fn coerce_blob_expr(
input_expr: Arc<dyn PhysicalExpr>,
input_field: &Field,
table_field: &FieldRef,
config: &Arc<ConfigOptions>,
) -> Result<(Arc<dyn PhysicalExpr>, FieldRef)> {
let DataType::Struct(declared_fields) = table_field.data_type() else {
return Err(Error::InvalidInput {
message: format!(
"blob v2 column '{}' must be a struct, table declares {}",
table_field.name(),
table_field.data_type()
),
});
};
let input_struct_children = match input_field.data_type() {
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => None,
DataType::Struct(children) => {
if !children
.iter()
.any(|c| c.name() == "data" || c.name() == "uri")
{
return Err(Error::InvalidInput {
message: format!(
"blob struct input for column '{}' must contain a 'data' or 'uri' child",
table_field.name()
),
});
}
Some(children)
}
other => {
return Err(Error::InvalidInput {
message: format!(
"cannot coerce column '{}' with type {} into a blob v2 struct. \
expected Binary, LargeBinary, BinaryView, or a Struct with a 'data' or 'uri' child",
table_field.name(),
other,
),
});
}
};
let mut ns_args: Vec<Arc<dyn PhysicalExpr>> = Vec::with_capacity(declared_fields.len() * 2);
for declared in declared_fields.iter() {
ns_args.push(Arc::new(Literal::new(ScalarValue::from(
declared.name().as_str(),
))));
let value: Arc<dyn PhysicalExpr> = match input_struct_children {
// Raw binary lands in `data` and everything else is a typed null.
None => {
if declared.name() == "data" {
Arc::new(CastExpr::new(
input_expr.clone(),
declared.data_type().clone(),
None,
))
} else {
typed_null(declared.data_type())?
}
}
Some(children) => match children.iter().find(|c| c.name() == declared.name()) {
Some(child) => {
let field_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
&format!("get_field({})", declared.name()),
get_field(),
vec![
input_expr.clone(),
Arc::new(Literal::new(ScalarValue::from(declared.name().as_str()))),
],
Arc::new(child.as_ref().clone()),
config.clone(),
));
if child.data_type() == declared.data_type() {
field_expr
} else {
Arc::new(CastExpr::new(
field_expr,
declared.data_type().clone(),
None,
))
}
}
None => typed_null(declared.data_type())?,
},
};
ns_args.push(value);
}
let expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
&format!("named_struct({})", table_field.name()),
named_struct(),
ns_args,
table_field.clone(),
config.clone(),
));
Ok((expr, table_field.clone()))
}
fn typed_null(data_type: &DataType) -> Result<Arc<dyn PhysicalExpr>> {
let scalar = ScalarValue::try_from(data_type).map_err(|e| Error::InvalidInput {
message: format!("cannot build null literal for blob child type {data_type}: {e}"),
})?;
Ok(Arc::new(Literal::new(scalar)))
}
#[cfg(test)]
mod tests {
use super::super::cast::cast_to_table_schema;
use super::*;
use crate::blob::blob;
use arrow_array::{
Array, ArrayRef, BinaryArray, BinaryViewArray, Int32Array, Int64Array, LargeBinaryArray,
RecordBatch, StringArray, StructArray, UInt8Array, UInt64Array,
};
use arrow_schema::Schema;
use datafusion::prelude::SessionContext;
use datafusion_catalog::MemTable;
use datafusion_physical_plan::ExecutionPlan;
use futures::TryStreamExt;
use lance_arrow::FieldExt;
use std::collections::HashMap;
fn wide_blob_field(name: &str) -> Field {
Field::new(
name,
DataType::Struct(
vec![
Field::new("data", DataType::LargeBinary, true),
Field::new("uri", DataType::Utf8, true),
Field::new("position", DataType::UInt64, true),
Field::new("size", DataType::UInt64, true),
]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
"ARROW:extension:name".to_string(),
"lance.blob.v2".to_string(),
)]))
}
fn blob_table_schema() -> Schema {
Schema::new(vec![
Field::new("id", DataType::Int64, false),
blob("image", true),
])
}
fn batch_with_image(image_field: Field, image: ArrayRef) -> RecordBatch {
let len = image.len();
RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
image_field,
])),
vec![Arc::new(Int64Array::from_iter_values(0..len as i64)), image],
)
.unwrap()
}
fn image_struct(batch: &RecordBatch) -> &StructArray {
batch
.column_by_name("image")
.unwrap()
.as_any()
.downcast_ref::<StructArray>()
.unwrap()
}
async fn plan_from_batch(batch: RecordBatch) -> Arc<dyn ExecutionPlan> {
let schema = batch.schema();
let table = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
let ctx = SessionContext::new();
ctx.register_table("t", Arc::new(table)).unwrap();
let df = ctx.table("t").await.unwrap();
df.create_physical_plan().await.unwrap()
}
async fn coerce(batch: RecordBatch, table_schema: &Schema) -> RecordBatch {
let plan = plan_from_batch(batch).await;
let plan = cast_to_table_schema(plan, table_schema).unwrap();
let ctx = SessionContext::new();
let stream = plan.execute(0, ctx.task_ctx()).unwrap();
let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
arrow_select::concat::concat_batches(&plan.schema(), &batches).unwrap()
}
async fn coerce_err(batch: RecordBatch, table_schema: &Schema) -> Error {
let plan = plan_from_batch(batch).await;
cast_to_table_schema(plan, table_schema).unwrap_err()
}
#[tokio::test]
async fn large_binary_coerces_to_declared_blob_struct() {
let batch = batch_with_image(
Field::new("image", DataType::LargeBinary, true),
Arc::new(LargeBinaryArray::from_iter_values([b"hello".as_slice()])),
);
let coerced = coerce(batch, &blob_table_schema()).await;
let image_field = coerced.schema().field_with_name("image").unwrap().clone();
assert!(image_field.is_blob_v2());
assert!(matches!(image_field.data_type(), DataType::Struct(_)));
let data = image_struct(&coerced).column_by_name("data").unwrap();
let data: &LargeBinaryArray = data.as_any().downcast_ref().unwrap();
assert_eq!(data.value(0), b"hello");
}
#[tokio::test]
async fn binary_coerces_to_declared_blob_struct() {
let batch = batch_with_image(
Field::new("image", DataType::Binary, true),
Arc::new(BinaryArray::from_iter_values([b"hi".as_slice()])),
);
let coerced = coerce(batch, &blob_table_schema()).await;
assert!(
coerced
.schema()
.field_with_name("image")
.unwrap()
.is_blob_v2()
);
}
#[tokio::test]
async fn binary_view_coerces_to_declared_blob_struct() {
let batch = batch_with_image(
Field::new("image", DataType::BinaryView, true),
Arc::new(BinaryViewArray::from_iter_values([b"view".as_slice()])),
);
let coerced = coerce(batch, &blob_table_schema()).await;
let data = image_struct(&coerced).column_by_name("data").unwrap();
let data: &LargeBinaryArray = data.as_any().downcast_ref().unwrap();
assert_eq!(data.value(0), b"view");
}
#[tokio::test]
async fn binary_nulls_stay_null_after_coercion() {
let batch = batch_with_image(
Field::new("image", DataType::Binary, true),
Arc::new(BinaryArray::from_iter(vec![
Some(b"present".as_slice()),
None,
])),
);
let coerced = coerce(batch, &blob_table_schema()).await;
let image = image_struct(&coerced);
let data = image.column_by_name("data").unwrap();
assert!(!data.is_null(0));
assert!(data.is_null(1));
}
#[tokio::test]
async fn binary_coerces_into_four_child_blob_layout() {
let table_schema = Schema::new(vec![
Field::new("id", DataType::Int64, false),
wide_blob_field("image"),
]);
let batch = batch_with_image(
Field::new("image", DataType::LargeBinary, true),
Arc::new(LargeBinaryArray::from_iter(vec![
Some(b"alpha".as_slice()),
None,
])),
);
let coerced = coerce(batch, &table_schema).await;
let image = image_struct(&coerced);
assert_eq!(
image.num_columns(),
4,
"coerced struct keeps the declared layout"
);
assert!(image.column_by_name("position").unwrap().is_null(0));
assert!(image.column_by_name("size").unwrap().is_null(0));
assert!(!image.column_by_name("data").unwrap().is_null(0));
assert!(image.column_by_name("data").unwrap().is_null(1));
}
#[tokio::test]
async fn prebuilt_struct_gains_blob_field_metadata() {
let DataType::Struct(children) = blob("image", true).data_type().clone() else {
unreachable!("blob field is a struct")
};
let prebuilt = StructArray::new(
children,
vec![
Arc::new(LargeBinaryArray::from_iter_values([b"prebuilt".as_slice()])),
Arc::new(StringArray::from(vec![None::<&str>])),
],
None,
);
let batch = batch_with_image(
Field::new("image", prebuilt.data_type().clone(), true),
Arc::new(prebuilt),
);
let coerced = coerce(batch, &blob_table_schema()).await;
assert!(
coerced
.schema()
.field_with_name("image")
.unwrap()
.is_blob_v2()
);
}
#[tokio::test]
async fn prebuilt_narrow_struct_widens_to_declared_layout() {
let DataType::Struct(narrow_children) = blob("image", true).data_type().clone() else {
unreachable!("blob field is a struct")
};
let prebuilt = StructArray::new(
narrow_children,
vec![
Arc::new(LargeBinaryArray::from_iter_values([b"prebuilt".as_slice()])),
Arc::new(StringArray::from(vec![None::<&str>])),
],
None,
);
let table_schema = Schema::new(vec![
Field::new("id", DataType::Int64, false),
wide_blob_field("image"),
]);
let batch = batch_with_image(
Field::new("image", prebuilt.data_type().clone(), true),
Arc::new(prebuilt),
);
let coerced = coerce(batch, &table_schema).await;
let image = image_struct(&coerced);
assert_eq!(image.num_columns(), 4);
assert!(image.column_by_name("position").unwrap().is_null(0));
assert!(image.column_by_name("size").unwrap().is_null(0));
}
#[tokio::test]
async fn external_reference_struct_preserves_uri_position_and_size() {
let prebuilt = StructArray::new(
vec![
Field::new("data", DataType::LargeBinary, true),
Field::new("uri", DataType::Utf8, true),
Field::new("position", DataType::UInt64, true),
Field::new("size", DataType::UInt64, true),
]
.into(),
vec![
Arc::new(LargeBinaryArray::from(vec![None::<&[u8]>])) as ArrayRef,
Arc::new(StringArray::from(vec![Some("s3://bucket/blob.bin")])) as ArrayRef,
Arc::new(UInt64Array::from(vec![Some(7)])) as ArrayRef,
Arc::new(UInt64Array::from(vec![Some(6)])) as ArrayRef,
],
None,
);
let table_schema = Schema::new(vec![
Field::new("id", DataType::Int64, false),
wide_blob_field("image"),
]);
let batch = batch_with_image(
Field::new("image", prebuilt.data_type().clone(), true),
Arc::new(prebuilt),
);
let coerced = coerce(batch, &table_schema).await;
let image = image_struct(&coerced);
let uri: &StringArray = image
.column_by_name("uri")
.unwrap()
.as_any()
.downcast_ref()
.unwrap();
assert_eq!(uri.value(0), "s3://bucket/blob.bin");
let position: &UInt64Array = image
.column_by_name("position")
.unwrap()
.as_any()
.downcast_ref()
.unwrap();
assert_eq!(position.value(0), 7);
let size: &UInt64Array = image
.column_by_name("size")
.unwrap()
.as_any()
.downcast_ref()
.unwrap();
assert_eq!(size.value(0), 6);
assert!(image.column_by_name("data").unwrap().is_null(0));
}
#[tokio::test]
async fn descriptor_struct_without_value_child_is_rejected() {
let descriptor = StructArray::new(
vec![
Field::new("kind", DataType::UInt8, false),
Field::new("position", DataType::UInt64, false),
Field::new("size", DataType::UInt64, false),
]
.into(),
vec![
Arc::new(UInt8Array::from(vec![0])),
Arc::new(UInt64Array::from(vec![0])),
Arc::new(UInt64Array::from(vec![0])),
],
None,
);
let batch = batch_with_image(
Field::new("image", descriptor.data_type().clone(), true),
Arc::new(descriptor),
);
let err = coerce_err(batch, &blob_table_schema()).await;
assert!(err.to_string().contains("'data' or 'uri'"));
assert!(err.to_string().contains("image"));
}
#[tokio::test]
async fn unsupported_input_type_is_rejected_with_column_name() {
let batch = batch_with_image(
Field::new("image", DataType::Utf8, true),
Arc::new(StringArray::from(vec!["not bytes"])),
);
let err = coerce_err(batch, &blob_table_schema()).await;
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
assert!(err.to_string().contains("image"));
}
#[tokio::test]
async fn blob_metadata_survives_cast_of_sibling_column() {
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("image", DataType::LargeBinary, true),
])),
vec![
Arc::new(Int32Array::from(vec![1])),
Arc::new(LargeBinaryArray::from_iter_values([b"x".as_slice()])),
],
)
.unwrap();
let coerced = coerce(batch, &blob_table_schema()).await;
let image_field = coerced.schema().field_with_name("image").unwrap().clone();
assert!(
image_field.is_blob_v2(),
"expected blob marker on image field, got {:?}",
image_field.metadata()
);
assert_eq!(
coerced.schema().field_with_name("id").unwrap().data_type(),
&DataType::Int64
);
}
#[tokio::test]
async fn exact_blob_input_passes_through_unchanged() {
let DataType::Struct(children) = blob("image", true).data_type().clone() else {
unreachable!("blob field is a struct")
};
let image = StructArray::new(
children,
vec![
Arc::new(LargeBinaryArray::from_iter_values([b"exact".as_slice()])),
Arc::new(StringArray::from(vec![None::<&str>])),
],
None,
);
let batch = batch_with_image(blob("image", true), Arc::new(image));
let table_schema = blob_table_schema();
let input = plan_from_batch(batch).await;
let input_ptr = Arc::as_ptr(&input);
let plan = cast_to_table_schema(input, &table_schema).unwrap();
assert_eq!(Arc::as_ptr(&plan), input_ptr, "no projection inserted");
}
}

View File

@@ -13,8 +13,10 @@ use datafusion_physical_expr::expressions::{CastExpr, Literal};
use datafusion_physical_plan::expressions::Column;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};
use lance_arrow::FieldExt;
use lance_arrow::json::{is_arrow_json_field, is_json_field};
use super::blob_coerce::coerce_blob_expr;
use crate::{Error, Result};
pub fn cast_to_table_schema(
@@ -77,6 +79,17 @@ fn build_field_exprs(
continue;
}
// Blob columns accept raw binary on write; exact matches pass through below.
if table_field.is_blob_v2() && input_field.as_ref() != table_field.as_ref() {
result.push(coerce_blob_expr(
input_expr,
input_field,
table_field,
&config,
)?);
continue;
}
let expr = match (input_field.data_type(), table_field.data_type()) {
// Both are structs: recurse into sub-fields to handle subschemas and casts.
(DataType::Struct(in_children), DataType::Struct(tbl_children))

View File

@@ -0,0 +1,949 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::sync::Arc;
use arrow_array::{
Array, ArrayRef, BinaryArray, Int64Array, LargeBinaryArray, RecordBatch, StringArray,
StructArray, UInt64Array,
};
use arrow_schema::{DataType, Field, Fields, Schema};
use futures::TryStreamExt;
use lance_encoding::version::LanceFileVersion;
use lancedb::{
Connection, Error, Result, Table,
blob::blob,
connect, connect_namespace,
database::listing::OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS,
query::{ExecutableQuery, QueryBase},
table::{AddDataMode, CompactionOptions, OptimizeAction},
};
use tempfile::tempdir;
fn blob_table_schema() -> Arc<Schema> {
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
blob("image", true),
]))
}
fn binary_input_batch(ids: &[i64], payloads: &[Option<&[u8]>]) -> RecordBatch {
RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("image", DataType::LargeBinary, true),
])),
vec![
Arc::new(Int64Array::from(ids.to_vec())),
Arc::new(LargeBinaryArray::from_iter(payloads.iter().copied())),
],
)
.unwrap()
}
async fn create_inline_blob_table(
db: &Connection,
name: &str,
ids: &[i64],
payloads: &[Option<&[u8]>],
) -> Result<Table> {
let table = db
.create_empty_table(name, blob_table_schema())
.execute()
.await?;
table
.add(binary_input_batch(ids, payloads))
.execute()
.await?;
Ok(table)
}
async fn storage_format_version(table: &Table) -> LanceFileVersion {
table
.as_native()
.unwrap()
.manifest()
.await
.unwrap()
.data_storage_format
.lance_file_version()
.unwrap()
.resolve()
}
async fn uses_stable_row_ids(table: &Table) -> bool {
table
.as_native()
.unwrap()
.manifest()
.await
.unwrap()
.uses_stable_row_ids()
}
async fn query_image_struct(table: &Table) -> StructArray {
let batches = table
.query()
.execute()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let batch = arrow_select::concat::concat_batches(&batches[0].schema(), &batches).unwrap();
batch
.column_by_name("image")
.expect("image column present")
.as_any()
.downcast_ref::<StructArray>()
.expect("image column is a descriptor struct")
.clone()
}
#[tokio::test]
async fn declaring_blob_column_bumps_format_and_enables_stable_row_ids() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = db
.create_empty_table("t", blob_table_schema())
.execute()
.await?;
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
assert!(uses_stable_row_ids(&table).await);
Ok(())
}
#[tokio::test]
async fn explicit_stable_row_id_setting_wins_over_blob_default() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = db
.create_empty_table("t", blob_table_schema())
.storage_option(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, "false")
.execute()
.await?;
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
assert!(!uses_stable_row_ids(&table).await);
Ok(())
}
#[tokio::test]
async fn non_blob_table_keeps_default_format_and_row_id_setting() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
let table = db.create_empty_table("t", schema).execute().await?;
assert!(storage_format_version(&table).await < LanceFileVersion::V2_2);
assert!(!uses_stable_row_ids(&table).await);
Ok(())
}
#[tokio::test]
async fn creating_with_blob_data_bumps_format() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let blob_field = blob("image", true);
let DataType::Struct(children) = blob_field.data_type().clone() else {
unreachable!("blob field is a struct")
};
let image = StructArray::new(
children,
vec![
Arc::new(LargeBinaryArray::from_iter_values([b"payload".as_slice()])),
Arc::new(StringArray::from(vec![None::<&str>])),
],
None,
);
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
blob_field,
])),
vec![Arc::new(Int64Array::from(vec![1])), Arc::new(image)],
)
.unwrap();
let table = db.create_table("t", batch).execute().await?;
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
assert!(uses_stable_row_ids(&table).await);
assert_eq!(table.count_rows(None).await?, 1);
Ok(())
}
#[tokio::test]
async fn add_coerces_large_binary_into_blob_column() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table =
create_inline_blob_table(&db, "t", &[1, 2], &[Some(b"cat".as_slice()), Some(b"dog")])
.await?;
assert_eq!(table.count_rows(None).await?, 2);
let image = query_image_struct(&table).await;
assert_eq!(image.len(), 2);
let schema = table.schema().await?;
let field = schema.field_with_name("image").unwrap();
assert_eq!(
field
.metadata()
.get("ARROW:extension:name")
.map(String::as_str),
Some("lance.blob.v2")
);
Ok(())
}
#[tokio::test]
async fn add_coerces_binary_into_blob_column() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = db
.create_empty_table("t", blob_table_schema())
.execute()
.await?;
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("image", DataType::Binary, true),
])),
vec![
Arc::new(Int64Array::from(vec![1])),
Arc::new(BinaryArray::from_iter_values([b"small".as_slice()])),
],
)
.unwrap();
table.add(batch).execute().await?;
assert_eq!(table.count_rows(None).await?, 1);
Ok(())
}
#[tokio::test]
async fn add_accepts_null_blob_rows() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(
&db,
"t",
&[1, 2, 3],
&[Some(b"first".as_slice()), None, Some(b"third")],
)
.await?;
assert_eq!(table.count_rows(None).await?, 3);
let image = query_image_struct(&table).await;
assert_eq!(image.len(), 3);
Ok(())
}
#[tokio::test]
async fn add_rejects_uncoercible_blob_input() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = db
.create_empty_table("t", blob_table_schema())
.execute()
.await?;
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("image", DataType::Utf8, true),
])),
vec![
Arc::new(Int64Array::from(vec![1])),
Arc::new(StringArray::from(vec!["not bytes"])),
],
)
.unwrap();
let err = table.add(batch).execute().await.unwrap_err();
assert!(err.to_string().contains("image"));
Ok(())
}
#[tokio::test]
async fn connection_level_stable_row_id_setting_wins_over_blob_default() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap())
.storage_option(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, "false")
.execute()
.await?;
let table = db
.create_empty_table("t", blob_table_schema())
.execute()
.await?;
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
assert!(!uses_stable_row_ids(&table).await);
Ok(())
}
#[tokio::test]
async fn namespace_create_applies_blob_defaults() -> Result<()> {
let tmp = tempdir().unwrap();
let mut properties = std::collections::HashMap::new();
properties.insert("root".to_string(), tmp.path().to_str().unwrap().to_string());
let db = connect_namespace("dir", properties).execute().await?;
let table = db
.create_empty_table("t", blob_table_schema())
.execute()
.await?;
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
assert!(uses_stable_row_ids(&table).await);
Ok(())
}
// Overwrite takes the input schema as-is. A raw-binary overwrite drops the blob
// marker; re-declaring blob v2 in the input restores it.
#[tokio::test]
async fn overwrite_replaces_blob_schema_with_input_schema() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"blob".as_slice())]).await?;
let raw_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("image", DataType::LargeBinary, true),
]));
let raw_batch = RecordBatch::try_new(
raw_schema.clone(),
vec![
Arc::new(Int64Array::from(vec![2])),
Arc::new(LargeBinaryArray::from_iter_values([b"plain".as_slice()])),
],
)
.unwrap();
table
.add(raw_batch)
.mode(AddDataMode::Overwrite)
.execute()
.await?;
let schema = table.schema().await?;
assert_eq!(schema, raw_schema);
assert!(
!schema
.field_with_name("image")
.unwrap()
.metadata()
.contains_key("ARROW:extension:name")
);
let blob_field = blob("image", true);
let DataType::Struct(children) = blob_field.data_type().clone() else {
unreachable!("blob field is a struct")
};
let image = StructArray::new(
children,
vec![
Arc::new(LargeBinaryArray::from_iter_values([b"declared".as_slice()])),
Arc::new(StringArray::from(vec![None::<&str>])),
],
None,
);
let declared_batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
blob_field,
])),
vec![Arc::new(Int64Array::from(vec![3])), Arc::new(image)],
)
.unwrap();
table
.add(declared_batch)
.mode(AddDataMode::Overwrite)
.execute()
.await?;
let schema = table.schema().await?;
assert_eq!(
schema
.field_with_name("image")
.unwrap()
.metadata()
.get("ARROW:extension:name")
.map(String::as_str),
Some("lance.blob.v2")
);
Ok(())
}
async fn collect_row_ids(table: &Table) -> Result<Vec<u64>> {
let batches = table
.query()
.with_row_id()
.execute()
.await?
.try_collect::<Vec<_>>()
.await?;
let batch = arrow_select::concat::concat_batches(&batches[0].schema(), &batches).unwrap();
Ok(batch
.column_by_name("_rowid")
.unwrap()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.values()
.to_vec())
}
async fn collect_id_rowid(table: &Table) -> Result<Vec<(i64, u64)>> {
let batches = table
.query()
.with_row_id()
.execute()
.await?
.try_collect::<Vec<_>>()
.await?;
let batch = arrow_select::concat::concat_batches(&batches[0].schema(), &batches).unwrap();
let ids = batch
.column_by_name("id")
.unwrap()
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
let row_ids = batch
.column_by_name("_rowid")
.unwrap()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
Ok(ids
.values()
.iter()
.copied()
.zip(row_ids.values().iter().copied())
.collect())
}
#[tokio::test]
async fn fetch_blobs_round_trips_bytes() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let payload: &[u8] = b"blob-round-trip-payload";
let table = create_inline_blob_table(&db, "t", &[1], &[Some(payload)]).await?;
let ids = collect_row_ids(&table).await?;
let bytes = table.fetch_blobs("image", &ids).await?;
assert_eq!(bytes.len(), 1);
assert_eq!(bytes.value(0), payload);
Ok(())
}
#[tokio::test]
async fn fetch_blobs_round_trips_nested_blob_column() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let blob_field = blob("blob", true);
let DataType::Struct(blob_children) = blob_field.data_type().clone() else {
unreachable!("blob field is a struct")
};
let blob_array = StructArray::new(
blob_children,
vec![
Arc::new(LargeBinaryArray::from_iter_values([
b"hello".as_slice(),
b"world".as_slice(),
])) as ArrayRef,
Arc::new(StringArray::from(vec![None::<&str>, None::<&str>])) as ArrayRef,
],
None,
);
let info_fields: Fields = vec![Field::new("name", DataType::Utf8, false), blob_field].into();
let info_array = StructArray::new(
info_fields.clone(),
vec![
Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
Arc::new(blob_array) as ArrayRef,
],
None,
);
let schema = Arc::new(Schema::new(vec![Field::new(
"info",
DataType::Struct(info_fields),
true,
)]));
let batch = RecordBatch::try_new(schema, vec![Arc::new(info_array) as ArrayRef]).unwrap();
let table = db.create_table("t", batch).execute().await?;
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
assert!(uses_stable_row_ids(&table).await);
let ids = collect_row_ids(&table).await?;
let bytes = table.fetch_blobs("info.blob", &ids).await?;
assert_eq!(bytes.len(), 2);
let values: std::collections::HashSet<&[u8]> =
(0..bytes.len()).map(|i| bytes.value(i)).collect();
assert!(values.contains(b"hello".as_slice()));
assert!(values.contains(b"world".as_slice()));
Ok(())
}
#[tokio::test]
async fn blob_columns_lists_nested_dotted_paths() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let blob_field = blob("blob", true);
let info = Field::new(
"info",
DataType::Struct(vec![Field::new("name", DataType::Utf8, false), blob_field].into()),
true,
);
let schema = Arc::new(Schema::new(vec![
blob("thumbnail", true),
Field::new("id", DataType::Int64, false),
info,
]));
let table = db.create_empty_table("t", schema).execute().await?;
assert_eq!(table.blob_columns().await?, vec!["thumbnail", "info.blob"]);
Ok(())
}
#[tokio::test]
async fn blob_columns_lists_blob_fields_in_order() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let schema = Arc::new(Schema::new(vec![
blob("thumbnail", true),
Field::new("id", DataType::Int64, false),
blob("image", true),
]));
let table = db.create_empty_table("t", schema).execute().await?;
assert_eq!(table.blob_columns().await?, vec!["thumbnail", "image"]);
let plain = db
.create_empty_table(
"plain",
Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])),
)
.execute()
.await?;
assert!(plain.blob_columns().await?.is_empty());
Ok(())
}
#[tokio::test]
async fn fetch_blobs_preserves_null_alignment() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(
&db,
"t",
&[1, 2, 3, 4],
&[Some(b"a".as_slice()), None, Some(b"c"), None],
)
.await?;
let pairs = collect_id_rowid(&table).await?;
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let bytes = table.fetch_blobs("image", &ids).await?;
assert_eq!(bytes.len(), ids.len());
for (i, (id, _)) in pairs.iter().enumerate() {
match id {
1 => assert_eq!(bytes.value(i), b"a"),
2 | 4 => assert!(bytes.is_null(i)),
3 => assert_eq!(bytes.value(i), b"c"),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_all_null_column_returns_all_nulls() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1, 2], &[None, None]).await?;
let ids = collect_row_ids(&table).await?;
let bytes = table.fetch_blobs("image", &ids).await?;
assert_eq!(bytes.len(), 2);
assert_eq!(bytes.null_count(), 2);
let files = table.fetch_blob_files("image", &ids).await?;
assert_eq!(files.len(), 2);
assert!(files.iter().all(Option::is_none));
Ok(())
}
#[tokio::test]
async fn fetch_blobs_aligns_with_reordered_and_duplicate_ids() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(
&db,
"t",
&[1, 2, 3],
&[Some(b"one".as_slice()), Some(b"two"), Some(b"three")],
)
.await?;
let pairs = collect_id_rowid(&table).await?;
let by_id = |want: i64| pairs.iter().find(|(id, _)| *id == want).unwrap().1;
let request = vec![by_id(3), by_id(1), by_id(3), by_id(2)];
let bytes = table.fetch_blobs("image", &request).await?;
assert_eq!(bytes.len(), 4);
assert_eq!(bytes.value(0), b"three");
assert_eq!(bytes.value(1), b"one");
assert_eq!(bytes.value(2), b"three");
assert_eq!(bytes.value(3), b"two");
Ok(())
}
#[tokio::test]
async fn fetch_blobs_empty_ids_returns_empty() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
assert_eq!(table.fetch_blobs("image", &[]).await?.len(), 0);
assert!(table.fetch_blob_files("image", &[]).await?.is_empty());
Ok(())
}
#[tokio::test]
async fn fetch_blobs_out_of_range_id_errors_without_panic() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
let err = table.fetch_blobs("image", &[u64::MAX]).await.unwrap_err();
assert!(err.to_string().contains("row ids"));
Ok(())
}
#[tokio::test]
async fn fetch_blobs_rejects_non_blob_column() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
let err = table.fetch_blobs("id", &[0]).await.unwrap_err();
assert!(matches!(err, Error::InvalidInput { .. }));
assert!(err.to_string().contains("'id' is not a blob column"));
let err = table.fetch_blob_files("id", &[0]).await.unwrap_err();
assert!(err.to_string().contains("'id' is not a blob column"));
Ok(())
}
#[tokio::test]
async fn fetch_blobs_rejects_unknown_column() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
let err = table.fetch_blobs("missing", &[0]).await.unwrap_err();
assert!(err.to_string().contains("no column named 'missing'"));
Ok(())
}
#[tokio::test]
async fn fetch_blobs_rejects_legacy_v1_blob_column() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let legacy = Field::new("image", DataType::LargeBinary, true).with_metadata(
std::collections::HashMap::from([("lance-encoding:blob".to_string(), "true".to_string())]),
);
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
legacy,
]));
let table = db.create_empty_table("t", schema).execute().await?;
let err = table.fetch_blobs("image", &[0]).await.unwrap_err();
assert!(err.to_string().contains("legacy blob column"));
Ok(())
}
#[tokio::test]
async fn fetch_blob_files_reads_lazily_and_aligns_nulls() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table =
create_inline_blob_table(&db, "t", &[1, 2], &[Some(b"lazy-bytes".as_slice()), None])
.await?;
let pairs = collect_id_rowid(&table).await?;
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let files = table.fetch_blob_files("image", &ids).await?;
assert_eq!(files.len(), 2);
for ((id, _), file) in pairs.iter().zip(&files) {
match id {
1 => {
let handle = file.as_ref().unwrap();
assert_eq!(handle.read().await.unwrap().as_ref(), b"lazy-bytes");
}
2 => assert!(file.is_none()),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_reads_multiple_blob_columns_independently() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
blob("image", true),
blob("thumbnail", true),
]));
let table = db.create_empty_table("t", schema).execute().await?;
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("image", DataType::LargeBinary, true),
Field::new("thumbnail", DataType::LargeBinary, true),
])),
vec![
Arc::new(Int64Array::from(vec![1, 2])),
Arc::new(LargeBinaryArray::from_iter(vec![
Some(b"image-1".as_slice()),
None,
])),
Arc::new(LargeBinaryArray::from_iter(vec![
None,
Some(b"thumb-2".as_slice()),
])),
],
)
.unwrap();
table.add(batch).execute().await?;
let pairs = collect_id_rowid(&table).await?;
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let images = table.fetch_blobs("image", &ids).await?;
let thumbs = table.fetch_blobs("thumbnail", &ids).await?;
for (i, (id, _)) in pairs.iter().enumerate() {
match id {
1 => {
assert_eq!(images.value(i), b"image-1");
assert!(thumbs.is_null(i));
}
2 => {
assert!(images.is_null(i));
assert_eq!(thumbs.value(i), b"thumb-2");
}
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_spans_fragments() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"frag-one".as_slice())]).await?;
table
.add(binary_input_batch(&[2], &[Some(b"frag-two".as_slice())]))
.execute()
.await?;
let pairs = collect_id_rowid(&table).await?;
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let bytes = table.fetch_blobs("image", &ids).await?;
for (i, (id, _)) in pairs.iter().enumerate() {
match id {
1 => assert_eq!(bytes.value(i), b"frag-one"),
2 => assert_eq!(bytes.value(i), b"frag-two"),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_packed_payload_round_trip() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let big = vec![0xAB_u8; 100 * 1024];
let small = b"small".to_vec();
let table = create_inline_blob_table(
&db,
"t",
&[1, 2],
&[Some(big.as_slice()), Some(small.as_slice())],
)
.await?;
let pairs = collect_id_rowid(&table).await?;
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let bytes = table.fetch_blobs("image", &ids).await?;
for (i, (id, _)) in pairs.iter().enumerate() {
match id {
1 => assert_eq!(bytes.value(i), big.as_slice()),
2 => assert_eq!(bytes.value(i), small.as_slice()),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_after_delete() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(
&db,
"t",
&[1, 2, 3],
&[Some(b"one".as_slice()), Some(b"two"), Some(b"three")],
)
.await?;
table.delete("id = 2").await?;
let pairs = collect_id_rowid(&table).await?;
assert_eq!(pairs.len(), 2);
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let bytes = table.fetch_blobs("image", &ids).await?;
for (i, (id, _)) in pairs.iter().enumerate() {
match id {
1 => assert_eq!(bytes.value(i), b"one"),
3 => assert_eq!(bytes.value(i), b"three"),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_with_precompaction_row_ids_survives_compaction() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"frag-one".as_slice())]).await?;
table
.add(binary_input_batch(&[2], &[Some(b"frag-two".as_slice())]))
.execute()
.await?;
let pairs_before = collect_id_rowid(&table).await?;
let ids_before: Vec<u64> = pairs_before.iter().map(|(_, rowid)| *rowid).collect();
table
.optimize(OptimizeAction::Compact {
options: CompactionOptions::default(),
remap_options: None,
})
.await?;
let bytes_after = table.fetch_blobs("image", &ids_before).await?;
assert_eq!(bytes_after.len(), 2);
for (i, (id, _)) in pairs_before.iter().enumerate() {
match id {
1 => assert_eq!(bytes_after.value(i), b"frag-one"),
2 => assert_eq!(bytes_after.value(i), b"frag-two"),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn zero_length_blob_reads_back_as_null() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"".as_slice())]).await?;
let ids = collect_row_ids(&table).await?;
let bytes = table.fetch_blobs("image", &ids).await?;
assert_eq!(bytes.len(), 1);
assert!(bytes.is_null(0));
Ok(())
}
const DEDICATED_BLOB_LEN: usize = 64 * 1024;
const SCRAMBLED_LOGICAL_IDS: [i64; 7] = [6, 3, 1, 4, 6, 2, 5];
fn dedicated_blob_bytes(tag: u8) -> Vec<u8> {
vec![tag; DEDICATED_BLOB_LEN]
}
async fn multi_fragment_dedicated_blob_table(db: &Connection) -> Result<Table> {
let rows: [(i64, Option<u8>); 6] = [
(1, Some(1)),
(2, Some(2)),
(3, None),
(4, Some(4)),
(5, None),
(6, Some(6)),
];
let mut table: Option<Table> = None;
for (logical_id, blob_tag) in rows {
let bytes = blob_tag.map(dedicated_blob_bytes);
let image = [bytes.as_deref()];
table = Some(match table {
None => create_inline_blob_table(db, "t", &[logical_id], &image).await?,
Some(t) => {
t.add(binary_input_batch(&[logical_id], &image))
.execute()
.await?;
t
}
});
}
Ok(table.unwrap())
}
async fn row_ids_for_logical(table: &Table, logical_ids: &[i64]) -> Result<Vec<u64>> {
let id_rowid = collect_id_rowid(table).await?;
Ok(logical_ids
.iter()
.map(|logical_id| {
id_rowid
.iter()
.find(|(id, _)| id == logical_id)
.map(|(_, row_id)| *row_id)
.unwrap()
})
.collect())
}
#[tokio::test]
async fn fetch_blobs_aligns_across_fragments_with_nulls_and_dups() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = multi_fragment_dedicated_blob_table(&db).await?;
let row_ids = row_ids_for_logical(&table, &SCRAMBLED_LOGICAL_IDS).await?;
let bytes = table.fetch_blobs("image", &row_ids).await?;
assert_eq!(bytes.len(), SCRAMBLED_LOGICAL_IDS.len());
for (slot, logical_id) in SCRAMBLED_LOGICAL_IDS.iter().enumerate() {
match logical_id {
3 | 5 => assert!(bytes.is_null(slot)),
id => assert_eq!(
bytes.value(slot),
dedicated_blob_bytes(*id as u8).as_slice()
),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blob_files_aligns_across_fragments_with_nulls_and_dups() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = multi_fragment_dedicated_blob_table(&db).await?;
let row_ids = row_ids_for_logical(&table, &SCRAMBLED_LOGICAL_IDS).await?;
let files = table.fetch_blob_files("image", &row_ids).await?;
assert_eq!(files.len(), SCRAMBLED_LOGICAL_IDS.len());
for (slot, logical_id) in SCRAMBLED_LOGICAL_IDS.iter().enumerate() {
match logical_id {
3 | 5 => assert!(files[slot].is_none()),
id => {
let payload = files[slot].as_ref().unwrap().read().await?;
assert_eq!(payload.as_ref(), dedicated_blob_bytes(*id as u8).as_slice());
}
}
}
Ok(())
}