mirror of
https://github.com/lancedb/lancedb.git
synced 2026-07-04 03:20:40 +00:00
Compare commits
24 Commits
yang/appro
...
python-v0.
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
26481a4b74 | ||
|
|
08596f1644 | ||
|
|
f16da19b78 | ||
|
|
41ac32a344 | ||
|
|
ba1ef34481 | ||
|
|
85d870b397 | ||
|
|
c46d59d2ee | ||
|
|
113f187c2d | ||
|
|
3b279f5705 | ||
|
|
e1334954d7 | ||
|
|
2f65a233fe | ||
|
|
e81356089a | ||
|
|
4f4cce3f64 | ||
|
|
c1c19cd133 | ||
|
|
ce5dadd386 | ||
|
|
1f8ebef3cd | ||
|
|
217fd8491d | ||
|
|
9128dbcd7a | ||
|
|
394bb34fa2 | ||
|
|
b2ae763254 | ||
|
|
1bead6960c | ||
|
|
0abf641733 | ||
|
|
976edeb2ff | ||
|
|
b46a44f873 |
@@ -1,5 +1,5 @@
|
|||||||
[tool.bumpversion]
|
[tool.bumpversion]
|
||||||
current_version = "0.30.1-beta.2"
|
current_version = "0.31.0-beta.1"
|
||||||
parse = """(?x)
|
parse = """(?x)
|
||||||
(?P<major>0|[1-9]\\d*)\\.
|
(?P<major>0|[1-9]\\d*)\\.
|
||||||
(?P<minor>0|[1-9]\\d*)\\.
|
(?P<minor>0|[1-9]\\d*)\\.
|
||||||
@@ -23,6 +23,8 @@ allow_dirty = true
|
|||||||
commit = true
|
commit = true
|
||||||
message = "Bump version: {current_version} → {new_version}"
|
message = "Bump version: {current_version} → {new_version}"
|
||||||
commit_args = ""
|
commit_args = ""
|
||||||
|
# bump-my-version >=1.4.0 rejects pre_commit_hooks containing shell syntax unless opted in.
|
||||||
|
allow_shell_hooks = true
|
||||||
|
|
||||||
# Java maven files
|
# Java maven files
|
||||||
pre_commit_hooks = [
|
pre_commit_hooks = [
|
||||||
|
|||||||
105
Cargo.lock
generated
105
Cargo.lock
generated
@@ -1472,9 +1472,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bytes"
|
name = "bytes"
|
||||||
version = "1.11.1"
|
version = "1.12.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
|
checksum = "8ae3f5d315924270530207e2a68396c3cc547f6dca3fbdca317cfb1a51edb593"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bytes-utils"
|
name = "bytes-utils"
|
||||||
@@ -3432,8 +3432,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fsst"
|
name = "fsst"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow-array",
|
"arrow-array",
|
||||||
"rand 0.9.4",
|
"rand 0.9.4",
|
||||||
@@ -4735,8 +4735,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance"
|
name = "lance"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
"arrow",
|
"arrow",
|
||||||
@@ -4810,8 +4810,8 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-arrow"
|
name = "lance-arrow"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow-array",
|
"arrow-array",
|
||||||
"arrow-buffer",
|
"arrow-buffer",
|
||||||
@@ -4832,7 +4832,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-arrow-scalar"
|
name = "lance-arrow-scalar"
|
||||||
version = "58.0.0"
|
version = "58.0.0"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow-array",
|
"arrow-array",
|
||||||
"arrow-buffer",
|
"arrow-buffer",
|
||||||
@@ -4846,7 +4846,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-arrow-stats"
|
name = "lance-arrow-stats"
|
||||||
version = "58.0.0"
|
version = "58.0.0"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow-array",
|
"arrow-array",
|
||||||
"arrow-schema",
|
"arrow-schema",
|
||||||
@@ -4855,8 +4855,8 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-bitpacking"
|
name = "lance-bitpacking"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrayref",
|
"arrayref",
|
||||||
"paste",
|
"paste",
|
||||||
@@ -4865,8 +4865,8 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-core"
|
name = "lance-core"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow-array",
|
"arrow-array",
|
||||||
"arrow-buffer",
|
"arrow-buffer",
|
||||||
@@ -4904,8 +4904,8 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-datafusion"
|
name = "lance-datafusion"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow",
|
"arrow",
|
||||||
"arrow-array",
|
"arrow-array",
|
||||||
@@ -4935,8 +4935,8 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-datagen"
|
name = "lance-datagen"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow",
|
"arrow",
|
||||||
"arrow-array",
|
"arrow-array",
|
||||||
@@ -4953,8 +4953,8 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-derive"
|
name = "lance-derive"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
@@ -4963,8 +4963,8 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-encoding"
|
name = "lance-encoding"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow-arith",
|
"arrow-arith",
|
||||||
"arrow-array",
|
"arrow-array",
|
||||||
@@ -4999,8 +4999,8 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-file"
|
name = "lance-file"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow-arith",
|
"arrow-arith",
|
||||||
"arrow-array",
|
"arrow-array",
|
||||||
@@ -5030,8 +5030,8 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-index"
|
name = "lance-index"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
"arrow",
|
"arrow",
|
||||||
@@ -5096,8 +5096,8 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-io"
|
name = "lance-io"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow",
|
"arrow",
|
||||||
"arrow-arith",
|
"arrow-arith",
|
||||||
@@ -5138,8 +5138,8 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-linalg"
|
name = "lance-linalg"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow-array",
|
"arrow-array",
|
||||||
"arrow-buffer",
|
"arrow-buffer",
|
||||||
@@ -5150,12 +5150,13 @@ dependencies = [
|
|||||||
"lance-core",
|
"lance-core",
|
||||||
"num-traits",
|
"num-traits",
|
||||||
"rand 0.9.4",
|
"rand 0.9.4",
|
||||||
|
"rayon",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-namespace"
|
name = "lance-namespace"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow",
|
"arrow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
@@ -5167,8 +5168,8 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-namespace-impls"
|
name = "lance-namespace-impls"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow",
|
"arrow",
|
||||||
"arrow-ipc",
|
"arrow-ipc",
|
||||||
@@ -5222,8 +5223,8 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-select"
|
name = "lance-select"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow-array",
|
"arrow-array",
|
||||||
"arrow-buffer",
|
"arrow-buffer",
|
||||||
@@ -5238,8 +5239,8 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-table"
|
name = "lance-table"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow",
|
"arrow",
|
||||||
"arrow-array",
|
"arrow-array",
|
||||||
@@ -5278,8 +5279,8 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-testing"
|
name = "lance-testing"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow-array",
|
"arrow-array",
|
||||||
"arrow-schema",
|
"arrow-schema",
|
||||||
@@ -5292,20 +5293,21 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lance-tokenizer"
|
name = "lance-tokenizer"
|
||||||
version = "8.0.0-beta.16"
|
version = "9.0.0-beta.2"
|
||||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.16#6e734df607f2841fe3bba82f05a90f3174933bab"
|
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.2#23211989de648fefc4454f5eee09ec176f0a465b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"icu_segmenter",
|
"icu_segmenter",
|
||||||
"jieba-rs",
|
"jieba-rs",
|
||||||
"lindera",
|
"lindera",
|
||||||
"rust-stemmers",
|
"rust-stemmers",
|
||||||
"serde",
|
"serde",
|
||||||
|
"stop-words",
|
||||||
"unicode-normalization",
|
"unicode-normalization",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lancedb"
|
name = "lancedb"
|
||||||
version = "0.30.1-beta.2"
|
version = "0.31.0-beta.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"ahash",
|
"ahash",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
@@ -5388,7 +5390,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lancedb-nodejs"
|
name = "lancedb-nodejs"
|
||||||
version = "0.30.1-beta.2"
|
version = "0.31.0-beta.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow-array",
|
"arrow-array",
|
||||||
"arrow-buffer",
|
"arrow-buffer",
|
||||||
@@ -5413,7 +5415,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lancedb-python"
|
name = "lancedb-python"
|
||||||
version = "0.33.1-beta.2"
|
version = "0.34.0-beta.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arrow",
|
"arrow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
@@ -5956,9 +5958,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "napi"
|
name = "napi"
|
||||||
version = "3.9.1"
|
version = "3.9.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ad513ff22558f1830b595ea6eb4091da48145d09a222ce157e781896f78be0b9"
|
checksum = "fbd9f9295f3ff5921e78a71222c3361a8216f7760b1a99a6ad4e8441de18bbb9"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags 2.11.1",
|
"bitflags 2.11.1",
|
||||||
"chrono",
|
"chrono",
|
||||||
@@ -9205,6 +9207,15 @@ version = "0.2.7"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e51f1e89f093f99e7432c491c382b88a6860a5adbe6bf02574bf0a08efff1978"
|
checksum = "e51f1e89f093f99e7432c491c382b88a6860a5adbe6bf02574bf0a08efff1978"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "stop-words"
|
||||||
|
version = "0.10.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d68df56303396bcfb639455b3c166804aeb7994005010aab5e9e8a1277b8871d"
|
||||||
|
dependencies = [
|
||||||
|
"serde_json",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "str_stack"
|
name = "str_stack"
|
||||||
version = "0.1.1"
|
version = "0.1.1"
|
||||||
|
|||||||
28
Cargo.toml
28
Cargo.toml
@@ -13,20 +13,20 @@ categories = ["database-implementations"]
|
|||||||
rust-version = "1.91.0"
|
rust-version = "1.91.0"
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
lance = { "version" = "=8.0.0-beta.16", default-features = false, "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
|
lance = { "version" = "=9.0.0-beta.2", default-features = false, "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-core = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
|
lance-core = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-datagen = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
|
lance-datagen = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-file = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
|
lance-file = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-io = { "version" = "=8.0.0-beta.16", default-features = false, "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
|
lance-io = { "version" = "=9.0.0-beta.2", default-features = false, "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-index = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
|
lance-index = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-linalg = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
|
lance-linalg = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-namespace = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
|
lance-namespace = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-namespace-impls = { "version" = "=8.0.0-beta.16", default-features = false, "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
|
lance-namespace-impls = { "version" = "=9.0.0-beta.2", default-features = false, "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-table = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
|
lance-table = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-testing = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
|
lance-testing = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-datafusion = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
|
lance-datafusion = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-encoding = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
|
lance-encoding = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
lance-arrow = { "version" = "=8.0.0-beta.16", "tag" = "v8.0.0-beta.16", "git" = "https://github.com/lance-format/lance.git" }
|
lance-arrow = { "version" = "=9.0.0-beta.2", "tag" = "v9.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||||
ahash = "0.8"
|
ahash = "0.8"
|
||||||
# Note that this one does not include pyarrow
|
# Note that this one does not include pyarrow
|
||||||
arrow = { version = "58.0.0", optional = false }
|
arrow = { version = "58.0.0", optional = false }
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
|
|||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.lancedb</groupId>
|
<groupId>com.lancedb</groupId>
|
||||||
<artifactId>lancedb-core</artifactId>
|
<artifactId>lancedb-core</artifactId>
|
||||||
<version>0.30.1-beta.2</version>
|
<version>0.31.0-beta.1</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@
|
|||||||
<parent>
|
<parent>
|
||||||
<groupId>com.lancedb</groupId>
|
<groupId>com.lancedb</groupId>
|
||||||
<artifactId>lancedb-parent</artifactId>
|
<artifactId>lancedb-parent</artifactId>
|
||||||
<version>0.30.1-beta.2</version>
|
<version>0.31.0-beta.1</version>
|
||||||
<relativePath>../pom.xml</relativePath>
|
<relativePath>../pom.xml</relativePath>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
<groupId>com.lancedb</groupId>
|
<groupId>com.lancedb</groupId>
|
||||||
<artifactId>lancedb-parent</artifactId>
|
<artifactId>lancedb-parent</artifactId>
|
||||||
<version>0.30.1-beta.2</version>
|
<version>0.31.0-beta.1</version>
|
||||||
<packaging>pom</packaging>
|
<packaging>pom</packaging>
|
||||||
<name>${project.artifactId}</name>
|
<name>${project.artifactId}</name>
|
||||||
<description>LanceDB Java SDK Parent POM</description>
|
<description>LanceDB Java SDK Parent POM</description>
|
||||||
@@ -28,7 +28,7 @@
|
|||||||
<properties>
|
<properties>
|
||||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
<arrow.version>15.0.0</arrow.version>
|
<arrow.version>15.0.0</arrow.version>
|
||||||
<lance-core.version>8.0.0-beta.16</lance-core.version>
|
<lance-core.version>9.0.0-beta.2</lance-core.version>
|
||||||
<spotless.skip>false</spotless.skip>
|
<spotless.skip>false</spotless.skip>
|
||||||
<spotless.version>2.30.0</spotless.version>
|
<spotless.version>2.30.0</spotless.version>
|
||||||
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>
|
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "lancedb-nodejs"
|
name = "lancedb-nodejs"
|
||||||
edition.workspace = true
|
edition.workspace = true
|
||||||
version = "0.30.1-beta.2"
|
version = "0.31.0-beta.1"
|
||||||
publish = false
|
publish = false
|
||||||
license.workspace = true
|
license.workspace = true
|
||||||
description.workspace = true
|
description.workspace = true
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@lancedb/lancedb-darwin-arm64",
|
"name": "@lancedb/lancedb-darwin-arm64",
|
||||||
"version": "0.30.1-beta.2",
|
"version": "0.31.0-beta.1",
|
||||||
"os": ["darwin"],
|
"os": ["darwin"],
|
||||||
"cpu": ["arm64"],
|
"cpu": ["arm64"],
|
||||||
"main": "lancedb.darwin-arm64.node",
|
"main": "lancedb.darwin-arm64.node",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@lancedb/lancedb-linux-arm64-gnu",
|
"name": "@lancedb/lancedb-linux-arm64-gnu",
|
||||||
"version": "0.30.1-beta.2",
|
"version": "0.31.0-beta.1",
|
||||||
"os": ["linux"],
|
"os": ["linux"],
|
||||||
"cpu": ["arm64"],
|
"cpu": ["arm64"],
|
||||||
"main": "lancedb.linux-arm64-gnu.node",
|
"main": "lancedb.linux-arm64-gnu.node",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@lancedb/lancedb-linux-arm64-musl",
|
"name": "@lancedb/lancedb-linux-arm64-musl",
|
||||||
"version": "0.30.1-beta.2",
|
"version": "0.31.0-beta.1",
|
||||||
"os": ["linux"],
|
"os": ["linux"],
|
||||||
"cpu": ["arm64"],
|
"cpu": ["arm64"],
|
||||||
"main": "lancedb.linux-arm64-musl.node",
|
"main": "lancedb.linux-arm64-musl.node",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@lancedb/lancedb-linux-x64-gnu",
|
"name": "@lancedb/lancedb-linux-x64-gnu",
|
||||||
"version": "0.30.1-beta.2",
|
"version": "0.31.0-beta.1",
|
||||||
"os": ["linux"],
|
"os": ["linux"],
|
||||||
"cpu": ["x64"],
|
"cpu": ["x64"],
|
||||||
"main": "lancedb.linux-x64-gnu.node",
|
"main": "lancedb.linux-x64-gnu.node",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@lancedb/lancedb-linux-x64-musl",
|
"name": "@lancedb/lancedb-linux-x64-musl",
|
||||||
"version": "0.30.1-beta.2",
|
"version": "0.31.0-beta.1",
|
||||||
"os": ["linux"],
|
"os": ["linux"],
|
||||||
"cpu": ["x64"],
|
"cpu": ["x64"],
|
||||||
"main": "lancedb.linux-x64-musl.node",
|
"main": "lancedb.linux-x64-musl.node",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@lancedb/lancedb-win32-arm64-msvc",
|
"name": "@lancedb/lancedb-win32-arm64-msvc",
|
||||||
"version": "0.30.1-beta.2",
|
"version": "0.31.0-beta.1",
|
||||||
"os": [
|
"os": [
|
||||||
"win32"
|
"win32"
|
||||||
],
|
],
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@lancedb/lancedb-win32-x64-msvc",
|
"name": "@lancedb/lancedb-win32-x64-msvc",
|
||||||
"version": "0.30.1-beta.2",
|
"version": "0.31.0-beta.1",
|
||||||
"os": ["win32"],
|
"os": ["win32"],
|
||||||
"cpu": ["x64"],
|
"cpu": ["x64"],
|
||||||
"main": "lancedb.win32-x64-msvc.node",
|
"main": "lancedb.win32-x64-msvc.node",
|
||||||
|
|||||||
4
nodejs/package-lock.json
generated
4
nodejs/package-lock.json
generated
@@ -1,12 +1,12 @@
|
|||||||
{
|
{
|
||||||
"name": "@lancedb/lancedb",
|
"name": "@lancedb/lancedb",
|
||||||
"version": "0.30.1-beta.2",
|
"version": "0.31.0-beta.1",
|
||||||
"lockfileVersion": 3,
|
"lockfileVersion": 3,
|
||||||
"requires": true,
|
"requires": true,
|
||||||
"packages": {
|
"packages": {
|
||||||
"": {
|
"": {
|
||||||
"name": "@lancedb/lancedb",
|
"name": "@lancedb/lancedb",
|
||||||
"version": "0.30.1-beta.2",
|
"version": "0.31.0-beta.1",
|
||||||
"cpu": [
|
"cpu": [
|
||||||
"x64",
|
"x64",
|
||||||
"arm64"
|
"arm64"
|
||||||
|
|||||||
@@ -11,7 +11,7 @@
|
|||||||
"ann"
|
"ann"
|
||||||
],
|
],
|
||||||
"private": false,
|
"private": false,
|
||||||
"version": "0.30.1-beta.2",
|
"version": "0.31.0-beta.1",
|
||||||
"main": "dist/index.js",
|
"main": "dist/index.js",
|
||||||
"exports": {
|
"exports": {
|
||||||
".": "./dist/index.js",
|
".": "./dist/index.js",
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
[tool.bumpversion]
|
[tool.bumpversion]
|
||||||
current_version = "0.33.1-beta.2"
|
current_version = "0.34.0-beta.2"
|
||||||
parse = """(?x)
|
parse = """(?x)
|
||||||
(?P<major>0|[1-9]\\d*)\\.
|
(?P<major>0|[1-9]\\d*)\\.
|
||||||
(?P<minor>0|[1-9]\\d*)\\.
|
(?P<minor>0|[1-9]\\d*)\\.
|
||||||
@@ -23,6 +23,8 @@ allow_dirty = true
|
|||||||
commit = true
|
commit = true
|
||||||
message = "Bump version: {current_version} → {new_version}"
|
message = "Bump version: {current_version} → {new_version}"
|
||||||
commit_args = ""
|
commit_args = ""
|
||||||
|
# bump-my-version >=1.4.0 rejects pre_commit_hooks containing shell syntax unless opted in.
|
||||||
|
allow_shell_hooks = true
|
||||||
|
|
||||||
# Update Cargo.lock after version bump
|
# Update Cargo.lock after version bump
|
||||||
pre_commit_hooks = [
|
pre_commit_hooks = [
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "lancedb-python"
|
name = "lancedb-python"
|
||||||
version = "0.33.1-beta.2"
|
version = "0.34.0-beta.2"
|
||||||
publish = false
|
publish = false
|
||||||
edition.workspace = true
|
edition.workspace = true
|
||||||
description = "Python bindings for LanceDB"
|
description = "Python bindings for LanceDB"
|
||||||
|
|||||||
@@ -71,6 +71,9 @@ from lancedb.embeddings import EmbeddingFunctionConfig
|
|||||||
from ._lancedb import Session
|
from ._lancedb import Session
|
||||||
|
|
||||||
|
|
||||||
|
_MAX_QUERY_K = 2**31 - 1
|
||||||
|
|
||||||
|
|
||||||
def _query_to_namespace_request(
|
def _query_to_namespace_request(
|
||||||
table_id: List[str],
|
table_id: List[str],
|
||||||
query: "Query",
|
query: "Query",
|
||||||
@@ -148,7 +151,8 @@ def _query_to_namespace_request(
|
|||||||
if query.limit is not None:
|
if query.limit is not None:
|
||||||
k = query.limit
|
k = query.limit
|
||||||
elif query.vector is None and query.full_text_query is None:
|
elif query.vector is None and query.full_text_query is None:
|
||||||
k = sys.maxsize
|
# limit k to max i32 value to avoid client overflows
|
||||||
|
k = _MAX_QUERY_K
|
||||||
else:
|
else:
|
||||||
k = 10
|
k = 10
|
||||||
|
|
||||||
|
|||||||
@@ -275,7 +275,18 @@ def _py_type_to_arrow_type(py_type: Type[Any], field: FieldInfo) -> pa.DataType:
|
|||||||
tz = get_extras(field, "tz")
|
tz = get_extras(field, "tz")
|
||||||
return pa.timestamp("us", tz=tz)
|
return pa.timestamp("us", tz=tz)
|
||||||
elif getattr(py_type, "__origin__", None) in (list, tuple):
|
elif getattr(py_type, "__origin__", None) in (list, tuple):
|
||||||
child = py_type.__args__[0]
|
# A bare, unparameterised ``typing.List`` / ``typing.Tuple`` matches this
|
||||||
|
# branch (its ``__origin__`` is ``list`` / ``tuple``) but has no
|
||||||
|
# ``__args__``, so we cannot infer the element type. Raise a clear
|
||||||
|
# ``TypeError`` instead of crashing with an opaque ``AttributeError``.
|
||||||
|
args = getattr(py_type, "__args__", None)
|
||||||
|
if not args:
|
||||||
|
raise TypeError(
|
||||||
|
"Converting Pydantic type to Arrow Type: unsupported type "
|
||||||
|
f"{py_type}. Specify the element type, e.g. List[int] instead "
|
||||||
|
"of a bare List."
|
||||||
|
)
|
||||||
|
child = args[0]
|
||||||
return _pydantic_list_child_to_arrow(child, field)
|
return _pydantic_list_child_to_arrow(child, field)
|
||||||
raise TypeError(
|
raise TypeError(
|
||||||
f"Converting Pydantic type to Arrow Type: unsupported type {py_type}."
|
f"Converting Pydantic type to Arrow Type: unsupported type {py_type}."
|
||||||
|
|||||||
@@ -86,7 +86,10 @@ def _from_list(data: list) -> Scannable:
|
|||||||
|
|
||||||
@to_scannable.register(dict)
|
@to_scannable.register(dict)
|
||||||
def _from_dict(data: dict) -> Scannable:
|
def _from_dict(data: dict) -> Scannable:
|
||||||
raise ValueError("Cannot add a single dictionary to a table. Use a list.")
|
raise ValueError(
|
||||||
|
"Cannot create or add rows from a single dictionary. "
|
||||||
|
"Use a list of dictionaries instead."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@to_scannable.register(LanceModel)
|
@to_scannable.register(LanceModel)
|
||||||
|
|||||||
@@ -243,7 +243,10 @@ def _into_pyarrow_reader(
|
|||||||
raise ValueError("Cannot add a single LanceModel to a table. Use a list.")
|
raise ValueError("Cannot add a single LanceModel to a table. Use a list.")
|
||||||
|
|
||||||
if isinstance(data, dict):
|
if isinstance(data, dict):
|
||||||
raise ValueError("Cannot add a single dictionary to a table. Use a list.")
|
raise ValueError(
|
||||||
|
"Cannot create or add rows from a single dictionary. "
|
||||||
|
"Use a list of dictionaries instead."
|
||||||
|
)
|
||||||
|
|
||||||
if isinstance(data, list):
|
if isinstance(data, list):
|
||||||
# Handle empty list case
|
# Handle empty list case
|
||||||
|
|||||||
@@ -373,9 +373,15 @@ def _(value: list):
|
|||||||
@value_to_sql.register(dict)
|
@value_to_sql.register(dict)
|
||||||
def _(value: dict):
|
def _(value: dict):
|
||||||
# https://datafusion.apache.org/user-guide/sql/scalar_functions.html#named-struct
|
# https://datafusion.apache.org/user-guide/sql/scalar_functions.html#named-struct
|
||||||
|
# Render the field name through value_to_sql(str(...)) as well so that keys
|
||||||
|
# containing characters meaningful in SQL (e.g. a single quote) are escaped
|
||||||
|
# the same way string values are. A bare f"'{k}'" would emit invalid SQL for
|
||||||
|
# a key like "it's".
|
||||||
return (
|
return (
|
||||||
"named_struct("
|
"named_struct("
|
||||||
+ ", ".join(f"'{k}', {value_to_sql(v)}" for k, v in value.items())
|
+ ", ".join(
|
||||||
|
f"{value_to_sql(str(k))}, {value_to_sql(v)}" for k, v in value.items()
|
||||||
|
)
|
||||||
+ ")"
|
+ ")"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -91,7 +91,9 @@ async def test_create_scalar_index(some_table: AsyncTable):
|
|||||||
# Can recreate if replace=True
|
# Can recreate if replace=True
|
||||||
await some_table.create_index("id", replace=True)
|
await some_table.create_index("id", replace=True)
|
||||||
indices = await some_table.list_indices()
|
indices = await some_table.list_indices()
|
||||||
assert str(indices) == '[Index(BTree, columns=["id"], name="id_idx")]'
|
assert str(indices).startswith(
|
||||||
|
'[IndexConfig(name="id_idx", index_type="BTree", columns=["id"]'
|
||||||
|
)
|
||||||
assert len(indices) == 1
|
assert len(indices) == 1
|
||||||
assert indices[0].index_type == "BTree"
|
assert indices[0].index_type == "BTree"
|
||||||
assert indices[0].columns == ["id"]
|
assert indices[0].columns == ["id"]
|
||||||
@@ -106,6 +108,27 @@ async def test_create_scalar_index(some_table: AsyncTable):
|
|||||||
assert len(indices) == 0
|
assert len(indices) == 0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_index_config_repr(db_async):
|
||||||
|
# Use >= 1000 rows so the thousands separator in the repr is exercised.
|
||||||
|
nrows = 1500
|
||||||
|
table = await db_async.create_table(
|
||||||
|
"repr_table", pa.Table.from_pydict({"id": list(range(nrows))})
|
||||||
|
)
|
||||||
|
await table.create_index("id", config=BTree())
|
||||||
|
indices = await table.list_indices()
|
||||||
|
assert len(indices) == 1
|
||||||
|
|
||||||
|
r = repr(indices[0])
|
||||||
|
assert r.startswith('IndexConfig(name="id_idx", index_type="BTree", columns=["id"]')
|
||||||
|
# Integer counts use `_` thousands separators (valid Python int syntax).
|
||||||
|
assert "num_indexed_rows=1_500" in r
|
||||||
|
assert "num_unindexed_rows=0" in r
|
||||||
|
# created_at renders as a datetime so the value round-trips.
|
||||||
|
assert "created_at=datetime.datetime(" in r
|
||||||
|
assert r.endswith(")")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_create_nested_scalar_index_lists_canonical_paths(db_async):
|
async def test_create_nested_scalar_index_lists_canonical_paths(db_async):
|
||||||
metadata_type = pa.struct(
|
metadata_type = pa.struct(
|
||||||
@@ -198,7 +221,9 @@ async def test_create_nested_scalar_index_lists_canonical_paths(db_async):
|
|||||||
async def test_create_fixed_size_binary_index(some_table: AsyncTable):
|
async def test_create_fixed_size_binary_index(some_table: AsyncTable):
|
||||||
await some_table.create_index("fsb", config=BTree())
|
await some_table.create_index("fsb", config=BTree())
|
||||||
indices = await some_table.list_indices()
|
indices = await some_table.list_indices()
|
||||||
assert str(indices) == '[Index(BTree, columns=["fsb"], name="fsb_idx")]'
|
assert str(indices).startswith(
|
||||||
|
'[IndexConfig(name="fsb_idx", index_type="BTree", columns=["fsb"]'
|
||||||
|
)
|
||||||
assert len(indices) == 1
|
assert len(indices) == 1
|
||||||
assert indices[0].index_type == "BTree"
|
assert indices[0].index_type == "BTree"
|
||||||
assert indices[0].columns == ["fsb"]
|
assert indices[0].columns == ["fsb"]
|
||||||
@@ -247,7 +272,9 @@ async def test_create_bitmap_index(some_table: AsyncTable):
|
|||||||
async def test_create_label_list_index(some_table: AsyncTable):
|
async def test_create_label_list_index(some_table: AsyncTable):
|
||||||
await some_table.create_index("tags", config=LabelList())
|
await some_table.create_index("tags", config=LabelList())
|
||||||
indices = await some_table.list_indices()
|
indices = await some_table.list_indices()
|
||||||
assert str(indices) == '[Index(LabelList, columns=["tags"], name="tags_idx")]'
|
assert str(indices).startswith(
|
||||||
|
'[IndexConfig(name="tags_idx", index_type="LabelList", columns=["tags"]'
|
||||||
|
)
|
||||||
plan = await some_table.query().where("array_has(tags, 'tag0')").explain_plan()
|
plan = await some_table.query().where("array_has(tags, 'tag0')").explain_plan()
|
||||||
assert "ScalarIndexQuery" in plan
|
assert "ScalarIndexQuery" in plan
|
||||||
|
|
||||||
@@ -262,7 +289,9 @@ async def test_create_large_list_label_list_index(db_async):
|
|||||||
|
|
||||||
await table.create_index("tags", config=LabelList())
|
await table.create_index("tags", config=LabelList())
|
||||||
indices = await table.list_indices()
|
indices = await table.list_indices()
|
||||||
assert str(indices) == '[Index(LabelList, columns=["tags"], name="tags_idx")]'
|
assert str(indices).startswith(
|
||||||
|
'[IndexConfig(name="tags_idx", index_type="LabelList", columns=["tags"]'
|
||||||
|
)
|
||||||
plan = await table.query().where("array_has(tags, 'shared')").explain_plan()
|
plan = await table.query().where("array_has(tags, 'shared')").explain_plan()
|
||||||
assert "ScalarIndexQuery" in plan
|
assert "ScalarIndexQuery" in plan
|
||||||
|
|
||||||
@@ -299,7 +328,9 @@ async def test_create_label_list_index_rejects_list_struct(db_async):
|
|||||||
async def test_full_text_search_index(some_table: AsyncTable):
|
async def test_full_text_search_index(some_table: AsyncTable):
|
||||||
await some_table.create_index("tags", config=FTS(with_position=False))
|
await some_table.create_index("tags", config=FTS(with_position=False))
|
||||||
indices = await some_table.list_indices()
|
indices = await some_table.list_indices()
|
||||||
assert str(indices) == '[Index(FTS, columns=["tags"], name="tags_idx")]'
|
assert str(indices).startswith(
|
||||||
|
'[IndexConfig(name="tags_idx", index_type="FTS", columns=["tags"]'
|
||||||
|
)
|
||||||
|
|
||||||
await some_table.prewarm_index("tags_idx")
|
await some_table.prewarm_index("tags_idx")
|
||||||
|
|
||||||
|
|||||||
@@ -5,11 +5,11 @@
|
|||||||
|
|
||||||
import tempfile
|
import tempfile
|
||||||
import shutil
|
import shutil
|
||||||
import sys
|
|
||||||
import pytest
|
import pytest
|
||||||
import pyarrow as pa
|
import pyarrow as pa
|
||||||
import lancedb
|
import lancedb
|
||||||
from lance_namespace.errors import NamespaceNotEmptyError, TableNotFoundError
|
from lance_namespace.errors import NamespaceNotEmptyError, TableNotFoundError
|
||||||
|
from lancedb.namespace import _MAX_QUERY_K
|
||||||
from lancedb.table import AsyncTable, LanceTable
|
from lancedb.table import AsyncTable, LanceTable
|
||||||
|
|
||||||
|
|
||||||
@@ -816,10 +816,13 @@ class TestPushdownOperations:
|
|||||||
["geneva", "hist"],
|
["geneva", "hist"],
|
||||||
["geneva", "hist"],
|
["geneva", "hist"],
|
||||||
]
|
]
|
||||||
|
# Unlimited reads cap k at i32::MAX (the namespace query_table `k`
|
||||||
|
# field is i32); sys.maxsize would overflow the Rust binding.
|
||||||
assert [request.k for request in namespace_client.requests] == [
|
assert [request.k for request in namespace_client.requests] == [
|
||||||
sys.maxsize,
|
_MAX_QUERY_K,
|
||||||
sys.maxsize,
|
_MAX_QUERY_K,
|
||||||
]
|
]
|
||||||
|
assert all(r.k <= 2**31 - 1 for r in namespace_client.requests)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@@ -874,10 +877,13 @@ class TestAsyncPushdownOperations:
|
|||||||
["geneva", "hist"],
|
["geneva", "hist"],
|
||||||
["geneva", "hist"],
|
["geneva", "hist"],
|
||||||
]
|
]
|
||||||
|
# Unlimited reads cap k at i32::MAX (the namespace query_table `k`
|
||||||
|
# field is i32); sys.maxsize would overflow the Rust binding.
|
||||||
assert [request.k for request in namespace_client.requests] == [
|
assert [request.k for request in namespace_client.requests] == [
|
||||||
sys.maxsize,
|
_MAX_QUERY_K,
|
||||||
sys.maxsize,
|
_MAX_QUERY_K,
|
||||||
]
|
]
|
||||||
|
assert all(r.k <= 2**31 - 1 for r in namespace_client.requests)
|
||||||
|
|
||||||
|
|
||||||
def test_local_table_to_arrow_and_to_pandas_are_unchanged(tmp_path):
|
def test_local_table_to_arrow_and_to_pandas_are_unchanged(tmp_path):
|
||||||
|
|||||||
@@ -188,6 +188,18 @@ def test_nested_struct_list():
|
|||||||
assert schema == expect_schema
|
assert schema == expect_schema
|
||||||
|
|
||||||
|
|
||||||
|
def test_bare_generic_raises_type_error():
|
||||||
|
# A bare, unparameterised List/Tuple has no element type to map to Arrow.
|
||||||
|
# It should raise a clear TypeError, not crash with AttributeError: __args__.
|
||||||
|
for bare in (List, Tuple):
|
||||||
|
|
||||||
|
class TestModel(pydantic.BaseModel):
|
||||||
|
items: bare
|
||||||
|
|
||||||
|
with pytest.raises(TypeError, match="unsupported type"):
|
||||||
|
pydantic_to_schema(TestModel)
|
||||||
|
|
||||||
|
|
||||||
def test_nested_struct_list_optional():
|
def test_nested_struct_list_optional():
|
||||||
class SplitInfo(pydantic.BaseModel):
|
class SplitInfo(pydantic.BaseModel):
|
||||||
start_frame: int
|
start_frame: int
|
||||||
|
|||||||
@@ -301,6 +301,16 @@ def test_create_table(mem_db: DBConnection):
|
|||||||
assert expected == tbl
|
assert expected == tbl
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_table_rejects_single_dictionary(mem_db: DBConnection):
|
||||||
|
data = {"vector": [3.1, 4.1], "item": "foo", "price": 10.0}
|
||||||
|
with pytest.raises(ValueError) as excep_info:
|
||||||
|
mem_db.create_table("test", data=data)
|
||||||
|
assert (
|
||||||
|
str(excep_info.value) == "Cannot create or add rows from a single dictionary. "
|
||||||
|
"Use a list of dictionaries instead."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_empty_table(mem_db: DBConnection):
|
def test_empty_table(mem_db: DBConnection):
|
||||||
schema = pa.schema(
|
schema = pa.schema(
|
||||||
[
|
[
|
||||||
@@ -330,8 +340,8 @@ def test_add_dictionary(mem_db: DBConnection):
|
|||||||
with pytest.raises(ValueError) as excep_info:
|
with pytest.raises(ValueError) as excep_info:
|
||||||
tbl.add(data=data)
|
tbl.add(data=data)
|
||||||
assert (
|
assert (
|
||||||
str(excep_info.value)
|
str(excep_info.value) == "Cannot create or add rows from a single dictionary. "
|
||||||
== "Cannot add a single dictionary to a table. Use a list."
|
"Use a list of dictionaries instead."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -149,6 +149,21 @@ def test_value_to_sql_dict():
|
|||||||
assert value_to_sql({}) == "named_struct()"
|
assert value_to_sql({}) == "named_struct()"
|
||||||
|
|
||||||
|
|
||||||
|
def test_value_to_sql_dict_key_escaping():
|
||||||
|
# Struct field names that contain a single quote must be escaped (doubled)
|
||||||
|
# the same way string values are, otherwise value_to_sql emits invalid SQL
|
||||||
|
# such as named_struct('it's', 1).
|
||||||
|
assert value_to_sql({"it's": 1}) == "named_struct('it''s', 1)"
|
||||||
|
assert (
|
||||||
|
value_to_sql({"o'brien": "d'angelo"}) == "named_struct('o''brien', 'd''angelo')"
|
||||||
|
)
|
||||||
|
# Escaping also applies to keys of nested structs.
|
||||||
|
assert (
|
||||||
|
value_to_sql({"outer": {"in'r": 1}})
|
||||||
|
== "named_struct('outer', named_struct('in''r', 1))"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_value_to_sql_numpy_scalars():
|
def test_value_to_sql_numpy_scalars():
|
||||||
# numpy scalars (e.g. pulled from an ndarray or a pandas column) must
|
# numpy scalars (e.g. pulled from an ndarray or a pandas column) must
|
||||||
# convert the same way as their native Python counterparts. np.float64
|
# convert the same way as their native Python counterparts. np.float64
|
||||||
|
|||||||
@@ -319,11 +319,53 @@ pub struct IndexConfig {
|
|||||||
|
|
||||||
#[pymethods]
|
#[pymethods]
|
||||||
impl IndexConfig {
|
impl IndexConfig {
|
||||||
pub fn __repr__(&self) -> String {
|
pub fn __repr__(&self, py: Python<'_>) -> String {
|
||||||
format!(
|
let mut fields = vec![
|
||||||
"Index({}, columns={:?}, name=\"{}\")",
|
format!("name={:?}", self.name),
|
||||||
self.index_type, self.columns, self.name
|
format!("index_type={:?}", self.index_type),
|
||||||
)
|
format!("columns={:?}", self.columns),
|
||||||
|
];
|
||||||
|
if let Some(v) = &self.index_uuid {
|
||||||
|
fields.push(format!("index_uuid={:?}", v));
|
||||||
|
}
|
||||||
|
if let Some(v) = &self.type_url {
|
||||||
|
fields.push(format!("type_url={:?}", v));
|
||||||
|
}
|
||||||
|
if let Some(v) = self.created_at {
|
||||||
|
// Render the datetime's own Python repr so the value round-trips,
|
||||||
|
// falling back to RFC 3339 if the conversion ever fails.
|
||||||
|
let rendered = v
|
||||||
|
.into_pyobject(py)
|
||||||
|
.ok()
|
||||||
|
.and_then(|obj| obj.into_any().repr().ok())
|
||||||
|
.map(|r| r.to_string())
|
||||||
|
.unwrap_or_else(|| v.to_rfc3339());
|
||||||
|
fields.push(format!("created_at={}", rendered));
|
||||||
|
}
|
||||||
|
if let Some(v) = self.num_indexed_rows {
|
||||||
|
fields.push(format!("num_indexed_rows={}", fmt_thousands(v)));
|
||||||
|
}
|
||||||
|
if let Some(v) = self.num_unindexed_rows {
|
||||||
|
fields.push(format!("num_unindexed_rows={}", fmt_thousands(v)));
|
||||||
|
}
|
||||||
|
if let Some(v) = self.size_bytes {
|
||||||
|
fields.push(format!("size_bytes={}", fmt_thousands(v)));
|
||||||
|
}
|
||||||
|
if let Some(v) = self.num_segments {
|
||||||
|
fields.push(format!("num_segments={}", v));
|
||||||
|
}
|
||||||
|
if let Some(v) = self.index_version {
|
||||||
|
fields.push(format!("index_version={}", v));
|
||||||
|
}
|
||||||
|
if let Some(v) = &self.index_details {
|
||||||
|
let details = v
|
||||||
|
.bind(py)
|
||||||
|
.repr()
|
||||||
|
.map(|r| r.to_string())
|
||||||
|
.unwrap_or_else(|_| "<unavailable>".to_string());
|
||||||
|
fields.push(format!("index_details={}", details));
|
||||||
|
}
|
||||||
|
format!("IndexConfig({})", fields.join(", "))
|
||||||
}
|
}
|
||||||
|
|
||||||
// For backwards-compatibility with the old sync SDK, we also support getting
|
// For backwards-compatibility with the old sync SDK, we also support getting
|
||||||
@@ -352,6 +394,23 @@ impl IndexConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Format an integer with `_` thousands separators, e.g. `24_500_213`.
|
||||||
|
///
|
||||||
|
/// Underscores are valid Python int-literal syntax, so the repr stays
|
||||||
|
/// copy-pasteable and machine-parseable while remaining readable.
|
||||||
|
fn fmt_thousands(n: u64) -> String {
|
||||||
|
let digits = n.to_string();
|
||||||
|
let bytes = digits.as_bytes();
|
||||||
|
let mut out = String::with_capacity(digits.len() + digits.len() / 3);
|
||||||
|
for (i, b) in bytes.iter().enumerate() {
|
||||||
|
if i > 0 && (bytes.len() - i).is_multiple_of(3) {
|
||||||
|
out.push('_');
|
||||||
|
}
|
||||||
|
out.push(*b as char);
|
||||||
|
}
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
fn parse_index_details(py: Python<'_>, s: String) -> Py<PyAny> {
|
fn parse_index_details(py: Python<'_>, s: String) -> Py<PyAny> {
|
||||||
let json = py.import("json").expect("json module is always available");
|
let json = py.import("json").expect("json module is always available");
|
||||||
match json.call_method1("loads", (s.as_str(),)) {
|
match json.call_method1("loads", (s.as_str(),)) {
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "lancedb"
|
name = "lancedb"
|
||||||
version = "0.30.1-beta.2"
|
version = "0.31.0-beta.1"
|
||||||
edition.workspace = true
|
edition.workspace = true
|
||||||
description = "LanceDB: A serverless, low-latency vector database for AI applications"
|
description = "LanceDB: A serverless, low-latency vector database for AI applications"
|
||||||
license.workspace = true
|
license.workspace = true
|
||||||
|
|||||||
435
rust/lancedb/src/blob.rs
Normal file
435
rust/lancedb/src/blob.rs
Normal file
@@ -0,0 +1,435 @@
|
|||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||||
|
|
||||||
|
//! Lance blob v2 columns store large binary payloads out of line.
|
||||||
|
//!
|
||||||
|
//! Declare a column with [`blob`]. On write, [`crate::table::Table::add`] coerces
|
||||||
|
//! raw `Binary` / `LargeBinary` into the blob struct layout. Queries return
|
||||||
|
//! small descriptors, not bytes.
|
||||||
|
//!
|
||||||
|
//! Blob tables require Lance file format >= 2.2 and stable row ids at create.
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use arrow_array::builder::LargeBinaryBuilder;
|
||||||
|
use arrow_array::{Array, LargeBinaryArray, RecordBatch, StructArray, UInt8Array, UInt64Array};
|
||||||
|
use arrow_schema::{DataType, Field, Schema};
|
||||||
|
use lance::dataset::{Dataset, WriteParams};
|
||||||
|
use lance_arrow::FieldExt;
|
||||||
|
use lance_core::datatypes::parse_field_path;
|
||||||
|
use lance_encoding::version::LanceFileVersion;
|
||||||
|
|
||||||
|
use crate::error::{Error, Result};
|
||||||
|
|
||||||
|
pub use lance::dataset::BlobFile;
|
||||||
|
|
||||||
|
/// Creates an Arrow field for a Lance blob v2 column.
|
||||||
|
///
|
||||||
|
/// `Struct<data, uri>` with the `lance.blob.v2` marker. Same layout Lance
|
||||||
|
/// expects on write.
|
||||||
|
///
|
||||||
|
/// A blob column may be top-level or nested inside a struct or list. Nested
|
||||||
|
/// blobs are addressed by a dotted path (e.g. `info.blob`) in the read APIs.
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use arrow_schema::{DataType, Field, Schema};
|
||||||
|
///
|
||||||
|
/// let schema = Schema::new(vec![
|
||||||
|
/// Field::new("id", DataType::Int64, false),
|
||||||
|
/// lancedb::blob("image", true),
|
||||||
|
/// ]);
|
||||||
|
/// ```
|
||||||
|
pub fn blob(name: impl AsRef<str>, nullable: bool) -> Field {
|
||||||
|
lance::blob::blob_field(name.as_ref(), nullable)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true if `field` is a blob v2 column.
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// let field = lancedb::blob("image", true);
|
||||||
|
/// assert!(lancedb::blob::is_blob(&field));
|
||||||
|
/// ```
|
||||||
|
pub fn is_blob(field: &Field) -> bool {
|
||||||
|
field.is_blob_v2()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true if `field`, or any field nested under it, is a blob v2 column.
|
||||||
|
fn field_tree_has_blob_v2(field: &Field) -> bool {
|
||||||
|
if field.is_blob_v2() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
match field.data_type() {
|
||||||
|
DataType::Struct(children) => children.iter().any(|c| field_tree_has_blob_v2(c)),
|
||||||
|
DataType::List(child) | DataType::LargeList(child) | DataType::FixedSizeList(child, _) => {
|
||||||
|
field_tree_has_blob_v2(child)
|
||||||
|
}
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Collects the dotted paths of blob v2 columns under `field`, into `paths`.
|
||||||
|
fn collect_blob_paths(field: &Field, prefix: &str, paths: &mut Vec<String>) {
|
||||||
|
let path = if prefix.is_empty() {
|
||||||
|
field.name().clone()
|
||||||
|
} else {
|
||||||
|
format!("{prefix}.{}", field.name())
|
||||||
|
};
|
||||||
|
if field.is_blob_v2() {
|
||||||
|
paths.push(path);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
match field.data_type() {
|
||||||
|
DataType::Struct(children) => {
|
||||||
|
for child in children {
|
||||||
|
collect_blob_paths(child, &path, paths);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DataType::List(child) | DataType::LargeList(child) | DataType::FixedSizeList(child, _) => {
|
||||||
|
collect_blob_paths(child, &path, paths)
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true if `schema` declares any blob v2 column, including nested ones.
|
||||||
|
pub(crate) fn has_blob_columns(schema: &Schema) -> bool {
|
||||||
|
schema.fields().iter().any(|f| field_tree_has_blob_v2(f))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Blob v2 column paths in `schema`, declaration order preserved. Nested blobs
|
||||||
|
/// are dotted paths (e.g. `info.blob`).
|
||||||
|
pub(crate) fn blob_column_names(schema: &Schema) -> Vec<String> {
|
||||||
|
let mut paths = Vec::new();
|
||||||
|
for field in schema.fields() {
|
||||||
|
collect_blob_paths(field, "", &mut paths);
|
||||||
|
}
|
||||||
|
paths
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bumps storage format to at least [`LanceFileVersion::V2_2`] for blob schemas.
|
||||||
|
pub(crate) fn ensure_blob_storage_version(schema: &Schema, params: &mut WriteParams) {
|
||||||
|
if !has_blob_columns(schema) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let resolved = params
|
||||||
|
.data_storage_version
|
||||||
|
.unwrap_or(LanceFileVersion::Stable)
|
||||||
|
.resolve();
|
||||||
|
if resolved < LanceFileVersion::V2_2 {
|
||||||
|
params.data_storage_version = Some(LanceFileVersion::V2_2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Validate that `column` exists and is a blob v2 column.
|
||||||
|
///
|
||||||
|
/// Legacy v1 columns (`lance-encoding:blob`) error with a migration hint.
|
||||||
|
pub(crate) fn ensure_blob_v2_column(
|
||||||
|
schema: &lance_core::datatypes::Schema,
|
||||||
|
column: &str,
|
||||||
|
) -> Result<()> {
|
||||||
|
match schema.field(column) {
|
||||||
|
Some(field) if field.is_blob_v2() => Ok(()),
|
||||||
|
Some(field) if field.is_blob() => Err(Error::InvalidInput {
|
||||||
|
message: format!(
|
||||||
|
"column '{column}' is a legacy blob column; blob APIs require blob v2 columns \
|
||||||
|
(ARROW:extension:name = \"lance.blob.v2\")"
|
||||||
|
),
|
||||||
|
}),
|
||||||
|
Some(_) => Err(Error::InvalidInput {
|
||||||
|
message: format!("column '{column}' is not a blob column"),
|
||||||
|
}),
|
||||||
|
None => Err(Error::InvalidInput {
|
||||||
|
message: format!("no column named '{column}' in this table"),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the leaf descriptor `StructArray` for `column` in a descriptor batch.
|
||||||
|
fn leaf_descriptor_struct<'a>(batch: &'a RecordBatch, column: &str) -> Result<&'a StructArray> {
|
||||||
|
let path = parse_field_path(column).map_err(|e| Error::InvalidInput {
|
||||||
|
message: format!("invalid blob column path '{column}': {e}"),
|
||||||
|
})?;
|
||||||
|
let not_struct = || Error::Runtime {
|
||||||
|
message: format!("blob column '{column}' did not read back as a descriptor struct"),
|
||||||
|
};
|
||||||
|
let mut current = batch
|
||||||
|
.column_by_name(&path[0])
|
||||||
|
.and_then(|c| c.as_any().downcast_ref::<StructArray>())
|
||||||
|
.ok_or_else(not_struct)?;
|
||||||
|
for segment in &path[1..] {
|
||||||
|
current = current
|
||||||
|
.column_by_name(segment)
|
||||||
|
.and_then(|c| c.as_any().downcast_ref::<StructArray>())
|
||||||
|
.ok_or_else(not_struct)?;
|
||||||
|
}
|
||||||
|
Ok(current)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Null rows in `row_ids`, from a descriptor take.
|
||||||
|
///
|
||||||
|
/// Lance `read_blobs` / `take_blobs` skip null rows (`kind == 0 && position == 0 && size == 0`).
|
||||||
|
/// TODO(lance): aligned read API would drop this pass.
|
||||||
|
async fn blob_null_mask(
|
||||||
|
dataset: &Arc<Dataset>,
|
||||||
|
column: &str,
|
||||||
|
row_ids: &[u64],
|
||||||
|
) -> Result<Vec<bool>> {
|
||||||
|
let projection = dataset.schema().project(&[column])?;
|
||||||
|
let descriptors = dataset.take_builder(row_ids, projection)?.execute().await?;
|
||||||
|
if descriptors.num_rows() != row_ids.len() {
|
||||||
|
return Err(Error::InvalidInput {
|
||||||
|
message: format!(
|
||||||
|
"blob take for column '{column}' requested {} row ids but only {} exist in the \
|
||||||
|
table; pass row ids collected from this table",
|
||||||
|
row_ids.len(),
|
||||||
|
descriptors.num_rows()
|
||||||
|
),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
let descriptor_struct = leaf_descriptor_struct(&descriptors, column)?;
|
||||||
|
let child = |name: &str| {
|
||||||
|
descriptor_struct
|
||||||
|
.column_by_name(name)
|
||||||
|
.ok_or_else(|| Error::Runtime {
|
||||||
|
message: format!("blob descriptor for '{column}' is missing the '{name}' field"),
|
||||||
|
})
|
||||||
|
};
|
||||||
|
let kinds = child("kind")?
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<UInt8Array>()
|
||||||
|
.ok_or_else(|| Error::Runtime {
|
||||||
|
message: format!("blob descriptor 'kind' for '{column}' is not a UInt8 array"),
|
||||||
|
})?;
|
||||||
|
let positions = child("position")?
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<UInt64Array>()
|
||||||
|
.ok_or_else(|| Error::Runtime {
|
||||||
|
message: format!("blob descriptor 'position' for '{column}' is not a UInt64 array"),
|
||||||
|
})?;
|
||||||
|
let sizes = child("size")?
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<UInt64Array>()
|
||||||
|
.ok_or_else(|| Error::Runtime {
|
||||||
|
message: format!("blob descriptor 'size' for '{column}' is not a UInt64 array"),
|
||||||
|
})?;
|
||||||
|
|
||||||
|
// Match Lance `collect_blob_entries_v2` skip condition (`BlobKind::Inline` == 0).
|
||||||
|
Ok((0..descriptor_struct.len())
|
||||||
|
.map(|i| {
|
||||||
|
descriptor_struct.is_null(i)
|
||||||
|
|| kinds.is_null(i)
|
||||||
|
|| (kinds.value(i) == 0 && positions.value(i) == 0 && sizes.value(i) == 0)
|
||||||
|
})
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn non_null_row_ids(row_ids: &[u64], null_mask: &[bool]) -> Vec<u64> {
|
||||||
|
row_ids
|
||||||
|
.iter()
|
||||||
|
.zip(null_mask)
|
||||||
|
.filter_map(|(row_id, is_null)| (!is_null).then_some(*row_id))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Materialize blob bytes for `row_ids` (same length and order, nulls preserved).
|
||||||
|
pub(crate) async fn take_blobs_aligned(
|
||||||
|
dataset: &Arc<Dataset>,
|
||||||
|
column: &str,
|
||||||
|
row_ids: &[u64],
|
||||||
|
) -> Result<LargeBinaryArray> {
|
||||||
|
ensure_blob_v2_column(dataset.schema(), column)?;
|
||||||
|
if row_ids.is_empty() {
|
||||||
|
return Ok(LargeBinaryBuilder::new().finish());
|
||||||
|
}
|
||||||
|
|
||||||
|
let null_mask = blob_null_mask(dataset, column, row_ids).await?;
|
||||||
|
let non_null_row_ids = non_null_row_ids(row_ids, &null_mask);
|
||||||
|
let non_null_count = non_null_row_ids.len();
|
||||||
|
let payloads = if non_null_count == 0 {
|
||||||
|
Vec::new()
|
||||||
|
} else {
|
||||||
|
dataset
|
||||||
|
.read_blobs(column)?
|
||||||
|
.with_row_ids(non_null_row_ids)
|
||||||
|
.preserve_order(true)
|
||||||
|
.execute()
|
||||||
|
.await?
|
||||||
|
};
|
||||||
|
|
||||||
|
if payloads.len() != non_null_count {
|
||||||
|
return Err(Error::Runtime {
|
||||||
|
message: format!(
|
||||||
|
"blob read for column '{column}' returned {} payloads for {} non-null rows",
|
||||||
|
payloads.len(),
|
||||||
|
non_null_count
|
||||||
|
),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut builder = LargeBinaryBuilder::new();
|
||||||
|
let mut payload_idx = 0;
|
||||||
|
for is_null in &null_mask {
|
||||||
|
if *is_null {
|
||||||
|
builder.append_null();
|
||||||
|
} else {
|
||||||
|
builder.append_value(payloads[payload_idx].data.as_ref());
|
||||||
|
payload_idx += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(builder.finish())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Open lazy [`BlobFile`] handles for `row_ids` (same length and order, nulls as `None`).
|
||||||
|
pub(crate) async fn take_blob_files_aligned(
|
||||||
|
dataset: &Arc<Dataset>,
|
||||||
|
column: &str,
|
||||||
|
row_ids: &[u64],
|
||||||
|
) -> Result<Vec<Option<BlobFile>>> {
|
||||||
|
ensure_blob_v2_column(dataset.schema(), column)?;
|
||||||
|
if row_ids.is_empty() {
|
||||||
|
return Ok(Vec::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
let null_mask = blob_null_mask(dataset, column, row_ids).await?;
|
||||||
|
let non_null_row_ids = non_null_row_ids(row_ids, &null_mask);
|
||||||
|
let handles = if non_null_row_ids.is_empty() {
|
||||||
|
Vec::new()
|
||||||
|
} else {
|
||||||
|
dataset.take_blobs(&non_null_row_ids, column).await?
|
||||||
|
};
|
||||||
|
if handles.len() != non_null_row_ids.len() {
|
||||||
|
return Err(Error::Runtime {
|
||||||
|
message: format!(
|
||||||
|
"blob take for column '{column}' returned {} handles for {} non-null rows",
|
||||||
|
handles.len(),
|
||||||
|
non_null_row_ids.len()
|
||||||
|
),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut handles = handles.into_iter();
|
||||||
|
Ok(null_mask
|
||||||
|
.iter()
|
||||||
|
.map(|is_null| {
|
||||||
|
if *is_null {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(handles.next().unwrap())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use arrow_schema::DataType;
|
||||||
|
use lance_arrow::ARROW_EXT_NAME_KEY;
|
||||||
|
|
||||||
|
fn blob_schema() -> Schema {
|
||||||
|
Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
blob("image", true),
|
||||||
|
])
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn blob_field_carries_v2_extension_marker() {
|
||||||
|
let field = blob("image", true);
|
||||||
|
assert_eq!(
|
||||||
|
field.metadata().get(ARROW_EXT_NAME_KEY).map(String::as_str),
|
||||||
|
Some("lance.blob.v2")
|
||||||
|
);
|
||||||
|
assert!(matches!(field.data_type(), DataType::Struct(_)));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn has_blob_columns_detects_blob_fields() {
|
||||||
|
assert!(has_blob_columns(&blob_schema()));
|
||||||
|
let plain = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
|
||||||
|
assert!(!has_blob_columns(&plain));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn storage_version_bumps_to_v2_2() {
|
||||||
|
let mut params = WriteParams::default();
|
||||||
|
ensure_blob_storage_version(&blob_schema(), &mut params);
|
||||||
|
assert_eq!(
|
||||||
|
params.data_storage_version.unwrap().resolve(),
|
||||||
|
LanceFileVersion::V2_2
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn storage_version_overrides_lower_explicit_version() {
|
||||||
|
let mut params = WriteParams {
|
||||||
|
data_storage_version: Some(LanceFileVersion::V2_0),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
ensure_blob_storage_version(&blob_schema(), &mut params);
|
||||||
|
assert_eq!(
|
||||||
|
params.data_storage_version.unwrap().resolve(),
|
||||||
|
LanceFileVersion::V2_2
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn storage_version_keeps_higher_explicit_version() {
|
||||||
|
let mut params = WriteParams {
|
||||||
|
data_storage_version: Some(LanceFileVersion::V2_3),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
ensure_blob_storage_version(&blob_schema(), &mut params);
|
||||||
|
assert_eq!(params.data_storage_version.unwrap(), LanceFileVersion::V2_3);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn legacy_v1_blob_column_is_rejected_with_migration_hint() {
|
||||||
|
let legacy = Field::new("image", DataType::LargeBinary, true).with_metadata(
|
||||||
|
std::collections::HashMap::from([(
|
||||||
|
"lance-encoding:blob".to_string(),
|
||||||
|
"true".to_string(),
|
||||||
|
)]),
|
||||||
|
);
|
||||||
|
let arrow_schema = Schema::new(vec![legacy]);
|
||||||
|
let lance_schema = lance_core::datatypes::Schema::try_from(&arrow_schema).unwrap();
|
||||||
|
|
||||||
|
let err = ensure_blob_v2_column(&lance_schema, "image").unwrap_err();
|
||||||
|
assert!(matches!(err, Error::InvalidInput { .. }));
|
||||||
|
assert!(err.to_string().contains("legacy blob column"));
|
||||||
|
assert!(err.to_string().contains("lance.blob.v2"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn non_blob_and_unknown_columns_are_rejected_by_name() {
|
||||||
|
let arrow_schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
|
||||||
|
let lance_schema = lance_core::datatypes::Schema::try_from(&arrow_schema).unwrap();
|
||||||
|
|
||||||
|
let err = ensure_blob_v2_column(&lance_schema, "id").unwrap_err();
|
||||||
|
assert!(err.to_string().contains("'id' is not a blob column"));
|
||||||
|
|
||||||
|
let err = ensure_blob_v2_column(&lance_schema, "missing").unwrap_err();
|
||||||
|
assert!(err.to_string().contains("no column named 'missing'"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn blob_column_names_includes_nested_path() {
|
||||||
|
let blob_field = blob("blob", true);
|
||||||
|
let info = Field::new(
|
||||||
|
"info",
|
||||||
|
DataType::Struct(vec![Field::new("name", DataType::Utf8, false), blob_field].into()),
|
||||||
|
true,
|
||||||
|
);
|
||||||
|
let schema = Schema::new(vec![Field::new("id", DataType::Int64, false), info]);
|
||||||
|
assert_eq!(blob_column_names(&schema), vec!["info.blob"]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn storage_version_noop_without_blob_columns() {
|
||||||
|
let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
|
||||||
|
let mut params = WriteParams::default();
|
||||||
|
ensure_blob_storage_version(&schema, &mut params);
|
||||||
|
assert!(params.data_storage_version.is_none());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -32,6 +32,7 @@ use crate::table::{BaseTable, WriteOptions};
|
|||||||
|
|
||||||
pub mod listing;
|
pub mod listing;
|
||||||
pub mod namespace;
|
pub mod namespace;
|
||||||
|
pub(crate) mod read_freshness;
|
||||||
|
|
||||||
pub trait DatabaseOptions {
|
pub trait DatabaseOptions {
|
||||||
fn serialize_into_map(&self, map: &mut HashMap<String, String>);
|
fn serialize_into_map(&self, map: &mut HashMap<String, String>);
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ use lance_table::io::commit::commit_handler_from_url;
|
|||||||
use object_store::local::LocalFileSystem;
|
use object_store::local::LocalFileSystem;
|
||||||
use snafu::ResultExt;
|
use snafu::ResultExt;
|
||||||
|
|
||||||
|
use crate::blob::{ensure_blob_storage_version, has_blob_columns};
|
||||||
use crate::connection::ConnectRequest;
|
use crate::connection::ConnectRequest;
|
||||||
use crate::database::ReadConsistency;
|
use crate::database::ReadConsistency;
|
||||||
use crate::database::namespace::LanceNamespaceDatabase;
|
use crate::database::namespace::LanceNamespaceDatabase;
|
||||||
@@ -838,13 +839,16 @@ impl ListingDatabase {
|
|||||||
write_params.enable_v2_manifest_paths = enable_v2_manifest_paths;
|
write_params.enable_v2_manifest_paths = enable_v2_manifest_paths;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply enable_stable_row_ids: table-level override takes precedence over connection config
|
let data_schema = request.data.arrow_schema();
|
||||||
if let Some(enable_stable_row_ids) =
|
if let Some(enable_stable_row_ids) = stable_row_ids_override
|
||||||
stable_row_ids_override.or(self.new_table_config.enable_stable_row_ids)
|
.or(self.new_table_config.enable_stable_row_ids)
|
||||||
|
.or(has_blob_columns(&data_schema).then_some(true))
|
||||||
{
|
{
|
||||||
write_params.enable_stable_row_ids = enable_stable_row_ids;
|
write_params.enable_stable_row_ids = enable_stable_row_ids;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ensure_blob_storage_version(&data_schema, &mut write_params);
|
||||||
|
|
||||||
if matches!(&request.mode, CreateTableMode::Overwrite) {
|
if matches!(&request.mode, CreateTableMode::Overwrite) {
|
||||||
write_params.mode = WriteMode::Overwrite;
|
write_params.mode = WriteMode::Overwrite;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,7 @@
|
|||||||
//! Namespace-based database implementation that delegates table management to lance-namespace
|
//! Namespace-based database implementation that delegates table management to lance-namespace
|
||||||
|
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::sync::Arc;
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
|
use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
|
||||||
@@ -23,12 +23,16 @@ use lance_namespace_impls::ConnectBuilder;
|
|||||||
use lance_table::io::commit::CommitHandler;
|
use lance_table::io::commit::CommitHandler;
|
||||||
use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
|
use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
|
||||||
|
|
||||||
|
use crate::blob::{ensure_blob_storage_version, has_blob_columns};
|
||||||
use crate::connection::NamespaceClientPushdownOperation;
|
use crate::connection::NamespaceClientPushdownOperation;
|
||||||
use crate::database::ReadConsistency;
|
use crate::database::ReadConsistency;
|
||||||
use crate::database::listing::{
|
use crate::database::listing::{
|
||||||
NewTableConfig, OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, OPT_NEW_TABLE_STORAGE_VERSION,
|
NewTableConfig, OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, OPT_NEW_TABLE_STORAGE_VERSION,
|
||||||
OPT_NEW_TABLE_V2_MANIFEST_PATHS,
|
OPT_NEW_TABLE_V2_MANIFEST_PATHS,
|
||||||
};
|
};
|
||||||
|
use crate::database::read_freshness::{
|
||||||
|
FreshnessBaselines, ReadFreshnessContextProvider, TableFreshness,
|
||||||
|
};
|
||||||
use crate::error::{Error, Result};
|
use crate::error::{Error, Result};
|
||||||
use crate::table::{NativeTable, map_namespace_lance_error};
|
use crate::table::{NativeTable, map_namespace_lance_error};
|
||||||
use lance::dataset::WriteMode;
|
use lance::dataset::WriteMode;
|
||||||
@@ -51,6 +55,10 @@ fn is_table_already_exists_namespace_error(err: &lance::Error) -> bool {
|
|||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Object-id delimiter default (matches `RestNamespaceBuilder`'s); overridable
|
||||||
|
/// via the `delimiter` property.
|
||||||
|
const DEFAULT_NAMESPACE_DELIMITER: &str = "$";
|
||||||
|
|
||||||
/// A database implementation that uses lance-namespace for table management
|
/// A database implementation that uses lance-namespace for table management
|
||||||
pub struct LanceNamespaceDatabase {
|
pub struct LanceNamespaceDatabase {
|
||||||
namespace: Arc<dyn LanceNamespace>,
|
namespace: Arc<dyn LanceNamespace>,
|
||||||
@@ -70,6 +78,17 @@ pub struct LanceNamespaceDatabase {
|
|||||||
ns_properties: HashMap<String, String>,
|
ns_properties: HashMap<String, String>,
|
||||||
// Options for tables created by this connection
|
// Options for tables created by this connection
|
||||||
new_table_config: NewTableConfig,
|
new_table_config: NewTableConfig,
|
||||||
|
// Per-table read-freshness baselines, shared with the context provider.
|
||||||
|
freshness_baselines: FreshnessBaselines,
|
||||||
|
// Delimiter for building freshness keys; see `table_freshness`.
|
||||||
|
delimiter: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn resolve_delimiter(ns_properties: &HashMap<String, String>) -> String {
|
||||||
|
ns_properties
|
||||||
|
.get("delimiter")
|
||||||
|
.cloned()
|
||||||
|
.unwrap_or_else(|| DEFAULT_NAMESPACE_DELIMITER.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LanceNamespaceDatabase {
|
impl LanceNamespaceDatabase {
|
||||||
@@ -82,6 +101,9 @@ impl LanceNamespaceDatabase {
|
|||||||
session: Option<Arc<lance::session::Session>>,
|
session: Option<Arc<lance::session::Session>>,
|
||||||
namespace_client_pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
namespace_client_pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
|
// Client is pre-built, so we can't install the freshness provider here;
|
||||||
|
// baselines are still tracked for a uniform bump path.
|
||||||
|
let delimiter = resolve_delimiter(&namespace_client_properties);
|
||||||
Self {
|
Self {
|
||||||
namespace: namespace_client,
|
namespace: namespace_client,
|
||||||
storage_options,
|
storage_options,
|
||||||
@@ -92,6 +114,8 @@ impl LanceNamespaceDatabase {
|
|||||||
ns_impl: namespace_client_impl,
|
ns_impl: namespace_client_impl,
|
||||||
ns_properties: namespace_client_properties,
|
ns_properties: namespace_client_properties,
|
||||||
new_table_config: NewTableConfig::default(),
|
new_table_config: NewTableConfig::default(),
|
||||||
|
freshness_baselines: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
delimiter,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -136,10 +160,19 @@ impl LanceNamespaceDatabase {
|
|||||||
if let Some(ref sess) = session {
|
if let Some(ref sess) = session {
|
||||||
builder = builder.session(sess.clone());
|
builder = builder.session(sess.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Install the read-freshness provider before building the client.
|
||||||
|
let freshness_baselines: FreshnessBaselines = Arc::new(Mutex::new(HashMap::new()));
|
||||||
|
builder = builder.context_provider(Arc::new(ReadFreshnessContextProvider::new(
|
||||||
|
freshness_baselines.clone(),
|
||||||
|
read_consistency_interval,
|
||||||
|
)));
|
||||||
|
|
||||||
let namespace = builder.connect().await.map_err(|e| Error::InvalidInput {
|
let namespace = builder.connect().await.map_err(|e| Error::InvalidInput {
|
||||||
message: format!("Failed to connect to namespace: {:?}", e),
|
message: format!("Failed to connect to namespace: {:?}", e),
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
let delimiter = resolve_delimiter(&ns_properties);
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
namespace,
|
namespace,
|
||||||
storage_options,
|
storage_options,
|
||||||
@@ -150,9 +183,20 @@ impl LanceNamespaceDatabase {
|
|||||||
ns_impl: ns_impl.to_string(),
|
ns_impl: ns_impl.to_string(),
|
||||||
ns_properties,
|
ns_properties,
|
||||||
new_table_config,
|
new_table_config,
|
||||||
|
freshness_baselines,
|
||||||
|
delimiter,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Build a table's freshness handle, keyed to match the `object_id` the
|
||||||
|
/// namespace client sends on reads (table-id parts joined by the delimiter).
|
||||||
|
fn table_freshness(&self, namespace_path: &[String], name: &str) -> TableFreshness {
|
||||||
|
let mut parts = namespace_path.to_vec();
|
||||||
|
parts.push(name.to_string());
|
||||||
|
let key = parts.join(&self.delimiter);
|
||||||
|
TableFreshness::new(self.freshness_baselines.clone(), key)
|
||||||
|
}
|
||||||
|
|
||||||
fn extract_storage_overrides(
|
fn extract_storage_overrides(
|
||||||
&self,
|
&self,
|
||||||
request: &DbCreateTableRequest,
|
request: &DbCreateTableRequest,
|
||||||
@@ -214,12 +258,16 @@ impl LanceNamespaceDatabase {
|
|||||||
params.enable_v2_manifest_paths = enable_v2_manifest_paths;
|
params.enable_v2_manifest_paths = enable_v2_manifest_paths;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(enable_stable_row_ids) =
|
let data_schema = request.data.schema();
|
||||||
stable_row_ids_override.or(self.new_table_config.enable_stable_row_ids)
|
if let Some(enable_stable_row_ids) = stable_row_ids_override
|
||||||
|
.or(self.new_table_config.enable_stable_row_ids)
|
||||||
|
.or(has_blob_columns(data_schema.as_ref()).then_some(true))
|
||||||
{
|
{
|
||||||
params.enable_stable_row_ids = enable_stable_row_ids;
|
params.enable_stable_row_ids = enable_stable_row_ids;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ensure_blob_storage_version(data_schema.as_ref(), params);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -331,7 +379,8 @@ impl Database for LanceNamespaceDatabase {
|
|||||||
self.pushdown_operations.clone(),
|
self.pushdown_operations.clone(),
|
||||||
self.session.clone(),
|
self.session.clone(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?
|
||||||
|
.with_freshness(self.table_freshness(&request.namespace_path, &request.name));
|
||||||
|
|
||||||
return Ok(Arc::new(native_table));
|
return Ok(Arc::new(native_table));
|
||||||
}
|
}
|
||||||
@@ -462,7 +511,8 @@ impl Database for LanceNamespaceDatabase {
|
|||||||
self.pushdown_operations.clone(),
|
self.pushdown_operations.clone(),
|
||||||
self.session.clone(),
|
self.session.clone(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?
|
||||||
|
.with_freshness(self.table_freshness(&request.namespace_path, &request.name));
|
||||||
|
|
||||||
Ok(Arc::new(native_table))
|
Ok(Arc::new(native_table))
|
||||||
}
|
}
|
||||||
@@ -478,7 +528,8 @@ impl Database for LanceNamespaceDatabase {
|
|||||||
self.pushdown_operations.clone(),
|
self.pushdown_operations.clone(),
|
||||||
self.session.clone(),
|
self.session.clone(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?
|
||||||
|
.with_freshness(self.table_freshness(&request.namespace_path, &request.name));
|
||||||
|
|
||||||
Ok(Arc::new(native_table))
|
Ok(Arc::new(native_table))
|
||||||
}
|
}
|
||||||
|
|||||||
312
rust/lancedb/src/database/read_freshness.rs
Normal file
312
rust/lancedb/src/database/read_freshness.rs
Normal file
@@ -0,0 +1,312 @@
|
|||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||||
|
|
||||||
|
//! Read-freshness signaling for the lance-namespace path.
|
||||||
|
//!
|
||||||
|
//! Against a server that serves cached table metadata up to some staleness
|
||||||
|
//! window, a handle that just wrote (or asked for the latest version via
|
||||||
|
//! `checkout_latest`) can still read a stale snapshot. To prevent that, reads
|
||||||
|
//! routed through the namespace client carry an `x-lancedb-min-timestamp`
|
||||||
|
//! header naming the oldest snapshot the caller will accept.
|
||||||
|
//!
|
||||||
|
//! This mirrors `remote::table`: a per-table baseline is bumped to "now" on
|
||||||
|
//! every write and on `checkout_latest()`, and reads send
|
||||||
|
//! `max(baseline, now - read_consistency_interval)`. Since the namespace client
|
||||||
|
//! takes no headers directly, a [`DynamicContextProvider`] injects it per request.
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::{Duration, SystemTime};
|
||||||
|
|
||||||
|
use lance_namespace_impls::{DynamicContextProvider, OperationInfo};
|
||||||
|
|
||||||
|
/// Provider context keys prefixed with `headers.` become HTTP headers (prefix
|
||||||
|
/// stripped), so this emits the `x-lancedb-min-timestamp` header.
|
||||||
|
const MIN_TIMESTAMP_CONTEXT_KEY: &str = "headers.x-lancedb-min-timestamp";
|
||||||
|
|
||||||
|
/// Per-table freshness baselines (keyed by namespace object id), shared between
|
||||||
|
/// the provider that reads them and the table handles that bump them.
|
||||||
|
pub type FreshnessBaselines = Arc<Mutex<HashMap<String, SystemTime>>>;
|
||||||
|
|
||||||
|
/// `max(baseline, now - interval)`, or `None` when neither constraint applies.
|
||||||
|
fn compute_min_timestamp(
|
||||||
|
baseline: Option<SystemTime>,
|
||||||
|
interval: Option<Duration>,
|
||||||
|
now: SystemTime,
|
||||||
|
) -> Option<SystemTime> {
|
||||||
|
let interval_based = match interval {
|
||||||
|
None => None,
|
||||||
|
Some(d) if d.is_zero() => Some(now),
|
||||||
|
Some(d) => Some(now.checked_sub(d).unwrap_or(now)),
|
||||||
|
};
|
||||||
|
match (interval_based, baseline) {
|
||||||
|
(None, None) => None,
|
||||||
|
(Some(t), None) | (None, Some(t)) => Some(t),
|
||||||
|
(Some(a), Some(b)) => Some(a.max(b)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Advance the baseline to `now`, never backwards, so a concurrent handle's
|
||||||
|
/// write can't lower a floor another handle already set.
|
||||||
|
fn next_freshness_baseline(prev: Option<SystemTime>, now: SystemTime) -> SystemTime {
|
||||||
|
match prev {
|
||||||
|
Some(p) => p.max(now),
|
||||||
|
None => now,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A handle's view of the shared baseline map for a single table.
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct TableFreshness {
|
||||||
|
baselines: FreshnessBaselines,
|
||||||
|
/// Namespace object id for this table (matches the read's `object_id`).
|
||||||
|
key: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TableFreshness {
|
||||||
|
pub fn new(baselines: FreshnessBaselines, key: String) -> Self {
|
||||||
|
Self { baselines, key }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn bump(&self) {
|
||||||
|
let now = SystemTime::now();
|
||||||
|
let mut baselines = self.baselines.lock().unwrap();
|
||||||
|
let prev = baselines.get(&self.key).copied();
|
||||||
|
baselines.insert(self.key.clone(), next_freshness_baseline(prev, now));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read ops that can be served stale and so carry the freshness floor.
|
||||||
|
/// `list_table_versions` resolves "latest" for managed-versioning tables, so it
|
||||||
|
/// is what makes `checkout_latest()` observe a prior write.
|
||||||
|
fn is_read_operation(operation: &str) -> bool {
|
||||||
|
matches!(
|
||||||
|
operation,
|
||||||
|
"describe_table" | "list_table_versions" | "query_table" | "list_tables"
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Injects `x-lancedb-min-timestamp` on namespace reads, per addressed table.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ReadFreshnessContextProvider {
|
||||||
|
baselines: FreshnessBaselines,
|
||||||
|
read_consistency_interval: Option<Duration>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReadFreshnessContextProvider {
|
||||||
|
pub fn new(baselines: FreshnessBaselines, read_consistency_interval: Option<Duration>) -> Self {
|
||||||
|
Self {
|
||||||
|
baselines,
|
||||||
|
read_consistency_interval,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DynamicContextProvider for ReadFreshnessContextProvider {
|
||||||
|
fn provide_context(&self, info: &OperationInfo) -> HashMap<String, String> {
|
||||||
|
if !is_read_operation(&info.operation) {
|
||||||
|
return HashMap::new();
|
||||||
|
}
|
||||||
|
|
||||||
|
let baseline = self.baselines.lock().unwrap().get(&info.object_id).copied();
|
||||||
|
match compute_min_timestamp(baseline, self.read_consistency_interval, SystemTime::now()) {
|
||||||
|
Some(ts) => {
|
||||||
|
let dt: chrono::DateTime<chrono::Utc> = ts.into();
|
||||||
|
HashMap::from([(MIN_TIMESTAMP_CONTEXT_KEY.to_string(), dt.to_rfc3339())])
|
||||||
|
}
|
||||||
|
None => HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
/// Allowed slop when comparing a header timestamp against a locally
|
||||||
|
/// captured wall-clock bound. Tests run fast enough that 1s is plenty.
|
||||||
|
const TOLERANCE: Duration = Duration::from_secs(1);
|
||||||
|
|
||||||
|
fn parse_header_ts(headers: &HashMap<String, String>) -> SystemTime {
|
||||||
|
let value = headers
|
||||||
|
.get(MIN_TIMESTAMP_CONTEXT_KEY)
|
||||||
|
.expect("expected min-timestamp context key");
|
||||||
|
chrono::DateTime::parse_from_rfc3339(value)
|
||||||
|
.unwrap()
|
||||||
|
.with_timezone(&chrono::Utc)
|
||||||
|
.into()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_compute_min_timestamp_combines_baseline_and_interval() {
|
||||||
|
let now = SystemTime::now();
|
||||||
|
let baseline = now - Duration::from_secs(60);
|
||||||
|
|
||||||
|
// No interval, no baseline -> no header.
|
||||||
|
assert_eq!(compute_min_timestamp(None, None, now), None);
|
||||||
|
|
||||||
|
// Baseline only -> baseline.
|
||||||
|
assert_eq!(
|
||||||
|
compute_min_timestamp(Some(baseline), None, now),
|
||||||
|
Some(baseline)
|
||||||
|
);
|
||||||
|
|
||||||
|
// ZERO interval, no baseline -> now (strong consistency).
|
||||||
|
assert_eq!(
|
||||||
|
compute_min_timestamp(None, Some(Duration::ZERO), now),
|
||||||
|
Some(now)
|
||||||
|
);
|
||||||
|
|
||||||
|
// Positive interval, no baseline -> now - interval.
|
||||||
|
assert_eq!(
|
||||||
|
compute_min_timestamp(None, Some(Duration::from_secs(10)), now),
|
||||||
|
Some(now - Duration::from_secs(10))
|
||||||
|
);
|
||||||
|
|
||||||
|
// Both: pick the more-recent (tighter) constraint.
|
||||||
|
// baseline = now-60, now-interval = now-10. now-10 is newer.
|
||||||
|
assert_eq!(
|
||||||
|
compute_min_timestamp(Some(baseline), Some(Duration::from_secs(10)), now),
|
||||||
|
Some(now - Duration::from_secs(10))
|
||||||
|
);
|
||||||
|
|
||||||
|
// Both, baseline newer: pick baseline.
|
||||||
|
let recent_baseline = now - Duration::from_secs(5);
|
||||||
|
assert_eq!(
|
||||||
|
compute_min_timestamp(Some(recent_baseline), Some(Duration::from_secs(60)), now),
|
||||||
|
Some(recent_baseline)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_next_freshness_baseline_is_monotonic() {
|
||||||
|
let now = SystemTime::now();
|
||||||
|
let earlier = now - Duration::from_secs(30);
|
||||||
|
let later = now + Duration::from_secs(30);
|
||||||
|
|
||||||
|
// No prior baseline -> now.
|
||||||
|
assert_eq!(next_freshness_baseline(None, now), now);
|
||||||
|
// Prior baseline older than now -> now.
|
||||||
|
assert_eq!(next_freshness_baseline(Some(earlier), now), now);
|
||||||
|
// Prior baseline newer than now -> keep the newer baseline.
|
||||||
|
assert_eq!(next_freshness_baseline(Some(later), now), later);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn provider_with(
|
||||||
|
entries: &[(&str, SystemTime)],
|
||||||
|
interval: Option<Duration>,
|
||||||
|
) -> ReadFreshnessContextProvider {
|
||||||
|
let map: HashMap<String, SystemTime> =
|
||||||
|
entries.iter().map(|(k, v)| (k.to_string(), *v)).collect();
|
||||||
|
ReadFreshnessContextProvider::new(Arc::new(Mutex::new(map)), interval)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_provider_emits_header_at_or_after_bumped_baseline() {
|
||||||
|
// A baseline set "now" with no interval: every read op must carry a
|
||||||
|
// floor at or after that baseline. `list_table_versions` is the hook
|
||||||
|
// that makes managed-versioning `checkout_latest()` observe a write.
|
||||||
|
let baseline = SystemTime::now();
|
||||||
|
let provider = provider_with(&[("ns$tbl", baseline)], None);
|
||||||
|
|
||||||
|
// These ops are keyed by the table id, so they pick up the per-table
|
||||||
|
// baseline. (`list_tables` is keyed by the namespace, so it is covered
|
||||||
|
// separately by the interval-floor test.)
|
||||||
|
for op in ["describe_table", "list_table_versions", "query_table"] {
|
||||||
|
let ctx = provider.provide_context(&OperationInfo::new(op, "ns$tbl"));
|
||||||
|
let sent = parse_header_ts(&ctx);
|
||||||
|
assert!(
|
||||||
|
sent >= baseline - TOLERANCE && sent <= baseline + TOLERANCE,
|
||||||
|
"operation {op} should carry a floor at the bumped baseline"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_provider_list_tables_uses_interval_floor_not_table_baseline() {
|
||||||
|
// `list_tables` is addressed by the namespace id, which never matches a
|
||||||
|
// per-table baseline key, so a bumped table baseline must not leak onto
|
||||||
|
// it. With no interval it sends nothing; with one it sends now-interval.
|
||||||
|
let provider = provider_with(&[("ns$tbl", SystemTime::now())], None);
|
||||||
|
let ctx = provider.provide_context(&OperationInfo::new("list_tables", "ns"));
|
||||||
|
assert!(
|
||||||
|
ctx.is_empty(),
|
||||||
|
"list_tables must not inherit a per-table baseline"
|
||||||
|
);
|
||||||
|
|
||||||
|
let interval = Duration::from_secs(30);
|
||||||
|
let provider = provider_with(&[("ns$tbl", SystemTime::now())], Some(interval));
|
||||||
|
let before = SystemTime::now();
|
||||||
|
let ctx = provider.provide_context(&OperationInfo::new("list_tables", "ns"));
|
||||||
|
let after = SystemTime::now();
|
||||||
|
let sent = parse_header_ts(&ctx);
|
||||||
|
assert!(
|
||||||
|
sent >= before - interval - TOLERANCE && sent <= after - interval + TOLERANCE,
|
||||||
|
"list_tables should carry the interval floor"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_provider_no_header_for_empty_baseline_and_no_interval() {
|
||||||
|
// Manual consistency (no interval) on a table that was never bumped:
|
||||||
|
// no floor, so the server may serve from cache.
|
||||||
|
let provider = provider_with(&[], None);
|
||||||
|
let ctx = provider.provide_context(&OperationInfo::new("describe_table", "ns$tbl"));
|
||||||
|
assert!(ctx.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_provider_interval_floor_applies_without_baseline() {
|
||||||
|
// With a consistency interval and no baseline, the floor is now-interval.
|
||||||
|
let interval = Duration::from_secs(30);
|
||||||
|
let provider = provider_with(&[], Some(interval));
|
||||||
|
|
||||||
|
let before = SystemTime::now();
|
||||||
|
let ctx = provider.provide_context(&OperationInfo::new("query_table", "ns$tbl"));
|
||||||
|
let after = SystemTime::now();
|
||||||
|
|
||||||
|
let sent = parse_header_ts(&ctx);
|
||||||
|
assert!(
|
||||||
|
sent >= before - interval - TOLERANCE && sent <= after - interval + TOLERANCE,
|
||||||
|
"expected floor at roughly now - interval"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_provider_non_read_ops_emit_nothing() {
|
||||||
|
// Even with a fresh baseline and a zero interval, a non-read operation
|
||||||
|
// (which establishes rather than consumes a baseline) sends no header.
|
||||||
|
let provider = provider_with(&[("ns$tbl", SystemTime::now())], Some(Duration::ZERO));
|
||||||
|
for op in [
|
||||||
|
"create_table",
|
||||||
|
"register_table",
|
||||||
|
"drop_table",
|
||||||
|
"rename_table",
|
||||||
|
// Pinned to an immutable version, so it cannot be served stale.
|
||||||
|
"describe_table_version",
|
||||||
|
] {
|
||||||
|
let ctx = provider.provide_context(&OperationInfo::new(op, "ns$tbl"));
|
||||||
|
assert!(
|
||||||
|
ctx.is_empty(),
|
||||||
|
"operation {op} must not send a freshness header"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_provider_uses_per_table_baseline() {
|
||||||
|
// The floor is looked up by object id, so an unrelated table's baseline
|
||||||
|
// does not leak onto another table's read.
|
||||||
|
let baseline = SystemTime::now();
|
||||||
|
let provider = provider_with(&[("ns$has_baseline", baseline)], None);
|
||||||
|
|
||||||
|
// The bumped table gets a header.
|
||||||
|
let hit =
|
||||||
|
provider.provide_context(&OperationInfo::new("describe_table", "ns$has_baseline"));
|
||||||
|
assert!(!hit.is_empty());
|
||||||
|
|
||||||
|
// A different table with no baseline (and no interval) gets nothing.
|
||||||
|
let miss = provider.provide_context(&OperationInfo::new("describe_table", "ns$other"));
|
||||||
|
assert!(miss.is_empty());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -13,7 +13,7 @@ use serde_json::{Value, json};
|
|||||||
use super::EmbeddingFunction;
|
use super::EmbeddingFunction;
|
||||||
use crate::{Error, Result};
|
use crate::{Error, Result};
|
||||||
|
|
||||||
use tokio::runtime::Handle;
|
use tokio::runtime::{Handle, RuntimeFlavor};
|
||||||
use tokio::task::block_in_place;
|
use tokio::task::block_in_place;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -148,6 +148,12 @@ impl BedrockEmbeddingFunction {
|
|||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Bedrock's SDK is async but this trait method is synchronous, so we
|
||||||
|
// bridge with `block_in_place` + `block_on`. That requires a
|
||||||
|
// multi-threaded Tokio runtime; return a typed error instead of
|
||||||
|
// panicking when no compatible runtime is available.
|
||||||
|
let handle = current_multi_thread_handle()?;
|
||||||
|
|
||||||
for text in texts {
|
for text in texts {
|
||||||
let request_body = match self.model {
|
let request_body = match self.model {
|
||||||
BedrockEmbeddingModel::TitanEmbedding => {
|
BedrockEmbeddingModel::TitanEmbedding => {
|
||||||
@@ -163,24 +169,28 @@ impl BedrockEmbeddingFunction {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Serialize before entering the blocking section so a serialization
|
||||||
|
// failure surfaces as a typed error rather than an `unwrap` panic.
|
||||||
|
let body = serde_json::to_vec(&request_body).map_err(|e| Error::Runtime {
|
||||||
|
message: format!("Failed to serialize Bedrock request: {e}"),
|
||||||
|
})?;
|
||||||
|
|
||||||
let client = self.client.clone();
|
let client = self.client.clone();
|
||||||
let model_id = self.model.model_id().to_string();
|
let model_id = self.model.model_id().to_string();
|
||||||
let request_body = request_body.clone();
|
|
||||||
|
|
||||||
let response = block_in_place(move || {
|
let response = block_in_place(|| {
|
||||||
Handle::current().block_on(async move {
|
handle.block_on(async move {
|
||||||
client
|
client
|
||||||
.invoke_model()
|
.invoke_model()
|
||||||
.model_id(model_id)
|
.model_id(model_id)
|
||||||
.body(aws_sdk_bedrockruntime::primitives::Blob::new(
|
.body(aws_sdk_bedrockruntime::primitives::Blob::new(body))
|
||||||
serde_json::to_vec(&request_body).unwrap(),
|
|
||||||
))
|
|
||||||
.send()
|
.send()
|
||||||
.await
|
.await
|
||||||
.map_err(Box::new)
|
.map_err(|e| Error::Runtime {
|
||||||
|
message: format!("Bedrock invoke_model request failed: {e}"),
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})?;
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let response_json: Value =
|
let response_json: Value =
|
||||||
serde_json::from_slice(response.body.as_ref()).map_err(|e| Error::Runtime {
|
serde_json::from_slice(response.body.as_ref()).map_err(|e| Error::Runtime {
|
||||||
@@ -188,22 +198,12 @@ impl BedrockEmbeddingFunction {
|
|||||||
})?;
|
})?;
|
||||||
|
|
||||||
let embedding = match self.model {
|
let embedding = match self.model {
|
||||||
BedrockEmbeddingModel::TitanEmbedding => response_json["embedding"]
|
BedrockEmbeddingModel::TitanEmbedding => {
|
||||||
.as_array()
|
json_array_to_f32(&response_json["embedding"], "embedding")?
|
||||||
.ok_or_else(|| Error::Runtime {
|
}
|
||||||
message: "Missing embedding in response".to_string(),
|
BedrockEmbeddingModel::CohereLarge => {
|
||||||
})?
|
json_array_to_f32(&response_json["embeddings"][0], "embeddings")?
|
||||||
.iter()
|
}
|
||||||
.map(|v| v.as_f64().unwrap() as f32)
|
|
||||||
.collect::<Vec<f32>>(),
|
|
||||||
BedrockEmbeddingModel::CohereLarge => response_json["embeddings"][0]
|
|
||||||
.as_array()
|
|
||||||
.ok_or_else(|| Error::Runtime {
|
|
||||||
message: "Missing embeddings in response".to_string(),
|
|
||||||
})?
|
|
||||||
.iter()
|
|
||||||
.map(|v| v.as_f64().unwrap() as f32)
|
|
||||||
.collect::<Vec<f32>>(),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
builder.append_slice(&embedding);
|
builder.append_slice(&embedding);
|
||||||
@@ -212,3 +212,86 @@ impl BedrockEmbeddingFunction {
|
|||||||
Ok(builder.finish())
|
Ok(builder.finish())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns a handle to the current multi-threaded Tokio runtime, or a typed
|
||||||
|
/// [`Error::Runtime`] when called outside a runtime or on the current-thread
|
||||||
|
/// runtime. This keeps the synchronous-over-async bridge in
|
||||||
|
/// [`BedrockEmbeddingFunction::compute_inner`] from panicking on runtime
|
||||||
|
/// configurations that cannot support `block_in_place`.
|
||||||
|
fn current_multi_thread_handle() -> Result<Handle> {
|
||||||
|
let handle = Handle::try_current().map_err(|e| Error::Runtime {
|
||||||
|
message: format!("Bedrock embedding must be called from within a Tokio runtime: {e}"),
|
||||||
|
})?;
|
||||||
|
if handle.runtime_flavor() == RuntimeFlavor::CurrentThread {
|
||||||
|
return Err(Error::Runtime {
|
||||||
|
message: "Bedrock embedding requires a multi-threaded Tokio runtime; the \
|
||||||
|
current-thread runtime cannot use `block_in_place`"
|
||||||
|
.to_string(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Ok(handle)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Converts a JSON value expected to be an array of numbers into `Vec<f32>`.
|
||||||
|
///
|
||||||
|
/// Returns a typed [`Error::Runtime`] (rather than panicking) when the value is
|
||||||
|
/// not an array or contains a non-numeric element, so malformed provider
|
||||||
|
/// responses degrade gracefully.
|
||||||
|
fn json_array_to_f32(value: &Value, field: &str) -> Result<Vec<f32>> {
|
||||||
|
let arr = value.as_array().ok_or_else(|| Error::Runtime {
|
||||||
|
message: format!("Missing or non-array '{field}' field in Bedrock response"),
|
||||||
|
})?;
|
||||||
|
arr.iter()
|
||||||
|
.map(|v| {
|
||||||
|
v.as_f64().map(|f| f as f32).ok_or_else(|| Error::Runtime {
|
||||||
|
message: format!("Non-numeric value in Bedrock '{field}' embedding: {v}"),
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn json_array_to_f32_parses_numbers() {
|
||||||
|
let v = json!([1.0, 2, -3.5]);
|
||||||
|
let out = json_array_to_f32(&v, "embedding").unwrap();
|
||||||
|
assert_eq!(out, vec![1.0_f32, 2.0, -3.5]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn json_array_to_f32_rejects_non_array() {
|
||||||
|
// Missing field indexes to `Value::Null`; a malformed payload should be
|
||||||
|
// a typed error, not a panic.
|
||||||
|
let v = json!({"unexpected": "shape"});
|
||||||
|
let err = json_array_to_f32(&v["embedding"], "embedding").unwrap_err();
|
||||||
|
assert!(matches!(err, Error::Runtime { .. }), "got {err:?}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn json_array_to_f32_rejects_non_numeric_element() {
|
||||||
|
let v = json!([1.0, "not-a-number", 3.0]);
|
||||||
|
let err = json_array_to_f32(&v, "embedding").unwrap_err();
|
||||||
|
assert!(matches!(err, Error::Runtime { .. }), "got {err:?}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn handle_errors_without_runtime() {
|
||||||
|
// No Tokio runtime in scope -> typed error instead of a panic.
|
||||||
|
let err = current_multi_thread_handle().unwrap_err();
|
||||||
|
assert!(matches!(err, Error::Runtime { .. }), "got {err:?}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "current_thread")]
|
||||||
|
async fn handle_errors_on_current_thread_runtime() {
|
||||||
|
let err = current_multi_thread_handle().unwrap_err();
|
||||||
|
assert!(matches!(err, Error::Runtime { .. }), "got {err:?}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread")]
|
||||||
|
async fn handle_ok_on_multi_thread_runtime() {
|
||||||
|
current_multi_thread_handle().expect("multi-threaded runtime should be accepted");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -163,6 +163,7 @@
|
|||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
pub mod arrow;
|
pub mod arrow;
|
||||||
|
pub mod blob;
|
||||||
pub mod connection;
|
pub mod connection;
|
||||||
pub mod data;
|
pub mod data;
|
||||||
pub mod database;
|
pub mod database;
|
||||||
@@ -188,6 +189,7 @@ use std::{fmt::Display, str::FromStr};
|
|||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
pub use blob::{blob, is_blob};
|
||||||
pub use connection::{ConnectNamespaceBuilder, Connection};
|
pub use connection::{ConnectNamespaceBuilder, Connection};
|
||||||
pub use error::{Error, Result};
|
pub use error::{Error, Result};
|
||||||
use lance_index::vector::ApproxMode as LanceApproxMode;
|
use lance_index::vector::ApproxMode as LanceApproxMode;
|
||||||
|
|||||||
@@ -1352,6 +1352,35 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Deserialize an index's `created_at` field.
|
||||||
|
///
|
||||||
|
/// The server returns this as an RFC 3339 string (e.g. `"2026-06-18T21:37:36.637Z"`),
|
||||||
|
/// but older deployments sent a unix timestamp in milliseconds. Accept both so the
|
||||||
|
/// client works against any server version.
|
||||||
|
fn deserialize_created_at<'de, D>(
|
||||||
|
deserializer: D,
|
||||||
|
) -> std::result::Result<Option<DateTime<Utc>>, D::Error>
|
||||||
|
where
|
||||||
|
D: serde::Deserializer<'de>,
|
||||||
|
{
|
||||||
|
use serde::de::Error as _;
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
#[serde(untagged)]
|
||||||
|
enum CreatedAt {
|
||||||
|
Rfc3339(String),
|
||||||
|
Millis(i64),
|
||||||
|
}
|
||||||
|
|
||||||
|
match Option::<CreatedAt>::deserialize(deserializer)? {
|
||||||
|
None => Ok(None),
|
||||||
|
Some(CreatedAt::Rfc3339(s)) => DateTime::parse_from_rfc3339(&s)
|
||||||
|
.map(|dt| Some(dt.with_timezone(&Utc)))
|
||||||
|
.map_err(D::Error::custom),
|
||||||
|
Some(CreatedAt::Millis(ms)) => Ok(DateTime::from_timestamp_millis(ms)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<S: HttpSend + 'static> RemoteTable<S> {
|
impl<S: HttpSend + 'static> RemoteTable<S> {
|
||||||
/// Parse the response from `/index/list/` into `IndexConfig` entries.
|
/// Parse the response from `/index/list/` into `IndexConfig` entries.
|
||||||
///
|
///
|
||||||
@@ -1380,7 +1409,7 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
|
|||||||
// Used as the sentinel to decide whether to skip the stats call.
|
// Used as the sentinel to decide whether to skip the stats call.
|
||||||
index_type: Option<IndexType>,
|
index_type: Option<IndexType>,
|
||||||
index_uuid: Option<String>,
|
index_uuid: Option<String>,
|
||||||
#[serde(default, with = "chrono::serde::ts_milliseconds_option")]
|
#[serde(default, deserialize_with = "deserialize_created_at")]
|
||||||
created_at: Option<DateTime<Utc>>,
|
created_at: Option<DateTime<Utc>>,
|
||||||
num_indexed_rows: Option<u64>,
|
num_indexed_rows: Option<u64>,
|
||||||
num_unindexed_rows: Option<u64>,
|
num_unindexed_rows: Option<u64>,
|
||||||
@@ -4678,7 +4707,7 @@ mod tests {
|
|||||||
"num_segments": 2,
|
"num_segments": 2,
|
||||||
"index_version": 1,
|
"index_version": 1,
|
||||||
"index_details": "{\"num_partitions\":16}",
|
"index_details": "{\"num_partitions\":16}",
|
||||||
"created_at": 1700000000000i64,
|
"created_at": "2026-06-18T21:37:36.637Z",
|
||||||
"type_url": "type.googleapis.com/lance.index.vector.IvfPq",
|
"type_url": "type.googleapis.com/lance.index.vector.IvfPq",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@@ -4728,7 +4757,10 @@ mod tests {
|
|||||||
vec_idx.type_url,
|
vec_idx.type_url,
|
||||||
Some("type.googleapis.com/lance.index.vector.IvfPq".to_string())
|
Some("type.googleapis.com/lance.index.vector.IvfPq".to_string())
|
||||||
);
|
);
|
||||||
assert!(vec_idx.created_at.is_some());
|
assert_eq!(
|
||||||
|
vec_idx.created_at,
|
||||||
|
Some("2026-06-18T21:37:36.637Z".parse::<DateTime<Utc>>().unwrap())
|
||||||
|
);
|
||||||
|
|
||||||
let text_idx = &indices[1];
|
let text_idx = &indices[1];
|
||||||
assert_eq!(text_idx.name, "text_idx");
|
assert_eq!(text_idx.name, "text_idx");
|
||||||
@@ -4749,6 +4781,36 @@ mod tests {
|
|||||||
assert_eq!(text_idx.created_at, None);
|
assert_eq!(text_idx.created_at, None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_deserialize_created_at() {
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct Wrapper {
|
||||||
|
#[serde(default, deserialize_with = "deserialize_created_at")]
|
||||||
|
created_at: Option<DateTime<Utc>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
// RFC 3339 string (current server format).
|
||||||
|
let w: Wrapper =
|
||||||
|
serde_json::from_str(r#"{"created_at": "2026-06-18T21:37:36.637Z"}"#).unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
w.created_at,
|
||||||
|
Some("2026-06-18T21:37:36.637Z".parse::<DateTime<Utc>>().unwrap())
|
||||||
|
);
|
||||||
|
|
||||||
|
// Unix milliseconds (legacy server format).
|
||||||
|
let w: Wrapper = serde_json::from_str(r#"{"created_at": 1700000000000}"#).unwrap();
|
||||||
|
assert_eq!(w.created_at, DateTime::from_timestamp_millis(1700000000000));
|
||||||
|
|
||||||
|
// Null and missing both yield None.
|
||||||
|
let w: Wrapper = serde_json::from_str(r#"{"created_at": null}"#).unwrap();
|
||||||
|
assert_eq!(w.created_at, None);
|
||||||
|
let w: Wrapper = serde_json::from_str(r#"{}"#).unwrap();
|
||||||
|
assert_eq!(w.created_at, None);
|
||||||
|
|
||||||
|
// A malformed string is rejected rather than silently dropped to None.
|
||||||
|
assert!(serde_json::from_str::<Wrapper>(r#"{"created_at": "not-a-date"}"#).is_err());
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_list_versions() {
|
async fn test_list_versions() {
|
||||||
let table = Table::new_with_handler("my_table", |request| {
|
let table = Table::new_with_handler("my_table", |request| {
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
|
|
||||||
//! LanceDB Table APIs
|
//! LanceDB Table APIs
|
||||||
|
|
||||||
use arrow_array::{RecordBatch, RecordBatchReader};
|
use arrow_array::{LargeBinaryArray, RecordBatch, RecordBatchReader};
|
||||||
use arrow_schema::{Schema, SchemaRef};
|
use arrow_schema::{Schema, SchemaRef};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use datafusion_execution::TaskContext;
|
use datafusion_execution::TaskContext;
|
||||||
@@ -12,6 +12,7 @@ use datafusion_physical_plan::ExecutionPlan;
|
|||||||
use datafusion_physical_plan::display::DisplayableExecutionPlan;
|
use datafusion_physical_plan::display::DisplayableExecutionPlan;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use futures::stream::FuturesUnordered;
|
use futures::stream::FuturesUnordered;
|
||||||
|
use lance::dataset::BlobFile;
|
||||||
pub use lance::dataset::ColumnAlteration;
|
pub use lance::dataset::ColumnAlteration;
|
||||||
pub use lance::dataset::NewColumnTransform;
|
pub use lance::dataset::NewColumnTransform;
|
||||||
pub use lance::dataset::ReadParams;
|
pub use lance::dataset::ReadParams;
|
||||||
@@ -43,6 +44,7 @@ use crate::connection::NamespaceClientPushdownOperation;
|
|||||||
|
|
||||||
use crate::data::scannable::{PeekedScannable, Scannable, estimate_write_partitions};
|
use crate::data::scannable::{PeekedScannable, Scannable, estimate_write_partitions};
|
||||||
use crate::database::Database;
|
use crate::database::Database;
|
||||||
|
use crate::database::read_freshness::TableFreshness;
|
||||||
use crate::embeddings::{EmbeddingDefinition, EmbeddingRegistry, MemoryRegistry};
|
use crate::embeddings::{EmbeddingDefinition, EmbeddingRegistry, MemoryRegistry};
|
||||||
use crate::error::{Error, Result};
|
use crate::error::{Error, Result};
|
||||||
use crate::index::IndexStatistics;
|
use crate::index::IndexStatistics;
|
||||||
@@ -586,6 +588,28 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
|||||||
async fn close_lsm_writers(&self) -> Result<()> {
|
async fn close_lsm_writers(&self) -> Result<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
/// Names of the blob v2 columns in this table, in declaration order.
|
||||||
|
async fn blob_columns(&self) -> Result<Vec<String>> {
|
||||||
|
Err(Error::NotSupported {
|
||||||
|
message: "blob_columns is not supported on this table type".into(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
/// Materialize blob bytes for the given row ids. See [`Table::fetch_blobs`].
|
||||||
|
async fn fetch_blobs(&self, _column: &str, _row_ids: &[u64]) -> Result<LargeBinaryArray> {
|
||||||
|
Err(Error::NotSupported {
|
||||||
|
message: "fetch_blobs is not supported on this table type".into(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
/// Open lazy blob handles for the given row ids. See [`Table::fetch_blob_files`].
|
||||||
|
async fn fetch_blob_files(
|
||||||
|
&self,
|
||||||
|
_column: &str,
|
||||||
|
_row_ids: &[u64],
|
||||||
|
) -> Result<Vec<Option<BlobFile>>> {
|
||||||
|
Err(Error::NotSupported {
|
||||||
|
message: "fetch_blob_files is not supported on this table type".into(),
|
||||||
|
})
|
||||||
|
}
|
||||||
/// Gets the table tag manager.
|
/// Gets the table tag manager.
|
||||||
async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
|
async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
|
||||||
/// Optimize the dataset.
|
/// Optimize the dataset.
|
||||||
@@ -926,6 +950,76 @@ impl Table {
|
|||||||
self.inner.count_rows(filter.map(Filter::Sql)).await
|
self.inner.count_rows(filter.map(Filter::Sql)).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Names of the blob v2 columns in this table, in declaration order.
|
||||||
|
///
|
||||||
|
/// Nested blobs use dotted paths (e.g. `info.blob`). Returns
|
||||||
|
/// [`Error::NotSupported`] on table types without blob support.
|
||||||
|
pub async fn blob_columns(&self) -> Result<Vec<String>> {
|
||||||
|
self.inner.blob_columns().await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Materialize blob bytes for the given row ids.
|
||||||
|
///
|
||||||
|
/// Output matches `row_ids` in length and order. Null and zero-length rows
|
||||||
|
/// are null. Prefer [`Self::fetch_blob_files`] for large selections.
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use arrow_array::UInt64Array;
|
||||||
|
/// use futures::TryStreamExt;
|
||||||
|
/// use lancedb::query::{ExecutableQuery, QueryBase};
|
||||||
|
///
|
||||||
|
/// # use lancedb::Table;
|
||||||
|
/// # async fn materialize(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
/// let mut stream = table.query().with_row_id().limit(10).execute().await?;
|
||||||
|
/// while let Some(batch) = stream.try_next().await? {
|
||||||
|
/// let row_ids = batch
|
||||||
|
/// .column_by_name("_rowid")
|
||||||
|
/// .unwrap()
|
||||||
|
/// .as_any()
|
||||||
|
/// .downcast_ref::<UInt64Array>()
|
||||||
|
/// .unwrap();
|
||||||
|
/// let images = table.fetch_blobs("image", row_ids.values()).await?;
|
||||||
|
/// let _ = images;
|
||||||
|
/// }
|
||||||
|
/// # Ok(())
|
||||||
|
/// # }
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// Returns [`Error::InvalidInput`] when the column does not exist or is
|
||||||
|
/// not a blob v2 column, and [`Error::NotSupported`] on table types
|
||||||
|
/// without blob support.
|
||||||
|
pub async fn fetch_blobs(
|
||||||
|
&self,
|
||||||
|
column: impl AsRef<str>,
|
||||||
|
row_ids: &[u64],
|
||||||
|
) -> Result<LargeBinaryArray> {
|
||||||
|
self.inner.fetch_blobs(column.as_ref(), row_ids).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Open lazy [`BlobFile`] handles for the given row ids.
|
||||||
|
///
|
||||||
|
/// Same length and order as `row_ids`. Null rows are `None`. Bytes are not
|
||||||
|
/// read from disk until a call to [`BlobFile::read`].
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # use lancedb::Table;
|
||||||
|
/// # async fn lazy_read(table: &Table, row_ids: &[u64]) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
/// let handles = table.fetch_blob_files("image", row_ids).await?;
|
||||||
|
/// if let Some(Some(first)) = handles.first() {
|
||||||
|
/// let bytes = first.read().await?;
|
||||||
|
/// println!("first blob is {} bytes", bytes.len());
|
||||||
|
/// }
|
||||||
|
/// # Ok(())
|
||||||
|
/// # }
|
||||||
|
/// ```
|
||||||
|
pub async fn fetch_blob_files(
|
||||||
|
&self,
|
||||||
|
column: impl AsRef<str>,
|
||||||
|
row_ids: &[u64],
|
||||||
|
) -> Result<Vec<Option<BlobFile>>> {
|
||||||
|
self.inner.fetch_blob_files(column.as_ref(), row_ids).await
|
||||||
|
}
|
||||||
|
|
||||||
/// Insert new records into this Table
|
/// Insert new records into this Table
|
||||||
///
|
///
|
||||||
/// # Arguments
|
/// # Arguments
|
||||||
@@ -1763,6 +1857,8 @@ pub struct NativeTable {
|
|||||||
// Operations to push down to the namespace server.
|
// Operations to push down to the namespace server.
|
||||||
// pub(crate) so query.rs can access the field for server-side query execution.
|
// pub(crate) so query.rs can access the field for server-side query execution.
|
||||||
pub(crate) pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
pub(crate) pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||||
|
// Read-freshness baseline; `Some` only for namespace-backed tables.
|
||||||
|
freshness: Option<TableFreshness>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for NativeTable {
|
impl std::fmt::Debug for NativeTable {
|
||||||
@@ -1923,6 +2019,7 @@ impl NativeTable {
|
|||||||
read_consistency_interval,
|
read_consistency_interval,
|
||||||
namespace_client,
|
namespace_client,
|
||||||
pushdown_operations,
|
pushdown_operations,
|
||||||
|
freshness: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1934,6 +2031,12 @@ impl NativeTable {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attach the read-freshness baseline handle (namespace connections only).
|
||||||
|
pub(crate) fn with_freshness(mut self, freshness: TableFreshness) -> Self {
|
||||||
|
self.freshness = Some(freshness);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Build a sibling `NativeTable` with the same identity but a different
|
/// Build a sibling `NativeTable` with the same identity but a different
|
||||||
/// (independent) dataset wrapper — used to hand out branch-scoped handles.
|
/// (independent) dataset wrapper — used to hand out branch-scoped handles.
|
||||||
fn with_dataset(&self, dataset: dataset::DatasetConsistencyWrapper) -> Self {
|
fn with_dataset(&self, dataset: dataset::DatasetConsistencyWrapper) -> Self {
|
||||||
@@ -1946,6 +2049,14 @@ impl NativeTable {
|
|||||||
read_consistency_interval: self.read_consistency_interval,
|
read_consistency_interval: self.read_consistency_interval,
|
||||||
namespace_client: self.namespace_client.clone(),
|
namespace_client: self.namespace_client.clone(),
|
||||||
pushdown_operations: self.pushdown_operations.clone(),
|
pushdown_operations: self.pushdown_operations.clone(),
|
||||||
|
freshness: self.freshness.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bump the read-freshness baseline; no-op for non-namespace tables.
|
||||||
|
fn bump_freshness(&self) {
|
||||||
|
if let Some(freshness) = &self.freshness {
|
||||||
|
freshness.bump();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2045,6 +2156,7 @@ impl NativeTable {
|
|||||||
read_consistency_interval,
|
read_consistency_interval,
|
||||||
namespace_client: stored_namespace_client,
|
namespace_client: stored_namespace_client,
|
||||||
pushdown_operations,
|
pushdown_operations,
|
||||||
|
freshness: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2134,6 +2246,7 @@ impl NativeTable {
|
|||||||
read_consistency_interval,
|
read_consistency_interval,
|
||||||
namespace_client,
|
namespace_client,
|
||||||
pushdown_operations,
|
pushdown_operations,
|
||||||
|
freshness: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2265,6 +2378,7 @@ impl NativeTable {
|
|||||||
read_consistency_interval,
|
read_consistency_interval,
|
||||||
namespace_client: stored_namespace_client,
|
namespace_client: stored_namespace_client,
|
||||||
pushdown_operations,
|
pushdown_operations,
|
||||||
|
freshness: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2424,6 +2538,8 @@ impl BaseTable for NativeTable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn checkout_latest(&self) -> Result<()> {
|
async fn checkout_latest(&self) -> Result<()> {
|
||||||
|
// Bump before resolving "latest" so that request carries the floor.
|
||||||
|
self.bump_freshness();
|
||||||
self.dataset.as_latest().await?;
|
self.dataset.as_latest().await?;
|
||||||
self.dataset.reload().await
|
self.dataset.reload().await
|
||||||
}
|
}
|
||||||
@@ -2511,6 +2627,8 @@ impl BaseTable for NativeTable {
|
|||||||
debug_assert_eq!(dataset.version().version, version);
|
debug_assert_eq!(dataset.version().version, version);
|
||||||
dataset.restore().await?;
|
dataset.restore().await?;
|
||||||
}
|
}
|
||||||
|
// Restore moves "latest", so bump before resolving it (as RemoteTable does).
|
||||||
|
self.bump_freshness();
|
||||||
self.dataset.as_latest().await?;
|
self.dataset.as_latest().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -2591,7 +2709,13 @@ impl BaseTable for NativeTable {
|
|||||||
output.plan
|
output.plan
|
||||||
};
|
};
|
||||||
|
|
||||||
let insert_exec = Arc::new(InsertExec::new(ds_wrapper.clone(), ds, plan, lance_params));
|
let insert_exec = Arc::new(InsertExec::new_with_tracker(
|
||||||
|
ds_wrapper.clone(),
|
||||||
|
ds,
|
||||||
|
plan,
|
||||||
|
lance_params,
|
||||||
|
output.tracker.clone(),
|
||||||
|
));
|
||||||
|
|
||||||
let tracker_for_tasks = output.tracker.clone();
|
let tracker_for_tasks = output.tracker.clone();
|
||||||
if let Some(ref t) = tracker_for_tasks {
|
if let Some(ref t) = tracker_for_tasks {
|
||||||
@@ -2624,6 +2748,7 @@ impl BaseTable for NativeTable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let version = ds_wrapper.get().await?.manifest().version;
|
let version = ds_wrapper.get().await?.manifest().version;
|
||||||
|
self.bump_freshness();
|
||||||
Ok(AddResult { version })
|
Ok(AddResult { version })
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2674,7 +2799,9 @@ impl BaseTable for NativeTable {
|
|||||||
|
|
||||||
async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult> {
|
async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult> {
|
||||||
// Delegate to the submodule implementation
|
// Delegate to the submodule implementation
|
||||||
update::execute_update(self, update).await
|
let result = update::execute_update(self, update).await?;
|
||||||
|
self.bump_freshness();
|
||||||
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn create_plan(
|
async fn create_plan(
|
||||||
@@ -2706,7 +2833,9 @@ impl BaseTable for NativeTable {
|
|||||||
params: MergeInsertBuilder,
|
params: MergeInsertBuilder,
|
||||||
new_data: Box<dyn RecordBatchReader + Send>,
|
new_data: Box<dyn RecordBatchReader + Send>,
|
||||||
) -> Result<MergeResult> {
|
) -> Result<MergeResult> {
|
||||||
merge::execute_merge_insert(self, params, new_data).await
|
let result = merge::execute_merge_insert(self, params, new_data).await?;
|
||||||
|
self.bump_freshness();
|
||||||
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn set_unenforced_primary_key(&self, columns: &[&str]) -> Result<()> {
|
async fn set_unenforced_primary_key(&self, columns: &[&str]) -> Result<()> {
|
||||||
@@ -2725,9 +2854,30 @@ impl BaseTable for NativeTable {
|
|||||||
merge::lsm::close_lsm_writers(self).await
|
merge::lsm::close_lsm_writers(self).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn blob_columns(&self) -> Result<Vec<String>> {
|
||||||
|
let schema = self.schema().await?;
|
||||||
|
Ok(crate::blob::blob_column_names(schema.as_ref()))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_blobs(&self, column: &str, row_ids: &[u64]) -> Result<LargeBinaryArray> {
|
||||||
|
let dataset = self.dataset.get().await?;
|
||||||
|
crate::blob::take_blobs_aligned(&dataset, column, row_ids).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_blob_files(
|
||||||
|
&self,
|
||||||
|
column: &str,
|
||||||
|
row_ids: &[u64],
|
||||||
|
) -> Result<Vec<Option<BlobFile>>> {
|
||||||
|
let dataset = self.dataset.get().await?;
|
||||||
|
crate::blob::take_blob_files_aligned(&dataset, column, row_ids).await
|
||||||
|
}
|
||||||
|
|
||||||
/// Delete rows from the table
|
/// Delete rows from the table
|
||||||
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
|
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
|
||||||
delete::execute_delete(self, predicate).await
|
let result = delete::execute_delete(self, predicate).await?;
|
||||||
|
self.bump_freshness();
|
||||||
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
|
async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
|
||||||
@@ -2746,22 +2896,30 @@ impl BaseTable for NativeTable {
|
|||||||
transforms: NewColumnTransform,
|
transforms: NewColumnTransform,
|
||||||
read_columns: Option<Vec<String>>,
|
read_columns: Option<Vec<String>>,
|
||||||
) -> Result<AddColumnsResult> {
|
) -> Result<AddColumnsResult> {
|
||||||
schema_evolution::execute_add_columns(self, transforms, read_columns).await
|
let result = schema_evolution::execute_add_columns(self, transforms, read_columns).await?;
|
||||||
|
self.bump_freshness();
|
||||||
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult> {
|
async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult> {
|
||||||
schema_evolution::execute_alter_columns(self, alterations).await
|
let result = schema_evolution::execute_alter_columns(self, alterations).await?;
|
||||||
|
self.bump_freshness();
|
||||||
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn update_field_metadata(
|
async fn update_field_metadata(
|
||||||
&self,
|
&self,
|
||||||
updates: &[FieldMetadataUpdate],
|
updates: &[FieldMetadataUpdate],
|
||||||
) -> Result<UpdateFieldMetadataResult> {
|
) -> Result<UpdateFieldMetadataResult> {
|
||||||
schema_evolution::execute_update_field_metadata(self, updates).await
|
let result = schema_evolution::execute_update_field_metadata(self, updates).await?;
|
||||||
|
self.bump_freshness();
|
||||||
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
|
async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
|
||||||
schema_evolution::execute_drop_columns(self, columns).await
|
let result = schema_evolution::execute_drop_columns(self, columns).await?;
|
||||||
|
self.bump_freshness();
|
||||||
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
|
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
|
||||||
|
|||||||
@@ -26,6 +26,9 @@ pub enum AddDataMode {
|
|||||||
#[default]
|
#[default]
|
||||||
Append,
|
Append,
|
||||||
/// The existing table will be overwritten with the new data
|
/// The existing table will be overwritten with the new data
|
||||||
|
///
|
||||||
|
/// On overwrite, raw binary is not coerced into a blob struct. The input
|
||||||
|
/// must declare blob v2 for the column to stay a blob column.
|
||||||
Overwrite,
|
Overwrite,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
|
|
||||||
//! This module contains adapters to allow LanceDB tables to be used as DataFusion table providers.
|
//! This module contains adapters to allow LanceDB tables to be used as DataFusion table providers.
|
||||||
|
|
||||||
|
mod blob_coerce;
|
||||||
pub mod cast;
|
pub mod cast;
|
||||||
pub mod insert;
|
pub mod insert;
|
||||||
pub mod reject_nan;
|
pub mod reject_nan;
|
||||||
|
|||||||
495
rust/lancedb/src/table/datafusion/blob_coerce.rs
Normal file
495
rust/lancedb/src/table/datafusion/blob_coerce.rs
Normal file
@@ -0,0 +1,495 @@
|
|||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||||
|
|
||||||
|
//! Coerces write-path input into blob v2 struct columns.
|
||||||
|
//!
|
||||||
|
//! [`super::cast::cast_to_table_schema`] calls [`coerce_blob_expr`].
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use arrow_schema::{DataType, Field, FieldRef};
|
||||||
|
use datafusion::functions::core::{get_field, named_struct};
|
||||||
|
use datafusion_common::ScalarValue;
|
||||||
|
use datafusion_common::config::ConfigOptions;
|
||||||
|
use datafusion_physical_expr::ScalarFunctionExpr;
|
||||||
|
use datafusion_physical_expr::expressions::{CastExpr, Literal};
|
||||||
|
use datafusion_physical_plan::PhysicalExpr;
|
||||||
|
|
||||||
|
use crate::error::{Error, Result};
|
||||||
|
|
||||||
|
/// Build a projection expression coercing `input_expr` into the blob struct
|
||||||
|
/// declared by `table_field`, composing `named_struct` / `get_field` / `cast`.
|
||||||
|
pub(super) fn coerce_blob_expr(
|
||||||
|
input_expr: Arc<dyn PhysicalExpr>,
|
||||||
|
input_field: &Field,
|
||||||
|
table_field: &FieldRef,
|
||||||
|
config: &Arc<ConfigOptions>,
|
||||||
|
) -> Result<(Arc<dyn PhysicalExpr>, FieldRef)> {
|
||||||
|
let DataType::Struct(declared_fields) = table_field.data_type() else {
|
||||||
|
return Err(Error::InvalidInput {
|
||||||
|
message: format!(
|
||||||
|
"blob v2 column '{}' must be a struct, table declares {}",
|
||||||
|
table_field.name(),
|
||||||
|
table_field.data_type()
|
||||||
|
),
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
let input_struct_children = match input_field.data_type() {
|
||||||
|
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => None,
|
||||||
|
DataType::Struct(children) => {
|
||||||
|
if !children
|
||||||
|
.iter()
|
||||||
|
.any(|c| c.name() == "data" || c.name() == "uri")
|
||||||
|
{
|
||||||
|
return Err(Error::InvalidInput {
|
||||||
|
message: format!(
|
||||||
|
"blob struct input for column '{}' must contain a 'data' or 'uri' child",
|
||||||
|
table_field.name()
|
||||||
|
),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Some(children)
|
||||||
|
}
|
||||||
|
other => {
|
||||||
|
return Err(Error::InvalidInput {
|
||||||
|
message: format!(
|
||||||
|
"cannot coerce column '{}' with type {} into a blob v2 struct. \
|
||||||
|
expected Binary, LargeBinary, BinaryView, or a Struct with a 'data' or 'uri' child",
|
||||||
|
table_field.name(),
|
||||||
|
other,
|
||||||
|
),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut ns_args: Vec<Arc<dyn PhysicalExpr>> = Vec::with_capacity(declared_fields.len() * 2);
|
||||||
|
for declared in declared_fields.iter() {
|
||||||
|
ns_args.push(Arc::new(Literal::new(ScalarValue::from(
|
||||||
|
declared.name().as_str(),
|
||||||
|
))));
|
||||||
|
|
||||||
|
let value: Arc<dyn PhysicalExpr> = match input_struct_children {
|
||||||
|
// Raw binary lands in `data` and everything else is a typed null.
|
||||||
|
None => {
|
||||||
|
if declared.name() == "data" {
|
||||||
|
Arc::new(CastExpr::new(
|
||||||
|
input_expr.clone(),
|
||||||
|
declared.data_type().clone(),
|
||||||
|
None,
|
||||||
|
))
|
||||||
|
} else {
|
||||||
|
typed_null(declared.data_type())?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(children) => match children.iter().find(|c| c.name() == declared.name()) {
|
||||||
|
Some(child) => {
|
||||||
|
let field_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
|
||||||
|
&format!("get_field({})", declared.name()),
|
||||||
|
get_field(),
|
||||||
|
vec![
|
||||||
|
input_expr.clone(),
|
||||||
|
Arc::new(Literal::new(ScalarValue::from(declared.name().as_str()))),
|
||||||
|
],
|
||||||
|
Arc::new(child.as_ref().clone()),
|
||||||
|
config.clone(),
|
||||||
|
));
|
||||||
|
if child.data_type() == declared.data_type() {
|
||||||
|
field_expr
|
||||||
|
} else {
|
||||||
|
Arc::new(CastExpr::new(
|
||||||
|
field_expr,
|
||||||
|
declared.data_type().clone(),
|
||||||
|
None,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => typed_null(declared.data_type())?,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
ns_args.push(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
let expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
|
||||||
|
&format!("named_struct({})", table_field.name()),
|
||||||
|
named_struct(),
|
||||||
|
ns_args,
|
||||||
|
table_field.clone(),
|
||||||
|
config.clone(),
|
||||||
|
));
|
||||||
|
Ok((expr, table_field.clone()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn typed_null(data_type: &DataType) -> Result<Arc<dyn PhysicalExpr>> {
|
||||||
|
let scalar = ScalarValue::try_from(data_type).map_err(|e| Error::InvalidInput {
|
||||||
|
message: format!("cannot build null literal for blob child type {data_type}: {e}"),
|
||||||
|
})?;
|
||||||
|
Ok(Arc::new(Literal::new(scalar)))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::super::cast::cast_to_table_schema;
|
||||||
|
use super::*;
|
||||||
|
use crate::blob::blob;
|
||||||
|
use arrow_array::{
|
||||||
|
Array, ArrayRef, BinaryArray, BinaryViewArray, Int32Array, Int64Array, LargeBinaryArray,
|
||||||
|
RecordBatch, StringArray, StructArray, UInt8Array, UInt64Array,
|
||||||
|
};
|
||||||
|
use arrow_schema::Schema;
|
||||||
|
use datafusion::prelude::SessionContext;
|
||||||
|
use datafusion_catalog::MemTable;
|
||||||
|
use datafusion_physical_plan::ExecutionPlan;
|
||||||
|
use futures::TryStreamExt;
|
||||||
|
use lance_arrow::FieldExt;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
fn wide_blob_field(name: &str) -> Field {
|
||||||
|
Field::new(
|
||||||
|
name,
|
||||||
|
DataType::Struct(
|
||||||
|
vec![
|
||||||
|
Field::new("data", DataType::LargeBinary, true),
|
||||||
|
Field::new("uri", DataType::Utf8, true),
|
||||||
|
Field::new("position", DataType::UInt64, true),
|
||||||
|
Field::new("size", DataType::UInt64, true),
|
||||||
|
]
|
||||||
|
.into(),
|
||||||
|
),
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
.with_metadata(HashMap::from([(
|
||||||
|
"ARROW:extension:name".to_string(),
|
||||||
|
"lance.blob.v2".to_string(),
|
||||||
|
)]))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn blob_table_schema() -> Schema {
|
||||||
|
Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
blob("image", true),
|
||||||
|
])
|
||||||
|
}
|
||||||
|
|
||||||
|
fn batch_with_image(image_field: Field, image: ArrayRef) -> RecordBatch {
|
||||||
|
let len = image.len();
|
||||||
|
RecordBatch::try_new(
|
||||||
|
Arc::new(Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
image_field,
|
||||||
|
])),
|
||||||
|
vec![Arc::new(Int64Array::from_iter_values(0..len as i64)), image],
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn image_struct(batch: &RecordBatch) -> &StructArray {
|
||||||
|
batch
|
||||||
|
.column_by_name("image")
|
||||||
|
.unwrap()
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<StructArray>()
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn plan_from_batch(batch: RecordBatch) -> Arc<dyn ExecutionPlan> {
|
||||||
|
let schema = batch.schema();
|
||||||
|
let table = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
|
||||||
|
let ctx = SessionContext::new();
|
||||||
|
ctx.register_table("t", Arc::new(table)).unwrap();
|
||||||
|
let df = ctx.table("t").await.unwrap();
|
||||||
|
df.create_physical_plan().await.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn coerce(batch: RecordBatch, table_schema: &Schema) -> RecordBatch {
|
||||||
|
let plan = plan_from_batch(batch).await;
|
||||||
|
let plan = cast_to_table_schema(plan, table_schema).unwrap();
|
||||||
|
let ctx = SessionContext::new();
|
||||||
|
let stream = plan.execute(0, ctx.task_ctx()).unwrap();
|
||||||
|
let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
|
||||||
|
arrow_select::concat::concat_batches(&plan.schema(), &batches).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn coerce_err(batch: RecordBatch, table_schema: &Schema) -> Error {
|
||||||
|
let plan = plan_from_batch(batch).await;
|
||||||
|
cast_to_table_schema(plan, table_schema).unwrap_err()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn large_binary_coerces_to_declared_blob_struct() {
|
||||||
|
let batch = batch_with_image(
|
||||||
|
Field::new("image", DataType::LargeBinary, true),
|
||||||
|
Arc::new(LargeBinaryArray::from_iter_values([b"hello".as_slice()])),
|
||||||
|
);
|
||||||
|
let coerced = coerce(batch, &blob_table_schema()).await;
|
||||||
|
let image_field = coerced.schema().field_with_name("image").unwrap().clone();
|
||||||
|
assert!(image_field.is_blob_v2());
|
||||||
|
assert!(matches!(image_field.data_type(), DataType::Struct(_)));
|
||||||
|
let data = image_struct(&coerced).column_by_name("data").unwrap();
|
||||||
|
let data: &LargeBinaryArray = data.as_any().downcast_ref().unwrap();
|
||||||
|
assert_eq!(data.value(0), b"hello");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn binary_coerces_to_declared_blob_struct() {
|
||||||
|
let batch = batch_with_image(
|
||||||
|
Field::new("image", DataType::Binary, true),
|
||||||
|
Arc::new(BinaryArray::from_iter_values([b"hi".as_slice()])),
|
||||||
|
);
|
||||||
|
let coerced = coerce(batch, &blob_table_schema()).await;
|
||||||
|
assert!(
|
||||||
|
coerced
|
||||||
|
.schema()
|
||||||
|
.field_with_name("image")
|
||||||
|
.unwrap()
|
||||||
|
.is_blob_v2()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn binary_view_coerces_to_declared_blob_struct() {
|
||||||
|
let batch = batch_with_image(
|
||||||
|
Field::new("image", DataType::BinaryView, true),
|
||||||
|
Arc::new(BinaryViewArray::from_iter_values([b"view".as_slice()])),
|
||||||
|
);
|
||||||
|
let coerced = coerce(batch, &blob_table_schema()).await;
|
||||||
|
let data = image_struct(&coerced).column_by_name("data").unwrap();
|
||||||
|
let data: &LargeBinaryArray = data.as_any().downcast_ref().unwrap();
|
||||||
|
assert_eq!(data.value(0), b"view");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn binary_nulls_stay_null_after_coercion() {
|
||||||
|
let batch = batch_with_image(
|
||||||
|
Field::new("image", DataType::Binary, true),
|
||||||
|
Arc::new(BinaryArray::from_iter(vec![
|
||||||
|
Some(b"present".as_slice()),
|
||||||
|
None,
|
||||||
|
])),
|
||||||
|
);
|
||||||
|
let coerced = coerce(batch, &blob_table_schema()).await;
|
||||||
|
let image = image_struct(&coerced);
|
||||||
|
let data = image.column_by_name("data").unwrap();
|
||||||
|
assert!(!data.is_null(0));
|
||||||
|
assert!(data.is_null(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn binary_coerces_into_four_child_blob_layout() {
|
||||||
|
let table_schema = Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
wide_blob_field("image"),
|
||||||
|
]);
|
||||||
|
let batch = batch_with_image(
|
||||||
|
Field::new("image", DataType::LargeBinary, true),
|
||||||
|
Arc::new(LargeBinaryArray::from_iter(vec![
|
||||||
|
Some(b"alpha".as_slice()),
|
||||||
|
None,
|
||||||
|
])),
|
||||||
|
);
|
||||||
|
let coerced = coerce(batch, &table_schema).await;
|
||||||
|
let image = image_struct(&coerced);
|
||||||
|
assert_eq!(
|
||||||
|
image.num_columns(),
|
||||||
|
4,
|
||||||
|
"coerced struct keeps the declared layout"
|
||||||
|
);
|
||||||
|
assert!(image.column_by_name("position").unwrap().is_null(0));
|
||||||
|
assert!(image.column_by_name("size").unwrap().is_null(0));
|
||||||
|
assert!(!image.column_by_name("data").unwrap().is_null(0));
|
||||||
|
assert!(image.column_by_name("data").unwrap().is_null(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn prebuilt_struct_gains_blob_field_metadata() {
|
||||||
|
let DataType::Struct(children) = blob("image", true).data_type().clone() else {
|
||||||
|
unreachable!("blob field is a struct")
|
||||||
|
};
|
||||||
|
let prebuilt = StructArray::new(
|
||||||
|
children,
|
||||||
|
vec![
|
||||||
|
Arc::new(LargeBinaryArray::from_iter_values([b"prebuilt".as_slice()])),
|
||||||
|
Arc::new(StringArray::from(vec![None::<&str>])),
|
||||||
|
],
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let batch = batch_with_image(
|
||||||
|
Field::new("image", prebuilt.data_type().clone(), true),
|
||||||
|
Arc::new(prebuilt),
|
||||||
|
);
|
||||||
|
let coerced = coerce(batch, &blob_table_schema()).await;
|
||||||
|
assert!(
|
||||||
|
coerced
|
||||||
|
.schema()
|
||||||
|
.field_with_name("image")
|
||||||
|
.unwrap()
|
||||||
|
.is_blob_v2()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn prebuilt_narrow_struct_widens_to_declared_layout() {
|
||||||
|
let DataType::Struct(narrow_children) = blob("image", true).data_type().clone() else {
|
||||||
|
unreachable!("blob field is a struct")
|
||||||
|
};
|
||||||
|
let prebuilt = StructArray::new(
|
||||||
|
narrow_children,
|
||||||
|
vec![
|
||||||
|
Arc::new(LargeBinaryArray::from_iter_values([b"prebuilt".as_slice()])),
|
||||||
|
Arc::new(StringArray::from(vec![None::<&str>])),
|
||||||
|
],
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let table_schema = Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
wide_blob_field("image"),
|
||||||
|
]);
|
||||||
|
let batch = batch_with_image(
|
||||||
|
Field::new("image", prebuilt.data_type().clone(), true),
|
||||||
|
Arc::new(prebuilt),
|
||||||
|
);
|
||||||
|
let coerced = coerce(batch, &table_schema).await;
|
||||||
|
let image = image_struct(&coerced);
|
||||||
|
assert_eq!(image.num_columns(), 4);
|
||||||
|
assert!(image.column_by_name("position").unwrap().is_null(0));
|
||||||
|
assert!(image.column_by_name("size").unwrap().is_null(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn external_reference_struct_preserves_uri_position_and_size() {
|
||||||
|
let prebuilt = StructArray::new(
|
||||||
|
vec![
|
||||||
|
Field::new("data", DataType::LargeBinary, true),
|
||||||
|
Field::new("uri", DataType::Utf8, true),
|
||||||
|
Field::new("position", DataType::UInt64, true),
|
||||||
|
Field::new("size", DataType::UInt64, true),
|
||||||
|
]
|
||||||
|
.into(),
|
||||||
|
vec![
|
||||||
|
Arc::new(LargeBinaryArray::from(vec![None::<&[u8]>])) as ArrayRef,
|
||||||
|
Arc::new(StringArray::from(vec![Some("s3://bucket/blob.bin")])) as ArrayRef,
|
||||||
|
Arc::new(UInt64Array::from(vec![Some(7)])) as ArrayRef,
|
||||||
|
Arc::new(UInt64Array::from(vec![Some(6)])) as ArrayRef,
|
||||||
|
],
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let table_schema = Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
wide_blob_field("image"),
|
||||||
|
]);
|
||||||
|
let batch = batch_with_image(
|
||||||
|
Field::new("image", prebuilt.data_type().clone(), true),
|
||||||
|
Arc::new(prebuilt),
|
||||||
|
);
|
||||||
|
let coerced = coerce(batch, &table_schema).await;
|
||||||
|
let image = image_struct(&coerced);
|
||||||
|
|
||||||
|
let uri: &StringArray = image
|
||||||
|
.column_by_name("uri")
|
||||||
|
.unwrap()
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref()
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(uri.value(0), "s3://bucket/blob.bin");
|
||||||
|
let position: &UInt64Array = image
|
||||||
|
.column_by_name("position")
|
||||||
|
.unwrap()
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref()
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(position.value(0), 7);
|
||||||
|
let size: &UInt64Array = image
|
||||||
|
.column_by_name("size")
|
||||||
|
.unwrap()
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref()
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(size.value(0), 6);
|
||||||
|
assert!(image.column_by_name("data").unwrap().is_null(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn descriptor_struct_without_value_child_is_rejected() {
|
||||||
|
let descriptor = StructArray::new(
|
||||||
|
vec![
|
||||||
|
Field::new("kind", DataType::UInt8, false),
|
||||||
|
Field::new("position", DataType::UInt64, false),
|
||||||
|
Field::new("size", DataType::UInt64, false),
|
||||||
|
]
|
||||||
|
.into(),
|
||||||
|
vec![
|
||||||
|
Arc::new(UInt8Array::from(vec![0])),
|
||||||
|
Arc::new(UInt64Array::from(vec![0])),
|
||||||
|
Arc::new(UInt64Array::from(vec![0])),
|
||||||
|
],
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let batch = batch_with_image(
|
||||||
|
Field::new("image", descriptor.data_type().clone(), true),
|
||||||
|
Arc::new(descriptor),
|
||||||
|
);
|
||||||
|
let err = coerce_err(batch, &blob_table_schema()).await;
|
||||||
|
assert!(err.to_string().contains("'data' or 'uri'"));
|
||||||
|
assert!(err.to_string().contains("image"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn unsupported_input_type_is_rejected_with_column_name() {
|
||||||
|
let batch = batch_with_image(
|
||||||
|
Field::new("image", DataType::Utf8, true),
|
||||||
|
Arc::new(StringArray::from(vec!["not bytes"])),
|
||||||
|
);
|
||||||
|
let err = coerce_err(batch, &blob_table_schema()).await;
|
||||||
|
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
|
||||||
|
assert!(err.to_string().contains("image"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn blob_metadata_survives_cast_of_sibling_column() {
|
||||||
|
let batch = RecordBatch::try_new(
|
||||||
|
Arc::new(Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int32, false),
|
||||||
|
Field::new("image", DataType::LargeBinary, true),
|
||||||
|
])),
|
||||||
|
vec![
|
||||||
|
Arc::new(Int32Array::from(vec![1])),
|
||||||
|
Arc::new(LargeBinaryArray::from_iter_values([b"x".as_slice()])),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
let coerced = coerce(batch, &blob_table_schema()).await;
|
||||||
|
|
||||||
|
let image_field = coerced.schema().field_with_name("image").unwrap().clone();
|
||||||
|
assert!(
|
||||||
|
image_field.is_blob_v2(),
|
||||||
|
"expected blob marker on image field, got {:?}",
|
||||||
|
image_field.metadata()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
coerced.schema().field_with_name("id").unwrap().data_type(),
|
||||||
|
&DataType::Int64
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn exact_blob_input_passes_through_unchanged() {
|
||||||
|
let DataType::Struct(children) = blob("image", true).data_type().clone() else {
|
||||||
|
unreachable!("blob field is a struct")
|
||||||
|
};
|
||||||
|
let image = StructArray::new(
|
||||||
|
children,
|
||||||
|
vec![
|
||||||
|
Arc::new(LargeBinaryArray::from_iter_values([b"exact".as_slice()])),
|
||||||
|
Arc::new(StringArray::from(vec![None::<&str>])),
|
||||||
|
],
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let batch = batch_with_image(blob("image", true), Arc::new(image));
|
||||||
|
let table_schema = blob_table_schema();
|
||||||
|
|
||||||
|
let input = plan_from_batch(batch).await;
|
||||||
|
let input_ptr = Arc::as_ptr(&input);
|
||||||
|
let plan = cast_to_table_schema(input, &table_schema).unwrap();
|
||||||
|
assert_eq!(Arc::as_ptr(&plan), input_ptr, "no projection inserted");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -13,8 +13,10 @@ use datafusion_physical_expr::expressions::{CastExpr, Literal};
|
|||||||
use datafusion_physical_plan::expressions::Column;
|
use datafusion_physical_plan::expressions::Column;
|
||||||
use datafusion_physical_plan::projection::ProjectionExec;
|
use datafusion_physical_plan::projection::ProjectionExec;
|
||||||
use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};
|
use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};
|
||||||
|
use lance_arrow::FieldExt;
|
||||||
use lance_arrow::json::{is_arrow_json_field, is_json_field};
|
use lance_arrow::json::{is_arrow_json_field, is_json_field};
|
||||||
|
|
||||||
|
use super::blob_coerce::coerce_blob_expr;
|
||||||
use crate::{Error, Result};
|
use crate::{Error, Result};
|
||||||
|
|
||||||
pub fn cast_to_table_schema(
|
pub fn cast_to_table_schema(
|
||||||
@@ -77,6 +79,17 @@ fn build_field_exprs(
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Blob columns accept raw binary on write; exact matches pass through below.
|
||||||
|
if table_field.is_blob_v2() && input_field.as_ref() != table_field.as_ref() {
|
||||||
|
result.push(coerce_blob_expr(
|
||||||
|
input_expr,
|
||||||
|
input_field,
|
||||||
|
table_field,
|
||||||
|
&config,
|
||||||
|
)?);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
let expr = match (input_field.data_type(), table_field.data_type()) {
|
let expr = match (input_field.data_type(), table_field.data_type()) {
|
||||||
// Both are structs: recurse into sub-fields to handle subschemas and casts.
|
// Both are structs: recurse into sub-fields to handle subschemas and casts.
|
||||||
(DataType::Struct(in_children), DataType::Struct(tbl_children))
|
(DataType::Struct(in_children), DataType::Struct(tbl_children))
|
||||||
|
|||||||
@@ -4,6 +4,7 @@
|
|||||||
//! DataFusion ExecutionPlan for inserting data into LanceDB tables.
|
//! DataFusion ExecutionPlan for inserting data into LanceDB tables.
|
||||||
|
|
||||||
use std::any::Any;
|
use std::any::Any;
|
||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::sync::{Arc, LazyLock, Mutex};
|
use std::sync::{Arc, LazyLock, Mutex};
|
||||||
|
|
||||||
use arrow_array::{RecordBatch, UInt64Array};
|
use arrow_array::{RecordBatch, UInt64Array};
|
||||||
@@ -20,11 +21,12 @@ use datafusion_physical_plan::{
|
|||||||
use futures::TryStreamExt;
|
use futures::TryStreamExt;
|
||||||
use lance::Dataset;
|
use lance::Dataset;
|
||||||
use lance::dataset::transaction::{Operation, Transaction};
|
use lance::dataset::transaction::{Operation, Transaction};
|
||||||
use lance::dataset::{CommitBuilder, InsertBuilder, WriteParams};
|
use lance::dataset::{CommitBuilder, InsertBuilder, WriteParams, WriteProgressFn};
|
||||||
use lance::io::exec::utils::InstrumentedRecordBatchStreamAdapter;
|
use lance::io::exec::utils::InstrumentedRecordBatchStreamAdapter;
|
||||||
use lance_table::format::Fragment;
|
use lance_table::format::Fragment;
|
||||||
|
|
||||||
use crate::table::dataset::DatasetConsistencyWrapper;
|
use crate::table::dataset::DatasetConsistencyWrapper;
|
||||||
|
use crate::table::write_progress::WriteProgressTracker;
|
||||||
|
|
||||||
pub(crate) static COUNT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
|
pub(crate) static COUNT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
|
||||||
Arc::new(ArrowSchema::new(vec![Field::new(
|
Arc::new(ArrowSchema::new(vec![Field::new(
|
||||||
@@ -81,6 +83,7 @@ pub struct InsertExec {
|
|||||||
dataset: Arc<Dataset>,
|
dataset: Arc<Dataset>,
|
||||||
input: Arc<dyn ExecutionPlan>,
|
input: Arc<dyn ExecutionPlan>,
|
||||||
write_params: WriteParams,
|
write_params: WriteParams,
|
||||||
|
tracker: Option<Arc<WriteProgressTracker>>,
|
||||||
properties: Arc<PlanProperties>,
|
properties: Arc<PlanProperties>,
|
||||||
partial_transactions: Arc<Mutex<Vec<Transaction>>>,
|
partial_transactions: Arc<Mutex<Vec<Transaction>>>,
|
||||||
metrics: ExecutionPlanMetricsSet,
|
metrics: ExecutionPlanMetricsSet,
|
||||||
@@ -92,6 +95,16 @@ impl InsertExec {
|
|||||||
dataset: Arc<Dataset>,
|
dataset: Arc<Dataset>,
|
||||||
input: Arc<dyn ExecutionPlan>,
|
input: Arc<dyn ExecutionPlan>,
|
||||||
write_params: WriteParams,
|
write_params: WriteParams,
|
||||||
|
) -> Self {
|
||||||
|
Self::new_with_tracker(ds_wrapper, dataset, input, write_params, None)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn new_with_tracker(
|
||||||
|
ds_wrapper: DatasetConsistencyWrapper,
|
||||||
|
dataset: Arc<Dataset>,
|
||||||
|
input: Arc<dyn ExecutionPlan>,
|
||||||
|
write_params: WriteParams,
|
||||||
|
tracker: Option<Arc<WriteProgressTracker>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let schema = COUNT_SCHEMA.clone();
|
let schema = COUNT_SCHEMA.clone();
|
||||||
let num_partitions = input.output_partitioning().partition_count();
|
let num_partitions = input.output_partitioning().partition_count();
|
||||||
@@ -107,6 +120,7 @@ impl InsertExec {
|
|||||||
dataset,
|
dataset,
|
||||||
input,
|
input,
|
||||||
write_params,
|
write_params,
|
||||||
|
tracker,
|
||||||
properties: Arc::new(properties),
|
properties: Arc::new(properties),
|
||||||
partial_transactions: Arc::new(Mutex::new(Vec::with_capacity(num_partitions))),
|
partial_transactions: Arc::new(Mutex::new(Vec::with_capacity(num_partitions))),
|
||||||
metrics: ExecutionPlanMetricsSet::new(),
|
metrics: ExecutionPlanMetricsSet::new(),
|
||||||
@@ -161,11 +175,12 @@ impl ExecutionPlan for InsertExec {
|
|||||||
"InsertExec requires exactly one child".to_string(),
|
"InsertExec requires exactly one child".to_string(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
Ok(Arc::new(Self::new(
|
Ok(Arc::new(Self::new_with_tracker(
|
||||||
self.ds_wrapper.clone(),
|
self.ds_wrapper.clone(),
|
||||||
self.dataset.clone(),
|
self.dataset.clone(),
|
||||||
children[0].clone(),
|
children[0].clone(),
|
||||||
self.write_params.clone(),
|
self.write_params.clone(),
|
||||||
|
self.tracker.clone(),
|
||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -176,10 +191,11 @@ impl ExecutionPlan for InsertExec {
|
|||||||
) -> DataFusionResult<SendableRecordBatchStream> {
|
) -> DataFusionResult<SendableRecordBatchStream> {
|
||||||
let input_stream = self.input.execute(partition, context)?;
|
let input_stream = self.input.execute(partition, context)?;
|
||||||
let dataset = self.dataset.clone();
|
let dataset = self.dataset.clone();
|
||||||
let write_params = self.write_params.clone();
|
let mut write_params = self.write_params.clone();
|
||||||
let partial_transactions = self.partial_transactions.clone();
|
let partial_transactions = self.partial_transactions.clone();
|
||||||
let total_partitions = self.input.output_partitioning().partition_count();
|
let total_partitions = self.input.output_partitioning().partition_count();
|
||||||
let ds_wrapper = self.ds_wrapper.clone();
|
let ds_wrapper = self.ds_wrapper.clone();
|
||||||
|
let tracker = self.tracker.clone();
|
||||||
|
|
||||||
let output_bytes = MetricBuilder::new(&self.metrics).output_bytes(partition);
|
let output_bytes = MetricBuilder::new(&self.metrics).output_bytes(partition);
|
||||||
let input_schema = input_stream.schema();
|
let input_schema = input_stream.schema();
|
||||||
@@ -195,6 +211,20 @@ impl ExecutionPlan for InsertExec {
|
|||||||
));
|
));
|
||||||
|
|
||||||
let stream = futures::stream::once(async move {
|
let stream = futures::stream::once(async move {
|
||||||
|
if let Some(tracker) = tracker
|
||||||
|
&& write_params.write_progress.is_none()
|
||||||
|
{
|
||||||
|
let last_bytes = Arc::new(AtomicU64::new(0));
|
||||||
|
write_params.write_progress = Some(WriteProgressFn::new(move |stats| {
|
||||||
|
let previous = last_bytes.swap(stats.bytes_written, Ordering::Relaxed);
|
||||||
|
if stats.bytes_written > previous {
|
||||||
|
let delta =
|
||||||
|
usize::try_from(stats.bytes_written - previous).unwrap_or(usize::MAX);
|
||||||
|
tracker.record_bytes(delta);
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
let transaction = InsertBuilder::new(dataset.clone())
|
let transaction = InsertBuilder::new(dataset.clone())
|
||||||
.with_params(&write_params)
|
.with_params(&write_params)
|
||||||
.execute_uncommitted_stream(input_stream)
|
.execute_uncommitted_stream(input_stream)
|
||||||
|
|||||||
@@ -518,6 +518,10 @@ mod tests {
|
|||||||
|
|
||||||
let wrapper = DatasetConsistencyWrapper::new_latest(ds, Some(Duration::from_millis(200)));
|
let wrapper = DatasetConsistencyWrapper::new_latest(ds, Some(Duration::from_millis(200)));
|
||||||
|
|
||||||
|
// Freeze `cached_at` on the mock clock so a slow external write below can't
|
||||||
|
// expire the TTL before the explicit advance_by() does (flake on loaded CI).
|
||||||
|
clock::pin();
|
||||||
|
|
||||||
// Populate the cache
|
// Populate the cache
|
||||||
let v1 = wrapper.get().await.unwrap().version().version;
|
let v1 = wrapper.get().await.unwrap().version().version;
|
||||||
assert_eq!(v1, 1);
|
assert_eq!(v1, 1);
|
||||||
|
|||||||
@@ -142,11 +142,21 @@ impl WriteProgressTracker {
|
|||||||
cb(&progress);
|
cb(&progress);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Record wire bytes from the insert layer (e.g. IPC-encoded bytes for
|
/// Record wire bytes from the insert layer.
|
||||||
/// remote writes). When wire bytes are recorded, they take precedence over
|
///
|
||||||
/// the in-memory Arrow bytes tracked by [`record_batch`].
|
/// These bytes may be IPC-encoded bytes for remote writes or bytes handed
|
||||||
|
/// to Lance's local writer. When wire bytes are recorded, they take
|
||||||
|
/// precedence over the in-memory Arrow bytes tracked by [`record_batch`].
|
||||||
pub fn record_bytes(&self, bytes: usize) {
|
pub fn record_bytes(&self, bytes: usize) {
|
||||||
self.wire_bytes.fetch_add(bytes, Ordering::Relaxed);
|
self.wire_bytes.fetch_add(bytes, Ordering::Relaxed);
|
||||||
|
let mut cb = self.callback.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
|
let guard = self
|
||||||
|
.rows_and_bytes
|
||||||
|
.lock()
|
||||||
|
.unwrap_or_else(|e| e.into_inner());
|
||||||
|
let progress = self.snapshot(guard.0, guard.1, false);
|
||||||
|
drop(guard);
|
||||||
|
cb(&progress);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Emit the final progress callback indicating the write is complete.
|
/// Emit the final progress callback indicating the write is complete.
|
||||||
@@ -169,8 +179,6 @@ impl WriteProgressTracker {
|
|||||||
let wire = self.wire_bytes.load(Ordering::Relaxed);
|
let wire = self.wire_bytes.load(Ordering::Relaxed);
|
||||||
// Prefer wire bytes (actual I/O size) when the insert layer is
|
// Prefer wire bytes (actual I/O size) when the insert layer is
|
||||||
// tracking them; fall back to in-memory Arrow size otherwise.
|
// tracking them; fall back to in-memory Arrow size otherwise.
|
||||||
// TODO: for local writes, track actual bytes written by Lance
|
|
||||||
// instead of using in-memory Arrow size as a proxy.
|
|
||||||
let output_bytes = if wire > 0 { wire } else { in_memory_bytes };
|
let output_bytes = if wire > 0 { wire } else { in_memory_bytes };
|
||||||
WriteProgress {
|
WriteProgress {
|
||||||
elapsed: self.start.elapsed(),
|
elapsed: self.start.elapsed(),
|
||||||
@@ -383,6 +391,54 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_progress_uses_lance_write_bytes_for_local_tables() {
|
||||||
|
let dir = tempfile::tempdir().unwrap();
|
||||||
|
let db = connect(dir.path().to_str().unwrap())
|
||||||
|
.execute()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
|
||||||
|
let table = db
|
||||||
|
.create_table("local_write_bytes", batch)
|
||||||
|
.execute()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let new_data = record_batch!(("id", Int32, [4, 5, 6])).unwrap();
|
||||||
|
let in_memory_bytes = new_data.get_array_memory_size();
|
||||||
|
let final_bytes = Arc::new(AtomicUsize::new(0));
|
||||||
|
let seen_non_memory_bytes = Arc::new(std::sync::atomic::AtomicBool::new(false));
|
||||||
|
let final_bytes_cb = final_bytes.clone();
|
||||||
|
let seen_non_memory_bytes_cb = seen_non_memory_bytes.clone();
|
||||||
|
|
||||||
|
table
|
||||||
|
.add(new_data)
|
||||||
|
.write_parallelism(1)
|
||||||
|
.progress(move |p| {
|
||||||
|
if p.output_bytes() > 0 && p.output_bytes() != in_memory_bytes {
|
||||||
|
seen_non_memory_bytes_cb.store(true, Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
if p.done() {
|
||||||
|
final_bytes_cb.store(p.output_bytes(), Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.execute()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
seen_non_memory_bytes.load(Ordering::SeqCst),
|
||||||
|
"progress should report Lance writer bytes, not only Arrow memory bytes"
|
||||||
|
);
|
||||||
|
assert_ne!(
|
||||||
|
final_bytes.load(Ordering::SeqCst),
|
||||||
|
in_memory_bytes,
|
||||||
|
"final progress bytes should come from Lance write stats"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_record_batch_recovers_from_poisoned_callback_lock() {
|
fn test_record_batch_recovers_from_poisoned_callback_lock() {
|
||||||
use super::{ProgressCallback, WriteProgressTracker};
|
use super::{ProgressCallback, WriteProgressTracker};
|
||||||
|
|||||||
@@ -329,6 +329,15 @@ pub mod clock {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Start mock time at the current instant if not already pinned.
|
||||||
|
pub fn pin() {
|
||||||
|
MOCK_NOW.with(|mock| {
|
||||||
|
if mock.get().is_none() {
|
||||||
|
mock.set(Some(Instant::now()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn clear_mock() {
|
pub fn clear_mock() {
|
||||||
MOCK_NOW.with(|mock| mock.set(None));
|
MOCK_NOW.with(|mock| mock.set(None));
|
||||||
|
|||||||
949
rust/lancedb/tests/blob_integration.rs
Normal file
949
rust/lancedb/tests/blob_integration.rs
Normal file
@@ -0,0 +1,949 @@
|
|||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use arrow_array::{
|
||||||
|
Array, ArrayRef, BinaryArray, Int64Array, LargeBinaryArray, RecordBatch, StringArray,
|
||||||
|
StructArray, UInt64Array,
|
||||||
|
};
|
||||||
|
use arrow_schema::{DataType, Field, Fields, Schema};
|
||||||
|
use futures::TryStreamExt;
|
||||||
|
use lance_encoding::version::LanceFileVersion;
|
||||||
|
use lancedb::{
|
||||||
|
Connection, Error, Result, Table,
|
||||||
|
blob::blob,
|
||||||
|
connect, connect_namespace,
|
||||||
|
database::listing::OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS,
|
||||||
|
query::{ExecutableQuery, QueryBase},
|
||||||
|
table::{AddDataMode, CompactionOptions, OptimizeAction},
|
||||||
|
};
|
||||||
|
use tempfile::tempdir;
|
||||||
|
|
||||||
|
fn blob_table_schema() -> Arc<Schema> {
|
||||||
|
Arc::new(Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
blob("image", true),
|
||||||
|
]))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn binary_input_batch(ids: &[i64], payloads: &[Option<&[u8]>]) -> RecordBatch {
|
||||||
|
RecordBatch::try_new(
|
||||||
|
Arc::new(Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
Field::new("image", DataType::LargeBinary, true),
|
||||||
|
])),
|
||||||
|
vec![
|
||||||
|
Arc::new(Int64Array::from(ids.to_vec())),
|
||||||
|
Arc::new(LargeBinaryArray::from_iter(payloads.iter().copied())),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_inline_blob_table(
|
||||||
|
db: &Connection,
|
||||||
|
name: &str,
|
||||||
|
ids: &[i64],
|
||||||
|
payloads: &[Option<&[u8]>],
|
||||||
|
) -> Result<Table> {
|
||||||
|
let table = db
|
||||||
|
.create_empty_table(name, blob_table_schema())
|
||||||
|
.execute()
|
||||||
|
.await?;
|
||||||
|
table
|
||||||
|
.add(binary_input_batch(ids, payloads))
|
||||||
|
.execute()
|
||||||
|
.await?;
|
||||||
|
Ok(table)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn storage_format_version(table: &Table) -> LanceFileVersion {
|
||||||
|
table
|
||||||
|
.as_native()
|
||||||
|
.unwrap()
|
||||||
|
.manifest()
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.data_storage_format
|
||||||
|
.lance_file_version()
|
||||||
|
.unwrap()
|
||||||
|
.resolve()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn uses_stable_row_ids(table: &Table) -> bool {
|
||||||
|
table
|
||||||
|
.as_native()
|
||||||
|
.unwrap()
|
||||||
|
.manifest()
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.uses_stable_row_ids()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn query_image_struct(table: &Table) -> StructArray {
|
||||||
|
let batches = table
|
||||||
|
.query()
|
||||||
|
.execute()
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.try_collect::<Vec<_>>()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let batch = arrow_select::concat::concat_batches(&batches[0].schema(), &batches).unwrap();
|
||||||
|
batch
|
||||||
|
.column_by_name("image")
|
||||||
|
.expect("image column present")
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<StructArray>()
|
||||||
|
.expect("image column is a descriptor struct")
|
||||||
|
.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn declaring_blob_column_bumps_format_and_enables_stable_row_ids() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = db
|
||||||
|
.create_empty_table("t", blob_table_schema())
|
||||||
|
.execute()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
|
||||||
|
assert!(uses_stable_row_ids(&table).await);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn explicit_stable_row_id_setting_wins_over_blob_default() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = db
|
||||||
|
.create_empty_table("t", blob_table_schema())
|
||||||
|
.storage_option(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, "false")
|
||||||
|
.execute()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
|
||||||
|
assert!(!uses_stable_row_ids(&table).await);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn non_blob_table_keeps_default_format_and_row_id_setting() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
|
||||||
|
let table = db.create_empty_table("t", schema).execute().await?;
|
||||||
|
|
||||||
|
assert!(storage_format_version(&table).await < LanceFileVersion::V2_2);
|
||||||
|
assert!(!uses_stable_row_ids(&table).await);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn creating_with_blob_data_bumps_format() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
|
||||||
|
let blob_field = blob("image", true);
|
||||||
|
let DataType::Struct(children) = blob_field.data_type().clone() else {
|
||||||
|
unreachable!("blob field is a struct")
|
||||||
|
};
|
||||||
|
let image = StructArray::new(
|
||||||
|
children,
|
||||||
|
vec![
|
||||||
|
Arc::new(LargeBinaryArray::from_iter_values([b"payload".as_slice()])),
|
||||||
|
Arc::new(StringArray::from(vec![None::<&str>])),
|
||||||
|
],
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let batch = RecordBatch::try_new(
|
||||||
|
Arc::new(Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
blob_field,
|
||||||
|
])),
|
||||||
|
vec![Arc::new(Int64Array::from(vec![1])), Arc::new(image)],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
let table = db.create_table("t", batch).execute().await?;
|
||||||
|
|
||||||
|
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
|
||||||
|
assert!(uses_stable_row_ids(&table).await);
|
||||||
|
assert_eq!(table.count_rows(None).await?, 1);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn add_coerces_large_binary_into_blob_column() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table =
|
||||||
|
create_inline_blob_table(&db, "t", &[1, 2], &[Some(b"cat".as_slice()), Some(b"dog")])
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
assert_eq!(table.count_rows(None).await?, 2);
|
||||||
|
let image = query_image_struct(&table).await;
|
||||||
|
assert_eq!(image.len(), 2);
|
||||||
|
let schema = table.schema().await?;
|
||||||
|
let field = schema.field_with_name("image").unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
field
|
||||||
|
.metadata()
|
||||||
|
.get("ARROW:extension:name")
|
||||||
|
.map(String::as_str),
|
||||||
|
Some("lance.blob.v2")
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn add_coerces_binary_into_blob_column() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = db
|
||||||
|
.create_empty_table("t", blob_table_schema())
|
||||||
|
.execute()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let batch = RecordBatch::try_new(
|
||||||
|
Arc::new(Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
Field::new("image", DataType::Binary, true),
|
||||||
|
])),
|
||||||
|
vec![
|
||||||
|
Arc::new(Int64Array::from(vec![1])),
|
||||||
|
Arc::new(BinaryArray::from_iter_values([b"small".as_slice()])),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
table.add(batch).execute().await?;
|
||||||
|
|
||||||
|
assert_eq!(table.count_rows(None).await?, 1);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn add_accepts_null_blob_rows() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = create_inline_blob_table(
|
||||||
|
&db,
|
||||||
|
"t",
|
||||||
|
&[1, 2, 3],
|
||||||
|
&[Some(b"first".as_slice()), None, Some(b"third")],
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
assert_eq!(table.count_rows(None).await?, 3);
|
||||||
|
let image = query_image_struct(&table).await;
|
||||||
|
assert_eq!(image.len(), 3);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn add_rejects_uncoercible_blob_input() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = db
|
||||||
|
.create_empty_table("t", blob_table_schema())
|
||||||
|
.execute()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let batch = RecordBatch::try_new(
|
||||||
|
Arc::new(Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
Field::new("image", DataType::Utf8, true),
|
||||||
|
])),
|
||||||
|
vec![
|
||||||
|
Arc::new(Int64Array::from(vec![1])),
|
||||||
|
Arc::new(StringArray::from(vec!["not bytes"])),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
let err = table.add(batch).execute().await.unwrap_err();
|
||||||
|
assert!(err.to_string().contains("image"));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn connection_level_stable_row_id_setting_wins_over_blob_default() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap())
|
||||||
|
.storage_option(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, "false")
|
||||||
|
.execute()
|
||||||
|
.await?;
|
||||||
|
let table = db
|
||||||
|
.create_empty_table("t", blob_table_schema())
|
||||||
|
.execute()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
|
||||||
|
assert!(!uses_stable_row_ids(&table).await);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn namespace_create_applies_blob_defaults() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let mut properties = std::collections::HashMap::new();
|
||||||
|
properties.insert("root".to_string(), tmp.path().to_str().unwrap().to_string());
|
||||||
|
let db = connect_namespace("dir", properties).execute().await?;
|
||||||
|
let table = db
|
||||||
|
.create_empty_table("t", blob_table_schema())
|
||||||
|
.execute()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
|
||||||
|
assert!(uses_stable_row_ids(&table).await);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Overwrite takes the input schema as-is. A raw-binary overwrite drops the blob
|
||||||
|
// marker; re-declaring blob v2 in the input restores it.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn overwrite_replaces_blob_schema_with_input_schema() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"blob".as_slice())]).await?;
|
||||||
|
|
||||||
|
let raw_schema = Arc::new(Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
Field::new("image", DataType::LargeBinary, true),
|
||||||
|
]));
|
||||||
|
let raw_batch = RecordBatch::try_new(
|
||||||
|
raw_schema.clone(),
|
||||||
|
vec![
|
||||||
|
Arc::new(Int64Array::from(vec![2])),
|
||||||
|
Arc::new(LargeBinaryArray::from_iter_values([b"plain".as_slice()])),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
table
|
||||||
|
.add(raw_batch)
|
||||||
|
.mode(AddDataMode::Overwrite)
|
||||||
|
.execute()
|
||||||
|
.await?;
|
||||||
|
let schema = table.schema().await?;
|
||||||
|
assert_eq!(schema, raw_schema);
|
||||||
|
assert!(
|
||||||
|
!schema
|
||||||
|
.field_with_name("image")
|
||||||
|
.unwrap()
|
||||||
|
.metadata()
|
||||||
|
.contains_key("ARROW:extension:name")
|
||||||
|
);
|
||||||
|
|
||||||
|
let blob_field = blob("image", true);
|
||||||
|
let DataType::Struct(children) = blob_field.data_type().clone() else {
|
||||||
|
unreachable!("blob field is a struct")
|
||||||
|
};
|
||||||
|
let image = StructArray::new(
|
||||||
|
children,
|
||||||
|
vec![
|
||||||
|
Arc::new(LargeBinaryArray::from_iter_values([b"declared".as_slice()])),
|
||||||
|
Arc::new(StringArray::from(vec![None::<&str>])),
|
||||||
|
],
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let declared_batch = RecordBatch::try_new(
|
||||||
|
Arc::new(Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
blob_field,
|
||||||
|
])),
|
||||||
|
vec![Arc::new(Int64Array::from(vec![3])), Arc::new(image)],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
table
|
||||||
|
.add(declared_batch)
|
||||||
|
.mode(AddDataMode::Overwrite)
|
||||||
|
.execute()
|
||||||
|
.await?;
|
||||||
|
let schema = table.schema().await?;
|
||||||
|
assert_eq!(
|
||||||
|
schema
|
||||||
|
.field_with_name("image")
|
||||||
|
.unwrap()
|
||||||
|
.metadata()
|
||||||
|
.get("ARROW:extension:name")
|
||||||
|
.map(String::as_str),
|
||||||
|
Some("lance.blob.v2")
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn collect_row_ids(table: &Table) -> Result<Vec<u64>> {
|
||||||
|
let batches = table
|
||||||
|
.query()
|
||||||
|
.with_row_id()
|
||||||
|
.execute()
|
||||||
|
.await?
|
||||||
|
.try_collect::<Vec<_>>()
|
||||||
|
.await?;
|
||||||
|
let batch = arrow_select::concat::concat_batches(&batches[0].schema(), &batches).unwrap();
|
||||||
|
Ok(batch
|
||||||
|
.column_by_name("_rowid")
|
||||||
|
.unwrap()
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<UInt64Array>()
|
||||||
|
.unwrap()
|
||||||
|
.values()
|
||||||
|
.to_vec())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn collect_id_rowid(table: &Table) -> Result<Vec<(i64, u64)>> {
|
||||||
|
let batches = table
|
||||||
|
.query()
|
||||||
|
.with_row_id()
|
||||||
|
.execute()
|
||||||
|
.await?
|
||||||
|
.try_collect::<Vec<_>>()
|
||||||
|
.await?;
|
||||||
|
let batch = arrow_select::concat::concat_batches(&batches[0].schema(), &batches).unwrap();
|
||||||
|
let ids = batch
|
||||||
|
.column_by_name("id")
|
||||||
|
.unwrap()
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<Int64Array>()
|
||||||
|
.unwrap();
|
||||||
|
let row_ids = batch
|
||||||
|
.column_by_name("_rowid")
|
||||||
|
.unwrap()
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<UInt64Array>()
|
||||||
|
.unwrap();
|
||||||
|
Ok(ids
|
||||||
|
.values()
|
||||||
|
.iter()
|
||||||
|
.copied()
|
||||||
|
.zip(row_ids.values().iter().copied())
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blobs_round_trips_bytes() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let payload: &[u8] = b"blob-round-trip-payload";
|
||||||
|
let table = create_inline_blob_table(&db, "t", &[1], &[Some(payload)]).await?;
|
||||||
|
|
||||||
|
let ids = collect_row_ids(&table).await?;
|
||||||
|
let bytes = table.fetch_blobs("image", &ids).await?;
|
||||||
|
assert_eq!(bytes.len(), 1);
|
||||||
|
assert_eq!(bytes.value(0), payload);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blobs_round_trips_nested_blob_column() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
|
||||||
|
let blob_field = blob("blob", true);
|
||||||
|
let DataType::Struct(blob_children) = blob_field.data_type().clone() else {
|
||||||
|
unreachable!("blob field is a struct")
|
||||||
|
};
|
||||||
|
let blob_array = StructArray::new(
|
||||||
|
blob_children,
|
||||||
|
vec![
|
||||||
|
Arc::new(LargeBinaryArray::from_iter_values([
|
||||||
|
b"hello".as_slice(),
|
||||||
|
b"world".as_slice(),
|
||||||
|
])) as ArrayRef,
|
||||||
|
Arc::new(StringArray::from(vec![None::<&str>, None::<&str>])) as ArrayRef,
|
||||||
|
],
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let info_fields: Fields = vec![Field::new("name", DataType::Utf8, false), blob_field].into();
|
||||||
|
let info_array = StructArray::new(
|
||||||
|
info_fields.clone(),
|
||||||
|
vec![
|
||||||
|
Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
|
||||||
|
Arc::new(blob_array) as ArrayRef,
|
||||||
|
],
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let schema = Arc::new(Schema::new(vec![Field::new(
|
||||||
|
"info",
|
||||||
|
DataType::Struct(info_fields),
|
||||||
|
true,
|
||||||
|
)]));
|
||||||
|
let batch = RecordBatch::try_new(schema, vec![Arc::new(info_array) as ArrayRef]).unwrap();
|
||||||
|
let table = db.create_table("t", batch).execute().await?;
|
||||||
|
|
||||||
|
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
|
||||||
|
assert!(uses_stable_row_ids(&table).await);
|
||||||
|
|
||||||
|
let ids = collect_row_ids(&table).await?;
|
||||||
|
let bytes = table.fetch_blobs("info.blob", &ids).await?;
|
||||||
|
assert_eq!(bytes.len(), 2);
|
||||||
|
let values: std::collections::HashSet<&[u8]> =
|
||||||
|
(0..bytes.len()).map(|i| bytes.value(i)).collect();
|
||||||
|
assert!(values.contains(b"hello".as_slice()));
|
||||||
|
assert!(values.contains(b"world".as_slice()));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn blob_columns_lists_nested_dotted_paths() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let blob_field = blob("blob", true);
|
||||||
|
let info = Field::new(
|
||||||
|
"info",
|
||||||
|
DataType::Struct(vec![Field::new("name", DataType::Utf8, false), blob_field].into()),
|
||||||
|
true,
|
||||||
|
);
|
||||||
|
let schema = Arc::new(Schema::new(vec![
|
||||||
|
blob("thumbnail", true),
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
info,
|
||||||
|
]));
|
||||||
|
let table = db.create_empty_table("t", schema).execute().await?;
|
||||||
|
assert_eq!(table.blob_columns().await?, vec!["thumbnail", "info.blob"]);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn blob_columns_lists_blob_fields_in_order() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let schema = Arc::new(Schema::new(vec![
|
||||||
|
blob("thumbnail", true),
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
blob("image", true),
|
||||||
|
]));
|
||||||
|
let table = db.create_empty_table("t", schema).execute().await?;
|
||||||
|
assert_eq!(table.blob_columns().await?, vec!["thumbnail", "image"]);
|
||||||
|
|
||||||
|
let plain = db
|
||||||
|
.create_empty_table(
|
||||||
|
"plain",
|
||||||
|
Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])),
|
||||||
|
)
|
||||||
|
.execute()
|
||||||
|
.await?;
|
||||||
|
assert!(plain.blob_columns().await?.is_empty());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blobs_preserves_null_alignment() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = create_inline_blob_table(
|
||||||
|
&db,
|
||||||
|
"t",
|
||||||
|
&[1, 2, 3, 4],
|
||||||
|
&[Some(b"a".as_slice()), None, Some(b"c"), None],
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let pairs = collect_id_rowid(&table).await?;
|
||||||
|
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
|
||||||
|
let bytes = table.fetch_blobs("image", &ids).await?;
|
||||||
|
assert_eq!(bytes.len(), ids.len());
|
||||||
|
for (i, (id, _)) in pairs.iter().enumerate() {
|
||||||
|
match id {
|
||||||
|
1 => assert_eq!(bytes.value(i), b"a"),
|
||||||
|
2 | 4 => assert!(bytes.is_null(i)),
|
||||||
|
3 => assert_eq!(bytes.value(i), b"c"),
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blobs_all_null_column_returns_all_nulls() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = create_inline_blob_table(&db, "t", &[1, 2], &[None, None]).await?;
|
||||||
|
|
||||||
|
let ids = collect_row_ids(&table).await?;
|
||||||
|
let bytes = table.fetch_blobs("image", &ids).await?;
|
||||||
|
assert_eq!(bytes.len(), 2);
|
||||||
|
assert_eq!(bytes.null_count(), 2);
|
||||||
|
|
||||||
|
let files = table.fetch_blob_files("image", &ids).await?;
|
||||||
|
assert_eq!(files.len(), 2);
|
||||||
|
assert!(files.iter().all(Option::is_none));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blobs_aligns_with_reordered_and_duplicate_ids() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = create_inline_blob_table(
|
||||||
|
&db,
|
||||||
|
"t",
|
||||||
|
&[1, 2, 3],
|
||||||
|
&[Some(b"one".as_slice()), Some(b"two"), Some(b"three")],
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let pairs = collect_id_rowid(&table).await?;
|
||||||
|
let by_id = |want: i64| pairs.iter().find(|(id, _)| *id == want).unwrap().1;
|
||||||
|
let request = vec![by_id(3), by_id(1), by_id(3), by_id(2)];
|
||||||
|
let bytes = table.fetch_blobs("image", &request).await?;
|
||||||
|
assert_eq!(bytes.len(), 4);
|
||||||
|
assert_eq!(bytes.value(0), b"three");
|
||||||
|
assert_eq!(bytes.value(1), b"one");
|
||||||
|
assert_eq!(bytes.value(2), b"three");
|
||||||
|
assert_eq!(bytes.value(3), b"two");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blobs_empty_ids_returns_empty() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
|
||||||
|
|
||||||
|
assert_eq!(table.fetch_blobs("image", &[]).await?.len(), 0);
|
||||||
|
assert!(table.fetch_blob_files("image", &[]).await?.is_empty());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blobs_out_of_range_id_errors_without_panic() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
|
||||||
|
|
||||||
|
let err = table.fetch_blobs("image", &[u64::MAX]).await.unwrap_err();
|
||||||
|
assert!(err.to_string().contains("row ids"));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blobs_rejects_non_blob_column() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
|
||||||
|
|
||||||
|
let err = table.fetch_blobs("id", &[0]).await.unwrap_err();
|
||||||
|
assert!(matches!(err, Error::InvalidInput { .. }));
|
||||||
|
assert!(err.to_string().contains("'id' is not a blob column"));
|
||||||
|
|
||||||
|
let err = table.fetch_blob_files("id", &[0]).await.unwrap_err();
|
||||||
|
assert!(err.to_string().contains("'id' is not a blob column"));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blobs_rejects_unknown_column() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
|
||||||
|
|
||||||
|
let err = table.fetch_blobs("missing", &[0]).await.unwrap_err();
|
||||||
|
assert!(err.to_string().contains("no column named 'missing'"));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blobs_rejects_legacy_v1_blob_column() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let legacy = Field::new("image", DataType::LargeBinary, true).with_metadata(
|
||||||
|
std::collections::HashMap::from([("lance-encoding:blob".to_string(), "true".to_string())]),
|
||||||
|
);
|
||||||
|
let schema = Arc::new(Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
legacy,
|
||||||
|
]));
|
||||||
|
let table = db.create_empty_table("t", schema).execute().await?;
|
||||||
|
|
||||||
|
let err = table.fetch_blobs("image", &[0]).await.unwrap_err();
|
||||||
|
assert!(err.to_string().contains("legacy blob column"));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blob_files_reads_lazily_and_aligns_nulls() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table =
|
||||||
|
create_inline_blob_table(&db, "t", &[1, 2], &[Some(b"lazy-bytes".as_slice()), None])
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let pairs = collect_id_rowid(&table).await?;
|
||||||
|
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
|
||||||
|
let files = table.fetch_blob_files("image", &ids).await?;
|
||||||
|
assert_eq!(files.len(), 2);
|
||||||
|
for ((id, _), file) in pairs.iter().zip(&files) {
|
||||||
|
match id {
|
||||||
|
1 => {
|
||||||
|
let handle = file.as_ref().unwrap();
|
||||||
|
assert_eq!(handle.read().await.unwrap().as_ref(), b"lazy-bytes");
|
||||||
|
}
|
||||||
|
2 => assert!(file.is_none()),
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blobs_reads_multiple_blob_columns_independently() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let schema = Arc::new(Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
blob("image", true),
|
||||||
|
blob("thumbnail", true),
|
||||||
|
]));
|
||||||
|
let table = db.create_empty_table("t", schema).execute().await?;
|
||||||
|
let batch = RecordBatch::try_new(
|
||||||
|
Arc::new(Schema::new(vec![
|
||||||
|
Field::new("id", DataType::Int64, false),
|
||||||
|
Field::new("image", DataType::LargeBinary, true),
|
||||||
|
Field::new("thumbnail", DataType::LargeBinary, true),
|
||||||
|
])),
|
||||||
|
vec![
|
||||||
|
Arc::new(Int64Array::from(vec![1, 2])),
|
||||||
|
Arc::new(LargeBinaryArray::from_iter(vec![
|
||||||
|
Some(b"image-1".as_slice()),
|
||||||
|
None,
|
||||||
|
])),
|
||||||
|
Arc::new(LargeBinaryArray::from_iter(vec![
|
||||||
|
None,
|
||||||
|
Some(b"thumb-2".as_slice()),
|
||||||
|
])),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
table.add(batch).execute().await?;
|
||||||
|
|
||||||
|
let pairs = collect_id_rowid(&table).await?;
|
||||||
|
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
|
||||||
|
let images = table.fetch_blobs("image", &ids).await?;
|
||||||
|
let thumbs = table.fetch_blobs("thumbnail", &ids).await?;
|
||||||
|
for (i, (id, _)) in pairs.iter().enumerate() {
|
||||||
|
match id {
|
||||||
|
1 => {
|
||||||
|
assert_eq!(images.value(i), b"image-1");
|
||||||
|
assert!(thumbs.is_null(i));
|
||||||
|
}
|
||||||
|
2 => {
|
||||||
|
assert!(images.is_null(i));
|
||||||
|
assert_eq!(thumbs.value(i), b"thumb-2");
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blobs_spans_fragments() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"frag-one".as_slice())]).await?;
|
||||||
|
table
|
||||||
|
.add(binary_input_batch(&[2], &[Some(b"frag-two".as_slice())]))
|
||||||
|
.execute()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let pairs = collect_id_rowid(&table).await?;
|
||||||
|
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
|
||||||
|
let bytes = table.fetch_blobs("image", &ids).await?;
|
||||||
|
for (i, (id, _)) in pairs.iter().enumerate() {
|
||||||
|
match id {
|
||||||
|
1 => assert_eq!(bytes.value(i), b"frag-one"),
|
||||||
|
2 => assert_eq!(bytes.value(i), b"frag-two"),
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blobs_packed_payload_round_trip() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let big = vec![0xAB_u8; 100 * 1024];
|
||||||
|
let small = b"small".to_vec();
|
||||||
|
let table = create_inline_blob_table(
|
||||||
|
&db,
|
||||||
|
"t",
|
||||||
|
&[1, 2],
|
||||||
|
&[Some(big.as_slice()), Some(small.as_slice())],
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let pairs = collect_id_rowid(&table).await?;
|
||||||
|
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
|
||||||
|
let bytes = table.fetch_blobs("image", &ids).await?;
|
||||||
|
for (i, (id, _)) in pairs.iter().enumerate() {
|
||||||
|
match id {
|
||||||
|
1 => assert_eq!(bytes.value(i), big.as_slice()),
|
||||||
|
2 => assert_eq!(bytes.value(i), small.as_slice()),
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blobs_after_delete() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = create_inline_blob_table(
|
||||||
|
&db,
|
||||||
|
"t",
|
||||||
|
&[1, 2, 3],
|
||||||
|
&[Some(b"one".as_slice()), Some(b"two"), Some(b"three")],
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
table.delete("id = 2").await?;
|
||||||
|
let pairs = collect_id_rowid(&table).await?;
|
||||||
|
assert_eq!(pairs.len(), 2);
|
||||||
|
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
|
||||||
|
let bytes = table.fetch_blobs("image", &ids).await?;
|
||||||
|
for (i, (id, _)) in pairs.iter().enumerate() {
|
||||||
|
match id {
|
||||||
|
1 => assert_eq!(bytes.value(i), b"one"),
|
||||||
|
3 => assert_eq!(bytes.value(i), b"three"),
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blobs_with_precompaction_row_ids_survives_compaction() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"frag-one".as_slice())]).await?;
|
||||||
|
table
|
||||||
|
.add(binary_input_batch(&[2], &[Some(b"frag-two".as_slice())]))
|
||||||
|
.execute()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let pairs_before = collect_id_rowid(&table).await?;
|
||||||
|
let ids_before: Vec<u64> = pairs_before.iter().map(|(_, rowid)| *rowid).collect();
|
||||||
|
|
||||||
|
table
|
||||||
|
.optimize(OptimizeAction::Compact {
|
||||||
|
options: CompactionOptions::default(),
|
||||||
|
remap_options: None,
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let bytes_after = table.fetch_blobs("image", &ids_before).await?;
|
||||||
|
assert_eq!(bytes_after.len(), 2);
|
||||||
|
for (i, (id, _)) in pairs_before.iter().enumerate() {
|
||||||
|
match id {
|
||||||
|
1 => assert_eq!(bytes_after.value(i), b"frag-one"),
|
||||||
|
2 => assert_eq!(bytes_after.value(i), b"frag-two"),
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn zero_length_blob_reads_back_as_null() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"".as_slice())]).await?;
|
||||||
|
|
||||||
|
let ids = collect_row_ids(&table).await?;
|
||||||
|
let bytes = table.fetch_blobs("image", &ids).await?;
|
||||||
|
assert_eq!(bytes.len(), 1);
|
||||||
|
assert!(bytes.is_null(0));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
const DEDICATED_BLOB_LEN: usize = 64 * 1024;
|
||||||
|
const SCRAMBLED_LOGICAL_IDS: [i64; 7] = [6, 3, 1, 4, 6, 2, 5];
|
||||||
|
|
||||||
|
fn dedicated_blob_bytes(tag: u8) -> Vec<u8> {
|
||||||
|
vec![tag; DEDICATED_BLOB_LEN]
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn multi_fragment_dedicated_blob_table(db: &Connection) -> Result<Table> {
|
||||||
|
let rows: [(i64, Option<u8>); 6] = [
|
||||||
|
(1, Some(1)),
|
||||||
|
(2, Some(2)),
|
||||||
|
(3, None),
|
||||||
|
(4, Some(4)),
|
||||||
|
(5, None),
|
||||||
|
(6, Some(6)),
|
||||||
|
];
|
||||||
|
let mut table: Option<Table> = None;
|
||||||
|
for (logical_id, blob_tag) in rows {
|
||||||
|
let bytes = blob_tag.map(dedicated_blob_bytes);
|
||||||
|
let image = [bytes.as_deref()];
|
||||||
|
table = Some(match table {
|
||||||
|
None => create_inline_blob_table(db, "t", &[logical_id], &image).await?,
|
||||||
|
Some(t) => {
|
||||||
|
t.add(binary_input_batch(&[logical_id], &image))
|
||||||
|
.execute()
|
||||||
|
.await?;
|
||||||
|
t
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Ok(table.unwrap())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn row_ids_for_logical(table: &Table, logical_ids: &[i64]) -> Result<Vec<u64>> {
|
||||||
|
let id_rowid = collect_id_rowid(table).await?;
|
||||||
|
Ok(logical_ids
|
||||||
|
.iter()
|
||||||
|
.map(|logical_id| {
|
||||||
|
id_rowid
|
||||||
|
.iter()
|
||||||
|
.find(|(id, _)| id == logical_id)
|
||||||
|
.map(|(_, row_id)| *row_id)
|
||||||
|
.unwrap()
|
||||||
|
})
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blobs_aligns_across_fragments_with_nulls_and_dups() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = multi_fragment_dedicated_blob_table(&db).await?;
|
||||||
|
let row_ids = row_ids_for_logical(&table, &SCRAMBLED_LOGICAL_IDS).await?;
|
||||||
|
|
||||||
|
let bytes = table.fetch_blobs("image", &row_ids).await?;
|
||||||
|
assert_eq!(bytes.len(), SCRAMBLED_LOGICAL_IDS.len());
|
||||||
|
for (slot, logical_id) in SCRAMBLED_LOGICAL_IDS.iter().enumerate() {
|
||||||
|
match logical_id {
|
||||||
|
3 | 5 => assert!(bytes.is_null(slot)),
|
||||||
|
id => assert_eq!(
|
||||||
|
bytes.value(slot),
|
||||||
|
dedicated_blob_bytes(*id as u8).as_slice()
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fetch_blob_files_aligns_across_fragments_with_nulls_and_dups() -> Result<()> {
|
||||||
|
let tmp = tempdir().unwrap();
|
||||||
|
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||||
|
let table = multi_fragment_dedicated_blob_table(&db).await?;
|
||||||
|
let row_ids = row_ids_for_logical(&table, &SCRAMBLED_LOGICAL_IDS).await?;
|
||||||
|
|
||||||
|
let files = table.fetch_blob_files("image", &row_ids).await?;
|
||||||
|
assert_eq!(files.len(), SCRAMBLED_LOGICAL_IDS.len());
|
||||||
|
for (slot, logical_id) in SCRAMBLED_LOGICAL_IDS.iter().enumerate() {
|
||||||
|
match logical_id {
|
||||||
|
3 | 5 => assert!(files[slot].is_none()),
|
||||||
|
id => {
|
||||||
|
let payload = files[slot].as_ref().unwrap().read().await?;
|
||||||
|
assert_eq!(payload.as_ref(), dedicated_blob_bytes(*id as u8).as_slice());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user