Compare commits

...

4 Commits

Author SHA1 Message Date
Lance Release
26481a4b74 Bump version: 0.34.0-beta.1 → 0.34.0-beta.2 2026-06-23 16:21:52 +00:00
dependabot[bot]
08596f1644 chore(deps): bump the rust-minor-patch group with 2 updates (#3565)
Bumps the rust-minor-patch group with 2 updates:
[bytes](https://github.com/tokio-rs/bytes) and
[napi](https://github.com/napi-rs/napi-rs).

Updates `bytes` from 1.11.1 to 1.12.0
<details>
<summary>Release notes</summary>
<p><em>Sourced from <a
href="https://github.com/tokio-rs/bytes/releases">bytes's
releases</a>.</em></p>
<blockquote>
<h2>Bytes v1.12.0</h2>
<h1>1.12.0 (June 18th, 2026)</h1>
<h3>Added</h3>
<ul>
<li>Add <code>BytesMut::extend_from_within()</code> (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/818">#818</a>)</li>
<li>Add <code>BytesMut::try_unsplit()</code> (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/746">#746</a>)</li>
</ul>
<h3>Fixed</h3>
<ul>
<li>Fix panic in <code>get_int</code> if <code>nbytes</code> is zero (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/806">#806</a>)</li>
</ul>
<h3>Changed</h3>
<ul>
<li>Pass vtable data by value (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/826">#826</a>)</li>
<li>Exclude development scripts from published package (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/810">#810</a>)</li>
</ul>
<h3>Documented</h3>
<ul>
<li>Document that <code>BytesMut::{reserve,try_reserve}</code> doesn't
preserve unused capacity (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/808">#808</a>)</li>
</ul>
</blockquote>
</details>
<details>
<summary>Changelog</summary>
<p><em>Sourced from <a
href="https://github.com/tokio-rs/bytes/blob/master/CHANGELOG.md">bytes's
changelog</a>.</em></p>
<blockquote>
<h1>1.12.0 (June 18th, 2026)</h1>
<h3>Added</h3>
<ul>
<li>Add <code>BytesMut::extend_from_within()</code> (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/818">#818</a>)</li>
<li>Add <code>BytesMut::try_unsplit()</code> (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/746">#746</a>)</li>
</ul>
<h3>Fixed</h3>
<ul>
<li>Fix panic in <code>get_int</code> if <code>nbytes</code> is zero (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/806">#806</a>)</li>
</ul>
<h3>Changed</h3>
<ul>
<li>Pass vtable data by value (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/826">#826</a>)</li>
<li>Exclude development scripts from published package (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/810">#810</a>)</li>
</ul>
<h3>Documented</h3>
<ul>
<li>Document that <code>BytesMut::{reserve,try_reserve}</code> doesn't
preserve unused capacity (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/808">#808</a>)</li>
</ul>
</blockquote>
</details>
<details>
<summary>Commits</summary>
<ul>
<li><a
href="91402cee60"><code>91402ce</code></a>
Release bytes v1.12.0 (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/831">#831</a>)</li>
<li><a
href="2256e6dc3e"><code>2256e6d</code></a>
chore: add safety comments on unsafe blocks (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/827">#827</a>)</li>
<li><a
href="245adff079"><code>245adff</code></a>
Pass vtable data by value (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/826">#826</a>)</li>
<li><a
href="00cc5ff2bd"><code>00cc5ff</code></a>
Implement <code>BytesMut::extend_from_within</code> (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/818">#818</a>)</li>
<li><a
href="5b79d316c9"><code>5b79d31</code></a>
Merge tag 'v1.11.1'</li>
<li><a
href="804ee6d039"><code>804ee6d</code></a>
Make try_unsplit method public (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/746">#746</a>)</li>
<li><a
href="fd426ca084"><code>fd426ca</code></a>
Exclude development scripts from published package (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/810">#810</a>)</li>
<li><a
href="b4ed70daee"><code>b4ed70d</code></a>
Add test for copy_to_bytes() -&gt; BytesMut avoiding clone (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/809">#809</a>)</li>
<li><a
href="94e42915a9"><code>94e4291</code></a>
Document that <code>BytesMut::{reserve,try_reserve}</code> doesn't
preserve unused capac...</li>
<li><a
href="acd1e0ffb8"><code>acd1e0f</code></a>
Fix <code>get_int</code> if <code>nbytes</code> is zero (<a
href="https://redirect.github.com/tokio-rs/bytes/issues/806">#806</a>)</li>
<li>See full diff in <a
href="https://github.com/tokio-rs/bytes/compare/v1.11.1...v1.12.0">compare
view</a></li>
</ul>
</details>
<br />

Updates `napi` from 3.9.1 to 3.9.3
<details>
<summary>Release notes</summary>
<p><em>Sourced from <a
href="https://github.com/napi-rs/napi-rs/releases">napi's
releases</a>.</em></p>
<blockquote>
<h2>napi-v3.9.3</h2>
<h3>Fixed</h3>
<ul>
<li><em>(napi)</em> sync referred flag when creating a weak
ThreadsafeFunction (<a
href="https://redirect.github.com/napi-rs/napi-rs/pull/3337">#3337</a>)</li>
</ul>
<h3>Other</h3>
<ul>
<li><em>(napi)</em> outline non-generic core of
ThreadsafeFunction::create (<a
href="https://redirect.github.com/napi-rs/napi-rs/pull/3334">#3334</a>)</li>
</ul>
<h2>napi-v3.9.2</h2>
<h3>Fixed</h3>
<ul>
<li><em>(napi)</em> ReadableStream Reader loses chunks and aborts on
errored streams (<a
href="https://redirect.github.com/napi-rs/napi-rs/pull/3328">#3328</a>)</li>
</ul>
</blockquote>
</details>
<details>
<summary>Commits</summary>
<ul>
<li><a
href="ee58383da4"><code>ee58383</code></a>
chore(napi): release v3.9.3 (<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3335">#3335</a>)</li>
<li><a
href="c78727667b"><code>c787276</code></a>
fix(napi): sync referred flag when creating a weak ThreadsafeFunction
(<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3337">#3337</a>)</li>
<li><a
href="d4276ca315"><code>d4276ca</code></a>
chore(deps): update dependency oxc-parser to ^0.137.0 (<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3336">#3336</a>)</li>
<li><a
href="a0b1831ce5"><code>a0b1831</code></a>
perf(napi): outline non-generic core of ThreadsafeFunction::create (<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3334">#3334</a>)</li>
<li><a
href="3759d7b485"><code>3759d7b</code></a>
chore(deps): update rust-lang/crates-io-auth-action action to v1.0.5 (<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3333">#3333</a>)</li>
<li><a
href="dd41eeb921"><code>dd41eeb</code></a>
build(deps): bump protobufjs from 7.6.2 to 7.6.4 (<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3332">#3332</a>)</li>
<li><a
href="cdd48b3873"><code>cdd48b3</code></a>
chore(deps): update dependency oxc-parser to ^0.136.0 (<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3314">#3314</a>)</li>
<li><a
href="e98762de2c"><code>e98762d</code></a>
chore(deps): update yarn monorepo to v4.17.0 (<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3330">#3330</a>)</li>
<li><a
href="529a78d15c"><code>529a78d</code></a>
chore(napi): release v3.9.2 (<a
href="https://redirect.github.com/napi-rs/napi-rs/issues/3329">#3329</a>)</li>
<li><a
href="88f4b97030"><code>88f4b97</code></a>
fix(napi): ReadableStream Reader loses chunks and aborts on errored
streams (...</li>
<li>Additional commits viewable in <a
href="https://github.com/napi-rs/napi-rs/compare/napi-v3.9.1...napi-v3.9.3">compare
view</a></li>
</ul>
</details>
<br />


Dependabot will resolve any conflicts with this PR as long as you don't
alter it yourself. You can also trigger a rebase manually by commenting
`@dependabot rebase`.

[//]: # (dependabot-automerge-start)
[//]: # (dependabot-automerge-end)

---

<details>
<summary>Dependabot commands and options</summary>
<br />

You can trigger Dependabot actions by commenting on this PR:
- `@dependabot rebase` will rebase this PR
- `@dependabot recreate` will recreate this PR, overwriting any edits
that have been made to it
- `@dependabot show <dependency name> ignore conditions` will show all
of the ignore conditions of the specified dependency
- `@dependabot ignore <dependency name> major version` will close this
group update PR and stop Dependabot creating any more for the specific
dependency's major version (unless you unignore this specific
dependency's major version or upgrade to it yourself)
- `@dependabot ignore <dependency name> minor version` will close this
group update PR and stop Dependabot creating any more for the specific
dependency's minor version (unless you unignore this specific
dependency's minor version or upgrade to it yourself)
- `@dependabot ignore <dependency name>` will close this group update PR
and stop Dependabot creating any more for the specific dependency
(unless you unignore this specific dependency or upgrade to it yourself)
- `@dependabot unignore <dependency name>` will remove all of the ignore
conditions of the specified dependency
- `@dependabot unignore <dependency name> <ignore condition>` will
remove the ignore condition of the specified dependency and ignore
conditions


</details>

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-06-23 09:21:05 -07:00
LanceDB Robot
f16da19b78 chore: update lance dependency to v9.0.0-beta.2 (#3569)
Updates LanceDB's Lance dependencies to v9.0.0-beta.2 across the Rust
workspace and Java lance-core dependency.\n\nNo compatibility fixes were
required; clippy and formatting pass after installing the missing
toolchain components on the runner. Triggering Lance tag:
https://github.com/lance-format/lance/releases/tag/v9.0.0-beta.2
2026-06-23 09:20:13 -07:00
Drew Gallardo
41ac32a344 feat(rust): add blob read and materialization APIs (#3562)
This PR is for the Read path against blob v2. #3528 handles declare +
write, and this this adds materialization on local tables.

- blob_columns()
- fetch_blobs(column, row_ids) → bytes
- fetch_blob_files(column, row_ids) → lazy handles
- Pass _rowid from query().with_row_id(). Remote returns NotSupported.
(for now)

### Use cases

search, grab row ids, materialize images:

```rust
let row_ids = /* _rowid from hits */;
let images = table.fetch_blobs("image", &row_ids).await?;
```

Large blobs: open handles, read only what you need:

```rust
let handles = table.fetch_blob_files("image", &row_ids).await?;
let bytes = handles[0].as_ref().unwrap().read().await?;
```

Filter then batch fetch: collect ids from a filter, one call.
Multiple blob columns: image and thumbnail independently.
Row ids from before compact: still resolve.

### Alignment note
Lance `read_blobs` drops null rows. We descriptor-take first, read
non-null ids, re-expand to match input order. Null and zero-length blobs
come back null/None. Bytes path sets `preserve_order(true)`. So I added:

```
TODO(lance): expose selection_index or an aligned execute so we can drop the pre-read.
```

### Tests
`cargo test -p lancedb --test blob_integration`
- 30 tests covering nulls, reorder, dups, cross-fragment bytes + files,
compact, delete, legacy v1 errors.

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-23 06:58:26 -07:00
9 changed files with 1088 additions and 97 deletions

89
Cargo.lock generated
View File

@@ -1472,9 +1472,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.11.1"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
checksum = "8ae3f5d315924270530207e2a68396c3cc547f6dca3fbdca317cfb1a51edb593"
[[package]]
name = "bytes-utils"
@@ -3432,8 +3432,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-array",
"rand 0.9.4",
@@ -4735,8 +4735,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a"
[[package]]
name = "lance"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arc-swap",
"arrow",
@@ -4810,8 +4810,8 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4832,7 +4832,7 @@ dependencies = [
[[package]]
name = "lance-arrow-scalar"
version = "58.0.0"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4846,7 +4846,7 @@ dependencies = [
[[package]]
name = "lance-arrow-stats"
version = "58.0.0"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -4855,8 +4855,8 @@ dependencies = [
[[package]]
name = "lance-bitpacking"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrayref",
"paste",
@@ -4865,8 +4865,8 @@ dependencies = [
[[package]]
name = "lance-core"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4904,8 +4904,8 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow",
"arrow-array",
@@ -4935,8 +4935,8 @@ dependencies = [
[[package]]
name = "lance-datagen"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow",
"arrow-array",
@@ -4953,8 +4953,8 @@ dependencies = [
[[package]]
name = "lance-derive"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"proc-macro2",
"quote",
@@ -4963,8 +4963,8 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4999,8 +4999,8 @@ dependencies = [
[[package]]
name = "lance-file"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -5030,8 +5030,8 @@ dependencies = [
[[package]]
name = "lance-index"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arc-swap",
"arrow",
@@ -5096,8 +5096,8 @@ dependencies = [
[[package]]
name = "lance-io"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow",
"arrow-arith",
@@ -5138,8 +5138,8 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -5150,12 +5150,13 @@ dependencies = [
"lance-core",
"num-traits",
"rand 0.9.4",
"rayon",
]
[[package]]
name = "lance-namespace"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow",
"async-trait",
@@ -5167,8 +5168,8 @@ dependencies = [
[[package]]
name = "lance-namespace-impls"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow",
"arrow-ipc",
@@ -5222,8 +5223,8 @@ dependencies = [
[[package]]
name = "lance-select"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -5238,8 +5239,8 @@ dependencies = [
[[package]]
name = "lance-table"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow",
"arrow-array",
@@ -5278,8 +5279,8 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -5292,8 +5293,8 @@ dependencies = [
[[package]]
name = "lance-tokenizer"
version = "8.0.0-rc.1"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-rc.1#eea4095b188bf2ba2fa95d934a2f5d6c2c9e661c"
version = "9.0.0-beta.2"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
dependencies = [
"icu_segmenter",
"jieba-rs",
@@ -5957,9 +5958,9 @@ dependencies = [
[[package]]
name = "napi"
version = "3.9.1"
version = "3.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad513ff22558f1830b595ea6eb4091da48145d09a222ce157e781896f78be0b9"
checksum = "fbd9f9295f3ff5921e78a71222c3361a8216f7760b1a99a6ad4e8441de18bbb9"
dependencies = [
"bitflags 2.11.1",
"chrono",

View File

@@ -13,20 +13,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=8.0.0-rc.1", default-features = false, "tag" = "v8.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=8.0.0-rc.1", "tag" = "v8.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=8.0.0-rc.1", "tag" = "v8.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=8.0.0-rc.1", "tag" = "v8.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=8.0.0-rc.1", default-features = false, "tag" = "v8.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=8.0.0-rc.1", "tag" = "v8.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=8.0.0-rc.1", "tag" = "v8.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=8.0.0-rc.1", "tag" = "v8.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=8.0.0-rc.1", default-features = false, "tag" = "v8.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=8.0.0-rc.1", "tag" = "v8.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=8.0.0-rc.1", "tag" = "v8.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=8.0.0-rc.1", "tag" = "v8.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=8.0.0-rc.1", "tag" = "v8.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=8.0.0-rc.1", "tag" = "v8.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=9.0.0-beta.2", default-features = false, "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=9.0.0-beta.2", default-features = false, "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=9.0.0-beta.2", default-features = false, "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "58.0.0", optional = false }

View File

@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>8.0.0-rc.1</lance-core.version>
<lance-core.version>9.0.0-beta.2</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.34.0-beta.1"
current_version = "0.34.0-beta.2"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.34.0-beta.1"
version = "0.34.0-beta.2"
publish = false
edition.workspace = true
description = "Python bindings for LanceDB"

View File

@@ -9,16 +9,28 @@
//!
//! Blob tables require Lance file format >= 2.2 and stable row ids at create.
use arrow_schema::{Field, Schema};
use lance::dataset::WriteParams;
use std::sync::Arc;
use arrow_array::builder::LargeBinaryBuilder;
use arrow_array::{Array, LargeBinaryArray, RecordBatch, StructArray, UInt8Array, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use lance::dataset::{Dataset, WriteParams};
use lance_arrow::FieldExt;
use lance_core::datatypes::parse_field_path;
use lance_encoding::version::LanceFileVersion;
use crate::error::{Error, Result};
pub use lance::dataset::BlobFile;
/// Creates an Arrow field for a Lance blob v2 column.
///
/// `Struct<data, uri>` with the `lance.blob.v2` marker. Same layout Lance
/// expects on write.
///
/// A blob column may be top-level or nested inside a struct or list. Nested
/// blobs are addressed by a dotted path (e.g. `info.blob`) in the read APIs.
///
/// ```
/// use arrow_schema::{DataType, Field, Schema};
///
@@ -27,15 +39,71 @@ use lance_encoding::version::LanceFileVersion;
/// lancedb::blob("image", true),
/// ]);
/// ```
///
/// Blob tables use Lance file format >= 2.2 and stable row ids at create.
pub fn blob(name: impl AsRef<str>, nullable: bool) -> Field {
lance::blob::blob_field(name.as_ref(), nullable)
}
/// Returns true if `schema` declares any blob v2 column.
/// Returns true if `field` is a blob v2 column.
///
/// ```
/// let field = lancedb::blob("image", true);
/// assert!(lancedb::blob::is_blob(&field));
/// ```
pub fn is_blob(field: &Field) -> bool {
field.is_blob_v2()
}
/// Returns true if `field`, or any field nested under it, is a blob v2 column.
fn field_tree_has_blob_v2(field: &Field) -> bool {
if field.is_blob_v2() {
return true;
}
match field.data_type() {
DataType::Struct(children) => children.iter().any(|c| field_tree_has_blob_v2(c)),
DataType::List(child) | DataType::LargeList(child) | DataType::FixedSizeList(child, _) => {
field_tree_has_blob_v2(child)
}
_ => false,
}
}
/// Collects the dotted paths of blob v2 columns under `field`, into `paths`.
fn collect_blob_paths(field: &Field, prefix: &str, paths: &mut Vec<String>) {
let path = if prefix.is_empty() {
field.name().clone()
} else {
format!("{prefix}.{}", field.name())
};
if field.is_blob_v2() {
paths.push(path);
return;
}
match field.data_type() {
DataType::Struct(children) => {
for child in children {
collect_blob_paths(child, &path, paths);
}
}
DataType::List(child) | DataType::LargeList(child) | DataType::FixedSizeList(child, _) => {
collect_blob_paths(child, &path, paths)
}
_ => {}
}
}
/// Returns true if `schema` declares any blob v2 column, including nested ones.
pub(crate) fn has_blob_columns(schema: &Schema) -> bool {
schema.fields().iter().any(|field| field.is_blob_v2())
schema.fields().iter().any(|f| field_tree_has_blob_v2(f))
}
/// Blob v2 column paths in `schema`, declaration order preserved. Nested blobs
/// are dotted paths (e.g. `info.blob`).
pub(crate) fn blob_column_names(schema: &Schema) -> Vec<String> {
let mut paths = Vec::new();
for field in schema.fields() {
collect_blob_paths(field, "", &mut paths);
}
paths
}
/// Bumps storage format to at least [`LanceFileVersion::V2_2`] for blob schemas.
@@ -53,6 +121,206 @@ pub(crate) fn ensure_blob_storage_version(schema: &Schema, params: &mut WritePar
}
}
/// Validate that `column` exists and is a blob v2 column.
///
/// Legacy v1 columns (`lance-encoding:blob`) error with a migration hint.
pub(crate) fn ensure_blob_v2_column(
schema: &lance_core::datatypes::Schema,
column: &str,
) -> Result<()> {
match schema.field(column) {
Some(field) if field.is_blob_v2() => Ok(()),
Some(field) if field.is_blob() => Err(Error::InvalidInput {
message: format!(
"column '{column}' is a legacy blob column; blob APIs require blob v2 columns \
(ARROW:extension:name = \"lance.blob.v2\")"
),
}),
Some(_) => Err(Error::InvalidInput {
message: format!("column '{column}' is not a blob column"),
}),
None => Err(Error::InvalidInput {
message: format!("no column named '{column}' in this table"),
}),
}
}
/// Returns the leaf descriptor `StructArray` for `column` in a descriptor batch.
fn leaf_descriptor_struct<'a>(batch: &'a RecordBatch, column: &str) -> Result<&'a StructArray> {
let path = parse_field_path(column).map_err(|e| Error::InvalidInput {
message: format!("invalid blob column path '{column}': {e}"),
})?;
let not_struct = || Error::Runtime {
message: format!("blob column '{column}' did not read back as a descriptor struct"),
};
let mut current = batch
.column_by_name(&path[0])
.and_then(|c| c.as_any().downcast_ref::<StructArray>())
.ok_or_else(not_struct)?;
for segment in &path[1..] {
current = current
.column_by_name(segment)
.and_then(|c| c.as_any().downcast_ref::<StructArray>())
.ok_or_else(not_struct)?;
}
Ok(current)
}
/// Null rows in `row_ids`, from a descriptor take.
///
/// Lance `read_blobs` / `take_blobs` skip null rows (`kind == 0 && position == 0 && size == 0`).
/// TODO(lance): aligned read API would drop this pass.
async fn blob_null_mask(
dataset: &Arc<Dataset>,
column: &str,
row_ids: &[u64],
) -> Result<Vec<bool>> {
let projection = dataset.schema().project(&[column])?;
let descriptors = dataset.take_builder(row_ids, projection)?.execute().await?;
if descriptors.num_rows() != row_ids.len() {
return Err(Error::InvalidInput {
message: format!(
"blob take for column '{column}' requested {} row ids but only {} exist in the \
table; pass row ids collected from this table",
row_ids.len(),
descriptors.num_rows()
),
});
}
let descriptor_struct = leaf_descriptor_struct(&descriptors, column)?;
let child = |name: &str| {
descriptor_struct
.column_by_name(name)
.ok_or_else(|| Error::Runtime {
message: format!("blob descriptor for '{column}' is missing the '{name}' field"),
})
};
let kinds = child("kind")?
.as_any()
.downcast_ref::<UInt8Array>()
.ok_or_else(|| Error::Runtime {
message: format!("blob descriptor 'kind' for '{column}' is not a UInt8 array"),
})?;
let positions = child("position")?
.as_any()
.downcast_ref::<UInt64Array>()
.ok_or_else(|| Error::Runtime {
message: format!("blob descriptor 'position' for '{column}' is not a UInt64 array"),
})?;
let sizes = child("size")?
.as_any()
.downcast_ref::<UInt64Array>()
.ok_or_else(|| Error::Runtime {
message: format!("blob descriptor 'size' for '{column}' is not a UInt64 array"),
})?;
// Match Lance `collect_blob_entries_v2` skip condition (`BlobKind::Inline` == 0).
Ok((0..descriptor_struct.len())
.map(|i| {
descriptor_struct.is_null(i)
|| kinds.is_null(i)
|| (kinds.value(i) == 0 && positions.value(i) == 0 && sizes.value(i) == 0)
})
.collect())
}
fn non_null_row_ids(row_ids: &[u64], null_mask: &[bool]) -> Vec<u64> {
row_ids
.iter()
.zip(null_mask)
.filter_map(|(row_id, is_null)| (!is_null).then_some(*row_id))
.collect()
}
/// Materialize blob bytes for `row_ids` (same length and order, nulls preserved).
pub(crate) async fn take_blobs_aligned(
dataset: &Arc<Dataset>,
column: &str,
row_ids: &[u64],
) -> Result<LargeBinaryArray> {
ensure_blob_v2_column(dataset.schema(), column)?;
if row_ids.is_empty() {
return Ok(LargeBinaryBuilder::new().finish());
}
let null_mask = blob_null_mask(dataset, column, row_ids).await?;
let non_null_row_ids = non_null_row_ids(row_ids, &null_mask);
let non_null_count = non_null_row_ids.len();
let payloads = if non_null_count == 0 {
Vec::new()
} else {
dataset
.read_blobs(column)?
.with_row_ids(non_null_row_ids)
.preserve_order(true)
.execute()
.await?
};
if payloads.len() != non_null_count {
return Err(Error::Runtime {
message: format!(
"blob read for column '{column}' returned {} payloads for {} non-null rows",
payloads.len(),
non_null_count
),
});
}
let mut builder = LargeBinaryBuilder::new();
let mut payload_idx = 0;
for is_null in &null_mask {
if *is_null {
builder.append_null();
} else {
builder.append_value(payloads[payload_idx].data.as_ref());
payload_idx += 1;
}
}
Ok(builder.finish())
}
/// Open lazy [`BlobFile`] handles for `row_ids` (same length and order, nulls as `None`).
pub(crate) async fn take_blob_files_aligned(
dataset: &Arc<Dataset>,
column: &str,
row_ids: &[u64],
) -> Result<Vec<Option<BlobFile>>> {
ensure_blob_v2_column(dataset.schema(), column)?;
if row_ids.is_empty() {
return Ok(Vec::new());
}
let null_mask = blob_null_mask(dataset, column, row_ids).await?;
let non_null_row_ids = non_null_row_ids(row_ids, &null_mask);
let handles = if non_null_row_ids.is_empty() {
Vec::new()
} else {
dataset.take_blobs(&non_null_row_ids, column).await?
};
if handles.len() != non_null_row_ids.len() {
return Err(Error::Runtime {
message: format!(
"blob take for column '{column}' returned {} handles for {} non-null rows",
handles.len(),
non_null_row_ids.len()
),
});
}
let mut handles = handles.into_iter();
Ok(null_mask
.iter()
.map(|is_null| {
if *is_null {
None
} else {
Some(handles.next().unwrap())
}
})
.collect())
}
#[cfg(test)]
mod tests {
use super::*;
@@ -116,6 +384,47 @@ mod tests {
assert_eq!(params.data_storage_version.unwrap(), LanceFileVersion::V2_3);
}
#[test]
fn legacy_v1_blob_column_is_rejected_with_migration_hint() {
let legacy = Field::new("image", DataType::LargeBinary, true).with_metadata(
std::collections::HashMap::from([(
"lance-encoding:blob".to_string(),
"true".to_string(),
)]),
);
let arrow_schema = Schema::new(vec![legacy]);
let lance_schema = lance_core::datatypes::Schema::try_from(&arrow_schema).unwrap();
let err = ensure_blob_v2_column(&lance_schema, "image").unwrap_err();
assert!(matches!(err, Error::InvalidInput { .. }));
assert!(err.to_string().contains("legacy blob column"));
assert!(err.to_string().contains("lance.blob.v2"));
}
#[test]
fn non_blob_and_unknown_columns_are_rejected_by_name() {
let arrow_schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
let lance_schema = lance_core::datatypes::Schema::try_from(&arrow_schema).unwrap();
let err = ensure_blob_v2_column(&lance_schema, "id").unwrap_err();
assert!(err.to_string().contains("'id' is not a blob column"));
let err = ensure_blob_v2_column(&lance_schema, "missing").unwrap_err();
assert!(err.to_string().contains("no column named 'missing'"));
}
#[test]
fn blob_column_names_includes_nested_path() {
let blob_field = blob("blob", true);
let info = Field::new(
"info",
DataType::Struct(vec![Field::new("name", DataType::Utf8, false), blob_field].into()),
true,
);
let schema = Schema::new(vec![Field::new("id", DataType::Int64, false), info]);
assert_eq!(blob_column_names(&schema), vec!["info.blob"]);
}
#[test]
fn storage_version_noop_without_blob_columns() {
let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);

View File

@@ -189,7 +189,7 @@ use std::{fmt::Display, str::FromStr};
use serde::{Deserialize, Serialize};
pub use blob::blob;
pub use blob::{blob, is_blob};
pub use connection::{ConnectNamespaceBuilder, Connection};
pub use error::{Error, Result};
use lance_index::vector::ApproxMode as LanceApproxMode;

View File

@@ -3,7 +3,7 @@
//! LanceDB Table APIs
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_array::{LargeBinaryArray, RecordBatch, RecordBatchReader};
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_execution::TaskContext;
@@ -12,6 +12,7 @@ use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::display::DisplayableExecutionPlan;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use lance::dataset::BlobFile;
pub use lance::dataset::ColumnAlteration;
pub use lance::dataset::NewColumnTransform;
pub use lance::dataset::ReadParams;
@@ -587,6 +588,28 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
async fn close_lsm_writers(&self) -> Result<()> {
Ok(())
}
/// Names of the blob v2 columns in this table, in declaration order.
async fn blob_columns(&self) -> Result<Vec<String>> {
Err(Error::NotSupported {
message: "blob_columns is not supported on this table type".into(),
})
}
/// Materialize blob bytes for the given row ids. See [`Table::fetch_blobs`].
async fn fetch_blobs(&self, _column: &str, _row_ids: &[u64]) -> Result<LargeBinaryArray> {
Err(Error::NotSupported {
message: "fetch_blobs is not supported on this table type".into(),
})
}
/// Open lazy blob handles for the given row ids. See [`Table::fetch_blob_files`].
async fn fetch_blob_files(
&self,
_column: &str,
_row_ids: &[u64],
) -> Result<Vec<Option<BlobFile>>> {
Err(Error::NotSupported {
message: "fetch_blob_files is not supported on this table type".into(),
})
}
/// Gets the table tag manager.
async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
/// Optimize the dataset.
@@ -927,6 +950,76 @@ impl Table {
self.inner.count_rows(filter.map(Filter::Sql)).await
}
/// Names of the blob v2 columns in this table, in declaration order.
///
/// Nested blobs use dotted paths (e.g. `info.blob`). Returns
/// [`Error::NotSupported`] on table types without blob support.
pub async fn blob_columns(&self) -> Result<Vec<String>> {
self.inner.blob_columns().await
}
/// Materialize blob bytes for the given row ids.
///
/// Output matches `row_ids` in length and order. Null and zero-length rows
/// are null. Prefer [`Self::fetch_blob_files`] for large selections.
///
/// ```
/// use arrow_array::UInt64Array;
/// use futures::TryStreamExt;
/// use lancedb::query::{ExecutableQuery, QueryBase};
///
/// # use lancedb::Table;
/// # async fn materialize(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
/// let mut stream = table.query().with_row_id().limit(10).execute().await?;
/// while let Some(batch) = stream.try_next().await? {
/// let row_ids = batch
/// .column_by_name("_rowid")
/// .unwrap()
/// .as_any()
/// .downcast_ref::<UInt64Array>()
/// .unwrap();
/// let images = table.fetch_blobs("image", row_ids.values()).await?;
/// let _ = images;
/// }
/// # Ok(())
/// # }
/// ```
///
/// Returns [`Error::InvalidInput`] when the column does not exist or is
/// not a blob v2 column, and [`Error::NotSupported`] on table types
/// without blob support.
pub async fn fetch_blobs(
&self,
column: impl AsRef<str>,
row_ids: &[u64],
) -> Result<LargeBinaryArray> {
self.inner.fetch_blobs(column.as_ref(), row_ids).await
}
/// Open lazy [`BlobFile`] handles for the given row ids.
///
/// Same length and order as `row_ids`. Null rows are `None`. Bytes are not
/// read from disk until a call to [`BlobFile::read`].
///
/// ```
/// # use lancedb::Table;
/// # async fn lazy_read(table: &Table, row_ids: &[u64]) -> Result<(), Box<dyn std::error::Error>> {
/// let handles = table.fetch_blob_files("image", row_ids).await?;
/// if let Some(Some(first)) = handles.first() {
/// let bytes = first.read().await?;
/// println!("first blob is {} bytes", bytes.len());
/// }
/// # Ok(())
/// # }
/// ```
pub async fn fetch_blob_files(
&self,
column: impl AsRef<str>,
row_ids: &[u64],
) -> Result<Vec<Option<BlobFile>>> {
self.inner.fetch_blob_files(column.as_ref(), row_ids).await
}
/// Insert new records into this Table
///
/// # Arguments
@@ -2761,6 +2854,25 @@ impl BaseTable for NativeTable {
merge::lsm::close_lsm_writers(self).await
}
async fn blob_columns(&self) -> Result<Vec<String>> {
let schema = self.schema().await?;
Ok(crate::blob::blob_column_names(schema.as_ref()))
}
async fn fetch_blobs(&self, column: &str, row_ids: &[u64]) -> Result<LargeBinaryArray> {
let dataset = self.dataset.get().await?;
crate::blob::take_blobs_aligned(&dataset, column, row_ids).await
}
async fn fetch_blob_files(
&self,
column: &str,
row_ids: &[u64],
) -> Result<Vec<Option<BlobFile>>> {
let dataset = self.dataset.get().await?;
crate::blob::take_blob_files_aligned(&dataset, column, row_ids).await
}
/// Delete rows from the table
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
let result = delete::execute_delete(self, predicate).await?;

View File

@@ -1,17 +1,22 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Integration tests for blob v2 columns.
use std::sync::Arc;
use arrow_array::{Array, BinaryArray, Int64Array, LargeBinaryArray, RecordBatch, StructArray};
use arrow_schema::{DataType, Field, Schema};
use arrow_array::{
Array, ArrayRef, BinaryArray, Int64Array, LargeBinaryArray, RecordBatch, StringArray,
StructArray, UInt64Array,
};
use arrow_schema::{DataType, Field, Fields, Schema};
use futures::TryStreamExt;
use lance_encoding::version::LanceFileVersion;
use lancedb::{
Connection, Result, Table, blob::blob, connect,
database::listing::OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, query::ExecutableQuery,
Connection, Error, Result, Table,
blob::blob,
connect, connect_namespace,
database::listing::OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS,
query::{ExecutableQuery, QueryBase},
table::{AddDataMode, CompactionOptions, OptimizeAction},
};
use tempfile::tempdir;
@@ -91,7 +96,7 @@ async fn query_image_struct(table: &Table) -> StructArray {
.expect("image column present")
.as_any()
.downcast_ref::<StructArray>()
.expect("blob column reads back as a descriptor struct")
.expect("image column is a descriptor struct")
.clone()
}
@@ -119,10 +124,7 @@ async fn explicit_stable_row_id_setting_wins_over_blob_default() -> Result<()> {
.execute()
.await?;
assert!(
storage_format_version(&table).await >= LanceFileVersion::V2_2,
"format bump still applies; the schema cannot be written below 2.2"
);
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
assert!(!uses_stable_row_ids(&table).await);
Ok(())
}
@@ -144,7 +146,6 @@ async fn creating_with_blob_data_bumps_format() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
// Batch already declares the blob field (pre-built struct).
let blob_field = blob("image", true);
let DataType::Struct(children) = blob_field.data_type().clone() else {
unreachable!("blob field is a struct")
@@ -153,7 +154,7 @@ async fn creating_with_blob_data_bumps_format() -> Result<()> {
children,
vec![
Arc::new(LargeBinaryArray::from_iter_values([b"payload".as_slice()])),
Arc::new(arrow_array::StringArray::from(vec![None::<&str>])),
Arc::new(StringArray::from(vec![None::<&str>])),
],
None,
);
@@ -184,7 +185,6 @@ async fn add_coerces_large_binary_into_blob_column() -> Result<()> {
assert_eq!(table.count_rows(None).await?, 2);
let image = query_image_struct(&table).await;
assert_eq!(image.len(), 2);
// Table schema still has the blob marker after append.
let schema = table.schema().await?;
let field = schema.field_with_name("image").unwrap();
assert_eq!(
@@ -257,12 +257,12 @@ async fn add_rejects_uncoercible_blob_input() -> Result<()> {
])),
vec![
Arc::new(Int64Array::from(vec![1])),
Arc::new(arrow_array::StringArray::from(vec!["not bytes"])),
Arc::new(StringArray::from(vec!["not bytes"])),
],
)
.unwrap();
let err = table.add(batch).execute().await.unwrap_err();
assert!(err.to_string().contains("image"), "got: {err}");
assert!(err.to_string().contains("image"));
Ok(())
}
@@ -288,9 +288,7 @@ async fn namespace_create_applies_blob_defaults() -> Result<()> {
let tmp = tempdir().unwrap();
let mut properties = std::collections::HashMap::new();
properties.insert("root".to_string(), tmp.path().to_str().unwrap().to_string());
let db = lancedb::connect_namespace("dir", properties)
.execute()
.await?;
let db = connect_namespace("dir", properties).execute().await?;
let table = db
.create_empty_table("t", blob_table_schema())
.execute()
@@ -301,17 +299,14 @@ async fn namespace_create_applies_blob_defaults() -> Result<()> {
Ok(())
}
// Overwrite takes the input schema as-is (same as cast skip). Raw binary
// overwrite drops the blob marker unless the input declares blob v2.
// Overwrite takes the input schema as-is. A raw-binary overwrite drops the blob
// marker; re-declaring blob v2 in the input restores it.
#[tokio::test]
async fn overwrite_replaces_blob_schema_with_input_schema() -> Result<()> {
use lancedb::table::AddDataMode;
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"blob".as_slice())]).await?;
// Raw binary overwrite. Plain LargeBinary replaces the blob declaration.
let raw_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("image", DataType::LargeBinary, true),
@@ -336,11 +331,9 @@ async fn overwrite_replaces_blob_schema_with_input_schema() -> Result<()> {
.field_with_name("image")
.unwrap()
.metadata()
.contains_key("ARROW:extension:name"),
"raw binary overwrite leaves a plain binary column"
.contains_key("ARROW:extension:name")
);
// Overwrite with a declared blob struct keeps the blob column.
let blob_field = blob("image", true);
let DataType::Struct(children) = blob_field.data_type().clone() else {
unreachable!("blob field is a struct")
@@ -349,7 +342,7 @@ async fn overwrite_replaces_blob_schema_with_input_schema() -> Result<()> {
children,
vec![
Arc::new(LargeBinaryArray::from_iter_values([b"declared".as_slice()])),
Arc::new(arrow_array::StringArray::from(vec![None::<&str>])),
Arc::new(StringArray::from(vec![None::<&str>])),
],
None,
);
@@ -378,3 +371,579 @@ async fn overwrite_replaces_blob_schema_with_input_schema() -> Result<()> {
);
Ok(())
}
async fn collect_row_ids(table: &Table) -> Result<Vec<u64>> {
let batches = table
.query()
.with_row_id()
.execute()
.await?
.try_collect::<Vec<_>>()
.await?;
let batch = arrow_select::concat::concat_batches(&batches[0].schema(), &batches).unwrap();
Ok(batch
.column_by_name("_rowid")
.unwrap()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.values()
.to_vec())
}
async fn collect_id_rowid(table: &Table) -> Result<Vec<(i64, u64)>> {
let batches = table
.query()
.with_row_id()
.execute()
.await?
.try_collect::<Vec<_>>()
.await?;
let batch = arrow_select::concat::concat_batches(&batches[0].schema(), &batches).unwrap();
let ids = batch
.column_by_name("id")
.unwrap()
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
let row_ids = batch
.column_by_name("_rowid")
.unwrap()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
Ok(ids
.values()
.iter()
.copied()
.zip(row_ids.values().iter().copied())
.collect())
}
#[tokio::test]
async fn fetch_blobs_round_trips_bytes() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let payload: &[u8] = b"blob-round-trip-payload";
let table = create_inline_blob_table(&db, "t", &[1], &[Some(payload)]).await?;
let ids = collect_row_ids(&table).await?;
let bytes = table.fetch_blobs("image", &ids).await?;
assert_eq!(bytes.len(), 1);
assert_eq!(bytes.value(0), payload);
Ok(())
}
#[tokio::test]
async fn fetch_blobs_round_trips_nested_blob_column() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let blob_field = blob("blob", true);
let DataType::Struct(blob_children) = blob_field.data_type().clone() else {
unreachable!("blob field is a struct")
};
let blob_array = StructArray::new(
blob_children,
vec![
Arc::new(LargeBinaryArray::from_iter_values([
b"hello".as_slice(),
b"world".as_slice(),
])) as ArrayRef,
Arc::new(StringArray::from(vec![None::<&str>, None::<&str>])) as ArrayRef,
],
None,
);
let info_fields: Fields = vec![Field::new("name", DataType::Utf8, false), blob_field].into();
let info_array = StructArray::new(
info_fields.clone(),
vec![
Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
Arc::new(blob_array) as ArrayRef,
],
None,
);
let schema = Arc::new(Schema::new(vec![Field::new(
"info",
DataType::Struct(info_fields),
true,
)]));
let batch = RecordBatch::try_new(schema, vec![Arc::new(info_array) as ArrayRef]).unwrap();
let table = db.create_table("t", batch).execute().await?;
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
assert!(uses_stable_row_ids(&table).await);
let ids = collect_row_ids(&table).await?;
let bytes = table.fetch_blobs("info.blob", &ids).await?;
assert_eq!(bytes.len(), 2);
let values: std::collections::HashSet<&[u8]> =
(0..bytes.len()).map(|i| bytes.value(i)).collect();
assert!(values.contains(b"hello".as_slice()));
assert!(values.contains(b"world".as_slice()));
Ok(())
}
#[tokio::test]
async fn blob_columns_lists_nested_dotted_paths() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let blob_field = blob("blob", true);
let info = Field::new(
"info",
DataType::Struct(vec![Field::new("name", DataType::Utf8, false), blob_field].into()),
true,
);
let schema = Arc::new(Schema::new(vec![
blob("thumbnail", true),
Field::new("id", DataType::Int64, false),
info,
]));
let table = db.create_empty_table("t", schema).execute().await?;
assert_eq!(table.blob_columns().await?, vec!["thumbnail", "info.blob"]);
Ok(())
}
#[tokio::test]
async fn blob_columns_lists_blob_fields_in_order() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let schema = Arc::new(Schema::new(vec![
blob("thumbnail", true),
Field::new("id", DataType::Int64, false),
blob("image", true),
]));
let table = db.create_empty_table("t", schema).execute().await?;
assert_eq!(table.blob_columns().await?, vec!["thumbnail", "image"]);
let plain = db
.create_empty_table(
"plain",
Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])),
)
.execute()
.await?;
assert!(plain.blob_columns().await?.is_empty());
Ok(())
}
#[tokio::test]
async fn fetch_blobs_preserves_null_alignment() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(
&db,
"t",
&[1, 2, 3, 4],
&[Some(b"a".as_slice()), None, Some(b"c"), None],
)
.await?;
let pairs = collect_id_rowid(&table).await?;
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let bytes = table.fetch_blobs("image", &ids).await?;
assert_eq!(bytes.len(), ids.len());
for (i, (id, _)) in pairs.iter().enumerate() {
match id {
1 => assert_eq!(bytes.value(i), b"a"),
2 | 4 => assert!(bytes.is_null(i)),
3 => assert_eq!(bytes.value(i), b"c"),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_all_null_column_returns_all_nulls() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1, 2], &[None, None]).await?;
let ids = collect_row_ids(&table).await?;
let bytes = table.fetch_blobs("image", &ids).await?;
assert_eq!(bytes.len(), 2);
assert_eq!(bytes.null_count(), 2);
let files = table.fetch_blob_files("image", &ids).await?;
assert_eq!(files.len(), 2);
assert!(files.iter().all(Option::is_none));
Ok(())
}
#[tokio::test]
async fn fetch_blobs_aligns_with_reordered_and_duplicate_ids() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(
&db,
"t",
&[1, 2, 3],
&[Some(b"one".as_slice()), Some(b"two"), Some(b"three")],
)
.await?;
let pairs = collect_id_rowid(&table).await?;
let by_id = |want: i64| pairs.iter().find(|(id, _)| *id == want).unwrap().1;
let request = vec![by_id(3), by_id(1), by_id(3), by_id(2)];
let bytes = table.fetch_blobs("image", &request).await?;
assert_eq!(bytes.len(), 4);
assert_eq!(bytes.value(0), b"three");
assert_eq!(bytes.value(1), b"one");
assert_eq!(bytes.value(2), b"three");
assert_eq!(bytes.value(3), b"two");
Ok(())
}
#[tokio::test]
async fn fetch_blobs_empty_ids_returns_empty() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
assert_eq!(table.fetch_blobs("image", &[]).await?.len(), 0);
assert!(table.fetch_blob_files("image", &[]).await?.is_empty());
Ok(())
}
#[tokio::test]
async fn fetch_blobs_out_of_range_id_errors_without_panic() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
let err = table.fetch_blobs("image", &[u64::MAX]).await.unwrap_err();
assert!(err.to_string().contains("row ids"));
Ok(())
}
#[tokio::test]
async fn fetch_blobs_rejects_non_blob_column() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
let err = table.fetch_blobs("id", &[0]).await.unwrap_err();
assert!(matches!(err, Error::InvalidInput { .. }));
assert!(err.to_string().contains("'id' is not a blob column"));
let err = table.fetch_blob_files("id", &[0]).await.unwrap_err();
assert!(err.to_string().contains("'id' is not a blob column"));
Ok(())
}
#[tokio::test]
async fn fetch_blobs_rejects_unknown_column() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
let err = table.fetch_blobs("missing", &[0]).await.unwrap_err();
assert!(err.to_string().contains("no column named 'missing'"));
Ok(())
}
#[tokio::test]
async fn fetch_blobs_rejects_legacy_v1_blob_column() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let legacy = Field::new("image", DataType::LargeBinary, true).with_metadata(
std::collections::HashMap::from([("lance-encoding:blob".to_string(), "true".to_string())]),
);
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
legacy,
]));
let table = db.create_empty_table("t", schema).execute().await?;
let err = table.fetch_blobs("image", &[0]).await.unwrap_err();
assert!(err.to_string().contains("legacy blob column"));
Ok(())
}
#[tokio::test]
async fn fetch_blob_files_reads_lazily_and_aligns_nulls() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table =
create_inline_blob_table(&db, "t", &[1, 2], &[Some(b"lazy-bytes".as_slice()), None])
.await?;
let pairs = collect_id_rowid(&table).await?;
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let files = table.fetch_blob_files("image", &ids).await?;
assert_eq!(files.len(), 2);
for ((id, _), file) in pairs.iter().zip(&files) {
match id {
1 => {
let handle = file.as_ref().unwrap();
assert_eq!(handle.read().await.unwrap().as_ref(), b"lazy-bytes");
}
2 => assert!(file.is_none()),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_reads_multiple_blob_columns_independently() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
blob("image", true),
blob("thumbnail", true),
]));
let table = db.create_empty_table("t", schema).execute().await?;
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("image", DataType::LargeBinary, true),
Field::new("thumbnail", DataType::LargeBinary, true),
])),
vec![
Arc::new(Int64Array::from(vec![1, 2])),
Arc::new(LargeBinaryArray::from_iter(vec![
Some(b"image-1".as_slice()),
None,
])),
Arc::new(LargeBinaryArray::from_iter(vec![
None,
Some(b"thumb-2".as_slice()),
])),
],
)
.unwrap();
table.add(batch).execute().await?;
let pairs = collect_id_rowid(&table).await?;
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let images = table.fetch_blobs("image", &ids).await?;
let thumbs = table.fetch_blobs("thumbnail", &ids).await?;
for (i, (id, _)) in pairs.iter().enumerate() {
match id {
1 => {
assert_eq!(images.value(i), b"image-1");
assert!(thumbs.is_null(i));
}
2 => {
assert!(images.is_null(i));
assert_eq!(thumbs.value(i), b"thumb-2");
}
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_spans_fragments() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"frag-one".as_slice())]).await?;
table
.add(binary_input_batch(&[2], &[Some(b"frag-two".as_slice())]))
.execute()
.await?;
let pairs = collect_id_rowid(&table).await?;
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let bytes = table.fetch_blobs("image", &ids).await?;
for (i, (id, _)) in pairs.iter().enumerate() {
match id {
1 => assert_eq!(bytes.value(i), b"frag-one"),
2 => assert_eq!(bytes.value(i), b"frag-two"),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_packed_payload_round_trip() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let big = vec![0xAB_u8; 100 * 1024];
let small = b"small".to_vec();
let table = create_inline_blob_table(
&db,
"t",
&[1, 2],
&[Some(big.as_slice()), Some(small.as_slice())],
)
.await?;
let pairs = collect_id_rowid(&table).await?;
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let bytes = table.fetch_blobs("image", &ids).await?;
for (i, (id, _)) in pairs.iter().enumerate() {
match id {
1 => assert_eq!(bytes.value(i), big.as_slice()),
2 => assert_eq!(bytes.value(i), small.as_slice()),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_after_delete() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(
&db,
"t",
&[1, 2, 3],
&[Some(b"one".as_slice()), Some(b"two"), Some(b"three")],
)
.await?;
table.delete("id = 2").await?;
let pairs = collect_id_rowid(&table).await?;
assert_eq!(pairs.len(), 2);
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let bytes = table.fetch_blobs("image", &ids).await?;
for (i, (id, _)) in pairs.iter().enumerate() {
match id {
1 => assert_eq!(bytes.value(i), b"one"),
3 => assert_eq!(bytes.value(i), b"three"),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_with_precompaction_row_ids_survives_compaction() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"frag-one".as_slice())]).await?;
table
.add(binary_input_batch(&[2], &[Some(b"frag-two".as_slice())]))
.execute()
.await?;
let pairs_before = collect_id_rowid(&table).await?;
let ids_before: Vec<u64> = pairs_before.iter().map(|(_, rowid)| *rowid).collect();
table
.optimize(OptimizeAction::Compact {
options: CompactionOptions::default(),
remap_options: None,
})
.await?;
let bytes_after = table.fetch_blobs("image", &ids_before).await?;
assert_eq!(bytes_after.len(), 2);
for (i, (id, _)) in pairs_before.iter().enumerate() {
match id {
1 => assert_eq!(bytes_after.value(i), b"frag-one"),
2 => assert_eq!(bytes_after.value(i), b"frag-two"),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn zero_length_blob_reads_back_as_null() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"".as_slice())]).await?;
let ids = collect_row_ids(&table).await?;
let bytes = table.fetch_blobs("image", &ids).await?;
assert_eq!(bytes.len(), 1);
assert!(bytes.is_null(0));
Ok(())
}
const DEDICATED_BLOB_LEN: usize = 64 * 1024;
const SCRAMBLED_LOGICAL_IDS: [i64; 7] = [6, 3, 1, 4, 6, 2, 5];
fn dedicated_blob_bytes(tag: u8) -> Vec<u8> {
vec![tag; DEDICATED_BLOB_LEN]
}
async fn multi_fragment_dedicated_blob_table(db: &Connection) -> Result<Table> {
let rows: [(i64, Option<u8>); 6] = [
(1, Some(1)),
(2, Some(2)),
(3, None),
(4, Some(4)),
(5, None),
(6, Some(6)),
];
let mut table: Option<Table> = None;
for (logical_id, blob_tag) in rows {
let bytes = blob_tag.map(dedicated_blob_bytes);
let image = [bytes.as_deref()];
table = Some(match table {
None => create_inline_blob_table(db, "t", &[logical_id], &image).await?,
Some(t) => {
t.add(binary_input_batch(&[logical_id], &image))
.execute()
.await?;
t
}
});
}
Ok(table.unwrap())
}
async fn row_ids_for_logical(table: &Table, logical_ids: &[i64]) -> Result<Vec<u64>> {
let id_rowid = collect_id_rowid(table).await?;
Ok(logical_ids
.iter()
.map(|logical_id| {
id_rowid
.iter()
.find(|(id, _)| id == logical_id)
.map(|(_, row_id)| *row_id)
.unwrap()
})
.collect())
}
#[tokio::test]
async fn fetch_blobs_aligns_across_fragments_with_nulls_and_dups() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = multi_fragment_dedicated_blob_table(&db).await?;
let row_ids = row_ids_for_logical(&table, &SCRAMBLED_LOGICAL_IDS).await?;
let bytes = table.fetch_blobs("image", &row_ids).await?;
assert_eq!(bytes.len(), SCRAMBLED_LOGICAL_IDS.len());
for (slot, logical_id) in SCRAMBLED_LOGICAL_IDS.iter().enumerate() {
match logical_id {
3 | 5 => assert!(bytes.is_null(slot)),
id => assert_eq!(
bytes.value(slot),
dedicated_blob_bytes(*id as u8).as_slice()
),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blob_files_aligns_across_fragments_with_nulls_and_dups() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = multi_fragment_dedicated_blob_table(&db).await?;
let row_ids = row_ids_for_logical(&table, &SCRAMBLED_LOGICAL_IDS).await?;
let files = table.fetch_blob_files("image", &row_ids).await?;
assert_eq!(files.len(), SCRAMBLED_LOGICAL_IDS.len());
for (slot, logical_id) in SCRAMBLED_LOGICAL_IDS.iter().enumerate() {
match logical_id {
3 | 5 => assert!(files[slot].is_none()),
id => {
let payload = files[slot].as_ref().unwrap().read().await?;
assert_eq!(payload.as_ref(), dedicated_blob_bytes(*id as u8).as_slice());
}
}
}
Ok(())
}