mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-18 03:30:40 +00:00
Compare commits
11 Commits
yang/appro
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c1c19cd133 | ||
|
|
ce5dadd386 | ||
|
|
1f8ebef3cd | ||
|
|
217fd8491d | ||
|
|
9128dbcd7a | ||
|
|
394bb34fa2 | ||
|
|
b2ae763254 | ||
|
|
1bead6960c | ||
|
|
0abf641733 | ||
|
|
976edeb2ff | ||
|
|
b46a44f873 |
@@ -23,6 +23,8 @@ allow_dirty = true
|
||||
commit = true
|
||||
message = "Bump version: {current_version} → {new_version}"
|
||||
commit_args = ""
|
||||
# bump-my-version >=1.4.0 rejects pre_commit_hooks containing shell syntax unless opted in.
|
||||
allow_shell_hooks = true
|
||||
|
||||
# Java maven files
|
||||
pre_commit_hooks = [
|
||||
|
||||
85
Cargo.lock
generated
85
Cargo.lock
generated
@@ -3432,8 +3432,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
|
||||
|
||||
[[package]]
|
||||
name = "fsst"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"rand 0.9.4",
|
||||
@@ -4735,8 +4735,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a"
|
||||
|
||||
[[package]]
|
||||
name = "lance"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"arrow",
|
||||
@@ -4810,8 +4810,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-arrow"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4832,7 +4832,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "lance-arrow-scalar"
|
||||
version = "58.0.0"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4846,7 +4846,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "lance-arrow-stats"
|
||||
version = "58.0.0"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-schema",
|
||||
@@ -4855,8 +4855,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-bitpacking"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arrayref",
|
||||
"paste",
|
||||
@@ -4865,8 +4865,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-core"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4904,8 +4904,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-datafusion"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4935,8 +4935,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-datagen"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4953,8 +4953,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-derive"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -4963,8 +4963,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-encoding"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arrow-arith",
|
||||
"arrow-array",
|
||||
@@ -4999,8 +4999,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-file"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arrow-arith",
|
||||
"arrow-array",
|
||||
@@ -5030,8 +5030,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-index"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"arrow",
|
||||
@@ -5083,6 +5083,7 @@ dependencies = [
|
||||
"rand_distr 0.5.1",
|
||||
"rangemap",
|
||||
"rayon",
|
||||
"regex-syntax",
|
||||
"roaring",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -5095,8 +5096,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-io"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-arith",
|
||||
@@ -5137,8 +5138,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-linalg"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -5153,8 +5154,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"async-trait",
|
||||
@@ -5166,8 +5167,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace-impls"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-ipc",
|
||||
@@ -5207,9 +5208,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace-reqwest-client"
|
||||
version = "0.8.5"
|
||||
version = "0.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0d287494559c22838ce34e51ea0fa29dc780d5be8283de5ab33e9395623000c8"
|
||||
checksum = "ba3f0a235e3ed5f8805205649ccc7d7d0f3df23ce1294242c9265ad488d7f19d"
|
||||
dependencies = [
|
||||
"reqwest 0.12.28",
|
||||
"serde",
|
||||
@@ -5221,8 +5222,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-select"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -5237,8 +5238,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-table"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -5277,8 +5278,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-testing"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-schema",
|
||||
@@ -5291,8 +5292,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-tokenizer"
|
||||
version = "8.0.0-beta.14"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.14#c188de59fcf0976a0a9fef53ae67ae7ae8bcb61a"
|
||||
version = "8.0.0-beta.17"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.17#0f2745d10a0fe5b34a1cf214466bbc0c0d13c90c"
|
||||
dependencies = [
|
||||
"icu_segmenter",
|
||||
"jieba-rs",
|
||||
|
||||
28
Cargo.toml
28
Cargo.toml
@@ -13,20 +13,20 @@ categories = ["database-implementations"]
|
||||
rust-version = "1.91.0"
|
||||
|
||||
[workspace.dependencies]
|
||||
lance = { "version" = "=8.0.0-beta.14", default-features = false, "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=8.0.0-beta.14", default-features = false, "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=8.0.0-beta.14", default-features = false, "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=8.0.0-beta.14", "tag" = "v8.0.0-beta.14", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance = { "version" = "=8.0.0-beta.17", default-features = false, "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=8.0.0-beta.17", default-features = false, "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=8.0.0-beta.17", default-features = false, "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=8.0.0-beta.17", "tag" = "v8.0.0-beta.17", "git" = "https://github.com/lance-format/lance.git" }
|
||||
ahash = "0.8"
|
||||
# Note that this one does not include pyarrow
|
||||
arrow = { version = "58.0.0", optional = false }
|
||||
|
||||
@@ -28,7 +28,7 @@
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<arrow.version>15.0.0</arrow.version>
|
||||
<lance-core.version>8.0.0-beta.14</lance-core.version>
|
||||
<lance-core.version>8.0.0-beta.17</lance-core.version>
|
||||
<spotless.skip>false</spotless.skip>
|
||||
<spotless.version>2.30.0</spotless.version>
|
||||
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>
|
||||
|
||||
@@ -23,6 +23,8 @@ allow_dirty = true
|
||||
commit = true
|
||||
message = "Bump version: {current_version} → {new_version}"
|
||||
commit_args = ""
|
||||
# bump-my-version >=1.4.0 rejects pre_commit_hooks containing shell syntax unless opted in.
|
||||
allow_shell_hooks = true
|
||||
|
||||
# Update Cargo.lock after version bump
|
||||
pre_commit_hooks = [
|
||||
|
||||
@@ -275,7 +275,18 @@ def _py_type_to_arrow_type(py_type: Type[Any], field: FieldInfo) -> pa.DataType:
|
||||
tz = get_extras(field, "tz")
|
||||
return pa.timestamp("us", tz=tz)
|
||||
elif getattr(py_type, "__origin__", None) in (list, tuple):
|
||||
child = py_type.__args__[0]
|
||||
# A bare, unparameterised ``typing.List`` / ``typing.Tuple`` matches this
|
||||
# branch (its ``__origin__`` is ``list`` / ``tuple``) but has no
|
||||
# ``__args__``, so we cannot infer the element type. Raise a clear
|
||||
# ``TypeError`` instead of crashing with an opaque ``AttributeError``.
|
||||
args = getattr(py_type, "__args__", None)
|
||||
if not args:
|
||||
raise TypeError(
|
||||
"Converting Pydantic type to Arrow Type: unsupported type "
|
||||
f"{py_type}. Specify the element type, e.g. List[int] instead "
|
||||
"of a bare List."
|
||||
)
|
||||
child = args[0]
|
||||
return _pydantic_list_child_to_arrow(child, field)
|
||||
raise TypeError(
|
||||
f"Converting Pydantic type to Arrow Type: unsupported type {py_type}."
|
||||
|
||||
@@ -86,7 +86,10 @@ def _from_list(data: list) -> Scannable:
|
||||
|
||||
@to_scannable.register(dict)
|
||||
def _from_dict(data: dict) -> Scannable:
|
||||
raise ValueError("Cannot add a single dictionary to a table. Use a list.")
|
||||
raise ValueError(
|
||||
"Cannot create or add rows from a single dictionary. "
|
||||
"Use a list of dictionaries instead."
|
||||
)
|
||||
|
||||
|
||||
@to_scannable.register(LanceModel)
|
||||
|
||||
@@ -243,7 +243,10 @@ def _into_pyarrow_reader(
|
||||
raise ValueError("Cannot add a single LanceModel to a table. Use a list.")
|
||||
|
||||
if isinstance(data, dict):
|
||||
raise ValueError("Cannot add a single dictionary to a table. Use a list.")
|
||||
raise ValueError(
|
||||
"Cannot create or add rows from a single dictionary. "
|
||||
"Use a list of dictionaries instead."
|
||||
)
|
||||
|
||||
if isinstance(data, list):
|
||||
# Handle empty list case
|
||||
|
||||
@@ -373,9 +373,15 @@ def _(value: list):
|
||||
@value_to_sql.register(dict)
|
||||
def _(value: dict):
|
||||
# https://datafusion.apache.org/user-guide/sql/scalar_functions.html#named-struct
|
||||
# Render the field name through value_to_sql(str(...)) as well so that keys
|
||||
# containing characters meaningful in SQL (e.g. a single quote) are escaped
|
||||
# the same way string values are. A bare f"'{k}'" would emit invalid SQL for
|
||||
# a key like "it's".
|
||||
return (
|
||||
"named_struct("
|
||||
+ ", ".join(f"'{k}', {value_to_sql(v)}" for k, v in value.items())
|
||||
+ ", ".join(
|
||||
f"{value_to_sql(str(k))}, {value_to_sql(v)}" for k, v in value.items()
|
||||
)
|
||||
+ ")"
|
||||
)
|
||||
|
||||
|
||||
@@ -188,6 +188,18 @@ def test_nested_struct_list():
|
||||
assert schema == expect_schema
|
||||
|
||||
|
||||
def test_bare_generic_raises_type_error():
|
||||
# A bare, unparameterised List/Tuple has no element type to map to Arrow.
|
||||
# It should raise a clear TypeError, not crash with AttributeError: __args__.
|
||||
for bare in (List, Tuple):
|
||||
|
||||
class TestModel(pydantic.BaseModel):
|
||||
items: bare
|
||||
|
||||
with pytest.raises(TypeError, match="unsupported type"):
|
||||
pydantic_to_schema(TestModel)
|
||||
|
||||
|
||||
def test_nested_struct_list_optional():
|
||||
class SplitInfo(pydantic.BaseModel):
|
||||
start_frame: int
|
||||
|
||||
@@ -301,6 +301,16 @@ def test_create_table(mem_db: DBConnection):
|
||||
assert expected == tbl
|
||||
|
||||
|
||||
def test_create_table_rejects_single_dictionary(mem_db: DBConnection):
|
||||
data = {"vector": [3.1, 4.1], "item": "foo", "price": 10.0}
|
||||
with pytest.raises(ValueError) as excep_info:
|
||||
mem_db.create_table("test", data=data)
|
||||
assert (
|
||||
str(excep_info.value) == "Cannot create or add rows from a single dictionary. "
|
||||
"Use a list of dictionaries instead."
|
||||
)
|
||||
|
||||
|
||||
def test_empty_table(mem_db: DBConnection):
|
||||
schema = pa.schema(
|
||||
[
|
||||
@@ -330,8 +340,8 @@ def test_add_dictionary(mem_db: DBConnection):
|
||||
with pytest.raises(ValueError) as excep_info:
|
||||
tbl.add(data=data)
|
||||
assert (
|
||||
str(excep_info.value)
|
||||
== "Cannot add a single dictionary to a table. Use a list."
|
||||
str(excep_info.value) == "Cannot create or add rows from a single dictionary. "
|
||||
"Use a list of dictionaries instead."
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -149,6 +149,21 @@ def test_value_to_sql_dict():
|
||||
assert value_to_sql({}) == "named_struct()"
|
||||
|
||||
|
||||
def test_value_to_sql_dict_key_escaping():
|
||||
# Struct field names that contain a single quote must be escaped (doubled)
|
||||
# the same way string values are, otherwise value_to_sql emits invalid SQL
|
||||
# such as named_struct('it's', 1).
|
||||
assert value_to_sql({"it's": 1}) == "named_struct('it''s', 1)"
|
||||
assert (
|
||||
value_to_sql({"o'brien": "d'angelo"}) == "named_struct('o''brien', 'd''angelo')"
|
||||
)
|
||||
# Escaping also applies to keys of nested structs.
|
||||
assert (
|
||||
value_to_sql({"outer": {"in'r": 1}})
|
||||
== "named_struct('outer', named_struct('in''r', 1))"
|
||||
)
|
||||
|
||||
|
||||
def test_value_to_sql_numpy_scalars():
|
||||
# numpy scalars (e.g. pulled from an ndarray or a pandas column) must
|
||||
# convert the same way as their native Python counterparts. np.float64
|
||||
|
||||
@@ -32,6 +32,7 @@ use crate::table::{BaseTable, WriteOptions};
|
||||
|
||||
pub mod listing;
|
||||
pub mod namespace;
|
||||
pub(crate) mod read_freshness;
|
||||
|
||||
pub trait DatabaseOptions {
|
||||
fn serialize_into_map(&self, map: &mut HashMap<String, String>);
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
//! Namespace-based database implementation that delegates table management to lance-namespace
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
|
||||
@@ -29,6 +29,9 @@ use crate::database::listing::{
|
||||
NewTableConfig, OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, OPT_NEW_TABLE_STORAGE_VERSION,
|
||||
OPT_NEW_TABLE_V2_MANIFEST_PATHS,
|
||||
};
|
||||
use crate::database::read_freshness::{
|
||||
FreshnessBaselines, ReadFreshnessContextProvider, TableFreshness,
|
||||
};
|
||||
use crate::error::{Error, Result};
|
||||
use crate::table::{NativeTable, map_namespace_lance_error};
|
||||
use lance::dataset::WriteMode;
|
||||
@@ -51,6 +54,10 @@ fn is_table_already_exists_namespace_error(err: &lance::Error) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
/// Object-id delimiter default (matches `RestNamespaceBuilder`'s); overridable
|
||||
/// via the `delimiter` property.
|
||||
const DEFAULT_NAMESPACE_DELIMITER: &str = "$";
|
||||
|
||||
/// A database implementation that uses lance-namespace for table management
|
||||
pub struct LanceNamespaceDatabase {
|
||||
namespace: Arc<dyn LanceNamespace>,
|
||||
@@ -70,6 +77,17 @@ pub struct LanceNamespaceDatabase {
|
||||
ns_properties: HashMap<String, String>,
|
||||
// Options for tables created by this connection
|
||||
new_table_config: NewTableConfig,
|
||||
// Per-table read-freshness baselines, shared with the context provider.
|
||||
freshness_baselines: FreshnessBaselines,
|
||||
// Delimiter for building freshness keys; see `table_freshness`.
|
||||
delimiter: String,
|
||||
}
|
||||
|
||||
fn resolve_delimiter(ns_properties: &HashMap<String, String>) -> String {
|
||||
ns_properties
|
||||
.get("delimiter")
|
||||
.cloned()
|
||||
.unwrap_or_else(|| DEFAULT_NAMESPACE_DELIMITER.to_string())
|
||||
}
|
||||
|
||||
impl LanceNamespaceDatabase {
|
||||
@@ -82,6 +100,9 @@ impl LanceNamespaceDatabase {
|
||||
session: Option<Arc<lance::session::Session>>,
|
||||
namespace_client_pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||
) -> Self {
|
||||
// Client is pre-built, so we can't install the freshness provider here;
|
||||
// baselines are still tracked for a uniform bump path.
|
||||
let delimiter = resolve_delimiter(&namespace_client_properties);
|
||||
Self {
|
||||
namespace: namespace_client,
|
||||
storage_options,
|
||||
@@ -92,6 +113,8 @@ impl LanceNamespaceDatabase {
|
||||
ns_impl: namespace_client_impl,
|
||||
ns_properties: namespace_client_properties,
|
||||
new_table_config: NewTableConfig::default(),
|
||||
freshness_baselines: Arc::new(Mutex::new(HashMap::new())),
|
||||
delimiter,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -136,10 +159,19 @@ impl LanceNamespaceDatabase {
|
||||
if let Some(ref sess) = session {
|
||||
builder = builder.session(sess.clone());
|
||||
}
|
||||
|
||||
// Install the read-freshness provider before building the client.
|
||||
let freshness_baselines: FreshnessBaselines = Arc::new(Mutex::new(HashMap::new()));
|
||||
builder = builder.context_provider(Arc::new(ReadFreshnessContextProvider::new(
|
||||
freshness_baselines.clone(),
|
||||
read_consistency_interval,
|
||||
)));
|
||||
|
||||
let namespace = builder.connect().await.map_err(|e| Error::InvalidInput {
|
||||
message: format!("Failed to connect to namespace: {:?}", e),
|
||||
})?;
|
||||
|
||||
let delimiter = resolve_delimiter(&ns_properties);
|
||||
Ok(Self {
|
||||
namespace,
|
||||
storage_options,
|
||||
@@ -150,9 +182,20 @@ impl LanceNamespaceDatabase {
|
||||
ns_impl: ns_impl.to_string(),
|
||||
ns_properties,
|
||||
new_table_config,
|
||||
freshness_baselines,
|
||||
delimiter,
|
||||
})
|
||||
}
|
||||
|
||||
/// Build a table's freshness handle, keyed to match the `object_id` the
|
||||
/// namespace client sends on reads (table-id parts joined by the delimiter).
|
||||
fn table_freshness(&self, namespace_path: &[String], name: &str) -> TableFreshness {
|
||||
let mut parts = namespace_path.to_vec();
|
||||
parts.push(name.to_string());
|
||||
let key = parts.join(&self.delimiter);
|
||||
TableFreshness::new(self.freshness_baselines.clone(), key)
|
||||
}
|
||||
|
||||
fn extract_storage_overrides(
|
||||
&self,
|
||||
request: &DbCreateTableRequest,
|
||||
@@ -331,7 +374,8 @@ impl Database for LanceNamespaceDatabase {
|
||||
self.pushdown_operations.clone(),
|
||||
self.session.clone(),
|
||||
)
|
||||
.await?;
|
||||
.await?
|
||||
.with_freshness(self.table_freshness(&request.namespace_path, &request.name));
|
||||
|
||||
return Ok(Arc::new(native_table));
|
||||
}
|
||||
@@ -462,7 +506,8 @@ impl Database for LanceNamespaceDatabase {
|
||||
self.pushdown_operations.clone(),
|
||||
self.session.clone(),
|
||||
)
|
||||
.await?;
|
||||
.await?
|
||||
.with_freshness(self.table_freshness(&request.namespace_path, &request.name));
|
||||
|
||||
Ok(Arc::new(native_table))
|
||||
}
|
||||
@@ -478,7 +523,8 @@ impl Database for LanceNamespaceDatabase {
|
||||
self.pushdown_operations.clone(),
|
||||
self.session.clone(),
|
||||
)
|
||||
.await?;
|
||||
.await?
|
||||
.with_freshness(self.table_freshness(&request.namespace_path, &request.name));
|
||||
|
||||
Ok(Arc::new(native_table))
|
||||
}
|
||||
|
||||
312
rust/lancedb/src/database/read_freshness.rs
Normal file
312
rust/lancedb/src/database/read_freshness.rs
Normal file
@@ -0,0 +1,312 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
//! Read-freshness signaling for the lance-namespace path.
|
||||
//!
|
||||
//! Against a server that serves cached table metadata up to some staleness
|
||||
//! window, a handle that just wrote (or asked for the latest version via
|
||||
//! `checkout_latest`) can still read a stale snapshot. To prevent that, reads
|
||||
//! routed through the namespace client carry an `x-lancedb-min-timestamp`
|
||||
//! header naming the oldest snapshot the caller will accept.
|
||||
//!
|
||||
//! This mirrors `remote::table`: a per-table baseline is bumped to "now" on
|
||||
//! every write and on `checkout_latest()`, and reads send
|
||||
//! `max(baseline, now - read_consistency_interval)`. Since the namespace client
|
||||
//! takes no headers directly, a [`DynamicContextProvider`] injects it per request.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
use lance_namespace_impls::{DynamicContextProvider, OperationInfo};
|
||||
|
||||
/// Provider context keys prefixed with `headers.` become HTTP headers (prefix
|
||||
/// stripped), so this emits the `x-lancedb-min-timestamp` header.
|
||||
const MIN_TIMESTAMP_CONTEXT_KEY: &str = "headers.x-lancedb-min-timestamp";
|
||||
|
||||
/// Per-table freshness baselines (keyed by namespace object id), shared between
|
||||
/// the provider that reads them and the table handles that bump them.
|
||||
pub type FreshnessBaselines = Arc<Mutex<HashMap<String, SystemTime>>>;
|
||||
|
||||
/// `max(baseline, now - interval)`, or `None` when neither constraint applies.
|
||||
fn compute_min_timestamp(
|
||||
baseline: Option<SystemTime>,
|
||||
interval: Option<Duration>,
|
||||
now: SystemTime,
|
||||
) -> Option<SystemTime> {
|
||||
let interval_based = match interval {
|
||||
None => None,
|
||||
Some(d) if d.is_zero() => Some(now),
|
||||
Some(d) => Some(now.checked_sub(d).unwrap_or(now)),
|
||||
};
|
||||
match (interval_based, baseline) {
|
||||
(None, None) => None,
|
||||
(Some(t), None) | (None, Some(t)) => Some(t),
|
||||
(Some(a), Some(b)) => Some(a.max(b)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Advance the baseline to `now`, never backwards, so a concurrent handle's
|
||||
/// write can't lower a floor another handle already set.
|
||||
fn next_freshness_baseline(prev: Option<SystemTime>, now: SystemTime) -> SystemTime {
|
||||
match prev {
|
||||
Some(p) => p.max(now),
|
||||
None => now,
|
||||
}
|
||||
}
|
||||
|
||||
/// A handle's view of the shared baseline map for a single table.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct TableFreshness {
|
||||
baselines: FreshnessBaselines,
|
||||
/// Namespace object id for this table (matches the read's `object_id`).
|
||||
key: String,
|
||||
}
|
||||
|
||||
impl TableFreshness {
|
||||
pub fn new(baselines: FreshnessBaselines, key: String) -> Self {
|
||||
Self { baselines, key }
|
||||
}
|
||||
|
||||
pub fn bump(&self) {
|
||||
let now = SystemTime::now();
|
||||
let mut baselines = self.baselines.lock().unwrap();
|
||||
let prev = baselines.get(&self.key).copied();
|
||||
baselines.insert(self.key.clone(), next_freshness_baseline(prev, now));
|
||||
}
|
||||
}
|
||||
|
||||
/// Read ops that can be served stale and so carry the freshness floor.
|
||||
/// `list_table_versions` resolves "latest" for managed-versioning tables, so it
|
||||
/// is what makes `checkout_latest()` observe a prior write.
|
||||
fn is_read_operation(operation: &str) -> bool {
|
||||
matches!(
|
||||
operation,
|
||||
"describe_table" | "list_table_versions" | "query_table" | "list_tables"
|
||||
)
|
||||
}
|
||||
|
||||
/// Injects `x-lancedb-min-timestamp` on namespace reads, per addressed table.
|
||||
#[derive(Debug)]
|
||||
pub struct ReadFreshnessContextProvider {
|
||||
baselines: FreshnessBaselines,
|
||||
read_consistency_interval: Option<Duration>,
|
||||
}
|
||||
|
||||
impl ReadFreshnessContextProvider {
|
||||
pub fn new(baselines: FreshnessBaselines, read_consistency_interval: Option<Duration>) -> Self {
|
||||
Self {
|
||||
baselines,
|
||||
read_consistency_interval,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DynamicContextProvider for ReadFreshnessContextProvider {
|
||||
fn provide_context(&self, info: &OperationInfo) -> HashMap<String, String> {
|
||||
if !is_read_operation(&info.operation) {
|
||||
return HashMap::new();
|
||||
}
|
||||
|
||||
let baseline = self.baselines.lock().unwrap().get(&info.object_id).copied();
|
||||
match compute_min_timestamp(baseline, self.read_consistency_interval, SystemTime::now()) {
|
||||
Some(ts) => {
|
||||
let dt: chrono::DateTime<chrono::Utc> = ts.into();
|
||||
HashMap::from([(MIN_TIMESTAMP_CONTEXT_KEY.to_string(), dt.to_rfc3339())])
|
||||
}
|
||||
None => HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
/// Allowed slop when comparing a header timestamp against a locally
|
||||
/// captured wall-clock bound. Tests run fast enough that 1s is plenty.
|
||||
const TOLERANCE: Duration = Duration::from_secs(1);
|
||||
|
||||
fn parse_header_ts(headers: &HashMap<String, String>) -> SystemTime {
|
||||
let value = headers
|
||||
.get(MIN_TIMESTAMP_CONTEXT_KEY)
|
||||
.expect("expected min-timestamp context key");
|
||||
chrono::DateTime::parse_from_rfc3339(value)
|
||||
.unwrap()
|
||||
.with_timezone(&chrono::Utc)
|
||||
.into()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_compute_min_timestamp_combines_baseline_and_interval() {
|
||||
let now = SystemTime::now();
|
||||
let baseline = now - Duration::from_secs(60);
|
||||
|
||||
// No interval, no baseline -> no header.
|
||||
assert_eq!(compute_min_timestamp(None, None, now), None);
|
||||
|
||||
// Baseline only -> baseline.
|
||||
assert_eq!(
|
||||
compute_min_timestamp(Some(baseline), None, now),
|
||||
Some(baseline)
|
||||
);
|
||||
|
||||
// ZERO interval, no baseline -> now (strong consistency).
|
||||
assert_eq!(
|
||||
compute_min_timestamp(None, Some(Duration::ZERO), now),
|
||||
Some(now)
|
||||
);
|
||||
|
||||
// Positive interval, no baseline -> now - interval.
|
||||
assert_eq!(
|
||||
compute_min_timestamp(None, Some(Duration::from_secs(10)), now),
|
||||
Some(now - Duration::from_secs(10))
|
||||
);
|
||||
|
||||
// Both: pick the more-recent (tighter) constraint.
|
||||
// baseline = now-60, now-interval = now-10. now-10 is newer.
|
||||
assert_eq!(
|
||||
compute_min_timestamp(Some(baseline), Some(Duration::from_secs(10)), now),
|
||||
Some(now - Duration::from_secs(10))
|
||||
);
|
||||
|
||||
// Both, baseline newer: pick baseline.
|
||||
let recent_baseline = now - Duration::from_secs(5);
|
||||
assert_eq!(
|
||||
compute_min_timestamp(Some(recent_baseline), Some(Duration::from_secs(60)), now),
|
||||
Some(recent_baseline)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_next_freshness_baseline_is_monotonic() {
|
||||
let now = SystemTime::now();
|
||||
let earlier = now - Duration::from_secs(30);
|
||||
let later = now + Duration::from_secs(30);
|
||||
|
||||
// No prior baseline -> now.
|
||||
assert_eq!(next_freshness_baseline(None, now), now);
|
||||
// Prior baseline older than now -> now.
|
||||
assert_eq!(next_freshness_baseline(Some(earlier), now), now);
|
||||
// Prior baseline newer than now -> keep the newer baseline.
|
||||
assert_eq!(next_freshness_baseline(Some(later), now), later);
|
||||
}
|
||||
|
||||
fn provider_with(
|
||||
entries: &[(&str, SystemTime)],
|
||||
interval: Option<Duration>,
|
||||
) -> ReadFreshnessContextProvider {
|
||||
let map: HashMap<String, SystemTime> =
|
||||
entries.iter().map(|(k, v)| (k.to_string(), *v)).collect();
|
||||
ReadFreshnessContextProvider::new(Arc::new(Mutex::new(map)), interval)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_provider_emits_header_at_or_after_bumped_baseline() {
|
||||
// A baseline set "now" with no interval: every read op must carry a
|
||||
// floor at or after that baseline. `list_table_versions` is the hook
|
||||
// that makes managed-versioning `checkout_latest()` observe a write.
|
||||
let baseline = SystemTime::now();
|
||||
let provider = provider_with(&[("ns$tbl", baseline)], None);
|
||||
|
||||
// These ops are keyed by the table id, so they pick up the per-table
|
||||
// baseline. (`list_tables` is keyed by the namespace, so it is covered
|
||||
// separately by the interval-floor test.)
|
||||
for op in ["describe_table", "list_table_versions", "query_table"] {
|
||||
let ctx = provider.provide_context(&OperationInfo::new(op, "ns$tbl"));
|
||||
let sent = parse_header_ts(&ctx);
|
||||
assert!(
|
||||
sent >= baseline - TOLERANCE && sent <= baseline + TOLERANCE,
|
||||
"operation {op} should carry a floor at the bumped baseline"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_provider_list_tables_uses_interval_floor_not_table_baseline() {
|
||||
// `list_tables` is addressed by the namespace id, which never matches a
|
||||
// per-table baseline key, so a bumped table baseline must not leak onto
|
||||
// it. With no interval it sends nothing; with one it sends now-interval.
|
||||
let provider = provider_with(&[("ns$tbl", SystemTime::now())], None);
|
||||
let ctx = provider.provide_context(&OperationInfo::new("list_tables", "ns"));
|
||||
assert!(
|
||||
ctx.is_empty(),
|
||||
"list_tables must not inherit a per-table baseline"
|
||||
);
|
||||
|
||||
let interval = Duration::from_secs(30);
|
||||
let provider = provider_with(&[("ns$tbl", SystemTime::now())], Some(interval));
|
||||
let before = SystemTime::now();
|
||||
let ctx = provider.provide_context(&OperationInfo::new("list_tables", "ns"));
|
||||
let after = SystemTime::now();
|
||||
let sent = parse_header_ts(&ctx);
|
||||
assert!(
|
||||
sent >= before - interval - TOLERANCE && sent <= after - interval + TOLERANCE,
|
||||
"list_tables should carry the interval floor"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_provider_no_header_for_empty_baseline_and_no_interval() {
|
||||
// Manual consistency (no interval) on a table that was never bumped:
|
||||
// no floor, so the server may serve from cache.
|
||||
let provider = provider_with(&[], None);
|
||||
let ctx = provider.provide_context(&OperationInfo::new("describe_table", "ns$tbl"));
|
||||
assert!(ctx.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_provider_interval_floor_applies_without_baseline() {
|
||||
// With a consistency interval and no baseline, the floor is now-interval.
|
||||
let interval = Duration::from_secs(30);
|
||||
let provider = provider_with(&[], Some(interval));
|
||||
|
||||
let before = SystemTime::now();
|
||||
let ctx = provider.provide_context(&OperationInfo::new("query_table", "ns$tbl"));
|
||||
let after = SystemTime::now();
|
||||
|
||||
let sent = parse_header_ts(&ctx);
|
||||
assert!(
|
||||
sent >= before - interval - TOLERANCE && sent <= after - interval + TOLERANCE,
|
||||
"expected floor at roughly now - interval"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_provider_non_read_ops_emit_nothing() {
|
||||
// Even with a fresh baseline and a zero interval, a non-read operation
|
||||
// (which establishes rather than consumes a baseline) sends no header.
|
||||
let provider = provider_with(&[("ns$tbl", SystemTime::now())], Some(Duration::ZERO));
|
||||
for op in [
|
||||
"create_table",
|
||||
"register_table",
|
||||
"drop_table",
|
||||
"rename_table",
|
||||
// Pinned to an immutable version, so it cannot be served stale.
|
||||
"describe_table_version",
|
||||
] {
|
||||
let ctx = provider.provide_context(&OperationInfo::new(op, "ns$tbl"));
|
||||
assert!(
|
||||
ctx.is_empty(),
|
||||
"operation {op} must not send a freshness header"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_provider_uses_per_table_baseline() {
|
||||
// The floor is looked up by object id, so an unrelated table's baseline
|
||||
// does not leak onto another table's read.
|
||||
let baseline = SystemTime::now();
|
||||
let provider = provider_with(&[("ns$has_baseline", baseline)], None);
|
||||
|
||||
// The bumped table gets a header.
|
||||
let hit =
|
||||
provider.provide_context(&OperationInfo::new("describe_table", "ns$has_baseline"));
|
||||
assert!(!hit.is_empty());
|
||||
|
||||
// A different table with no baseline (and no interval) gets nothing.
|
||||
let miss = provider.provide_context(&OperationInfo::new("describe_table", "ns$other"));
|
||||
assert!(miss.is_empty());
|
||||
}
|
||||
}
|
||||
@@ -13,7 +13,7 @@ use serde_json::{Value, json};
|
||||
use super::EmbeddingFunction;
|
||||
use crate::{Error, Result};
|
||||
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::runtime::{Handle, RuntimeFlavor};
|
||||
use tokio::task::block_in_place;
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -148,6 +148,12 @@ impl BedrockEmbeddingFunction {
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
// Bedrock's SDK is async but this trait method is synchronous, so we
|
||||
// bridge with `block_in_place` + `block_on`. That requires a
|
||||
// multi-threaded Tokio runtime; return a typed error instead of
|
||||
// panicking when no compatible runtime is available.
|
||||
let handle = current_multi_thread_handle()?;
|
||||
|
||||
for text in texts {
|
||||
let request_body = match self.model {
|
||||
BedrockEmbeddingModel::TitanEmbedding => {
|
||||
@@ -163,24 +169,28 @@ impl BedrockEmbeddingFunction {
|
||||
}
|
||||
};
|
||||
|
||||
// Serialize before entering the blocking section so a serialization
|
||||
// failure surfaces as a typed error rather than an `unwrap` panic.
|
||||
let body = serde_json::to_vec(&request_body).map_err(|e| Error::Runtime {
|
||||
message: format!("Failed to serialize Bedrock request: {e}"),
|
||||
})?;
|
||||
|
||||
let client = self.client.clone();
|
||||
let model_id = self.model.model_id().to_string();
|
||||
let request_body = request_body.clone();
|
||||
|
||||
let response = block_in_place(move || {
|
||||
Handle::current().block_on(async move {
|
||||
let response = block_in_place(|| {
|
||||
handle.block_on(async move {
|
||||
client
|
||||
.invoke_model()
|
||||
.model_id(model_id)
|
||||
.body(aws_sdk_bedrockruntime::primitives::Blob::new(
|
||||
serde_json::to_vec(&request_body).unwrap(),
|
||||
))
|
||||
.body(aws_sdk_bedrockruntime::primitives::Blob::new(body))
|
||||
.send()
|
||||
.await
|
||||
.map_err(Box::new)
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Bedrock invoke_model request failed: {e}"),
|
||||
})
|
||||
})
|
||||
})
|
||||
.unwrap();
|
||||
})?;
|
||||
|
||||
let response_json: Value =
|
||||
serde_json::from_slice(response.body.as_ref()).map_err(|e| Error::Runtime {
|
||||
@@ -188,22 +198,12 @@ impl BedrockEmbeddingFunction {
|
||||
})?;
|
||||
|
||||
let embedding = match self.model {
|
||||
BedrockEmbeddingModel::TitanEmbedding => response_json["embedding"]
|
||||
.as_array()
|
||||
.ok_or_else(|| Error::Runtime {
|
||||
message: "Missing embedding in response".to_string(),
|
||||
})?
|
||||
.iter()
|
||||
.map(|v| v.as_f64().unwrap() as f32)
|
||||
.collect::<Vec<f32>>(),
|
||||
BedrockEmbeddingModel::CohereLarge => response_json["embeddings"][0]
|
||||
.as_array()
|
||||
.ok_or_else(|| Error::Runtime {
|
||||
message: "Missing embeddings in response".to_string(),
|
||||
})?
|
||||
.iter()
|
||||
.map(|v| v.as_f64().unwrap() as f32)
|
||||
.collect::<Vec<f32>>(),
|
||||
BedrockEmbeddingModel::TitanEmbedding => {
|
||||
json_array_to_f32(&response_json["embedding"], "embedding")?
|
||||
}
|
||||
BedrockEmbeddingModel::CohereLarge => {
|
||||
json_array_to_f32(&response_json["embeddings"][0], "embeddings")?
|
||||
}
|
||||
};
|
||||
|
||||
builder.append_slice(&embedding);
|
||||
@@ -212,3 +212,86 @@ impl BedrockEmbeddingFunction {
|
||||
Ok(builder.finish())
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a handle to the current multi-threaded Tokio runtime, or a typed
|
||||
/// [`Error::Runtime`] when called outside a runtime or on the current-thread
|
||||
/// runtime. This keeps the synchronous-over-async bridge in
|
||||
/// [`BedrockEmbeddingFunction::compute_inner`] from panicking on runtime
|
||||
/// configurations that cannot support `block_in_place`.
|
||||
fn current_multi_thread_handle() -> Result<Handle> {
|
||||
let handle = Handle::try_current().map_err(|e| Error::Runtime {
|
||||
message: format!("Bedrock embedding must be called from within a Tokio runtime: {e}"),
|
||||
})?;
|
||||
if handle.runtime_flavor() == RuntimeFlavor::CurrentThread {
|
||||
return Err(Error::Runtime {
|
||||
message: "Bedrock embedding requires a multi-threaded Tokio runtime; the \
|
||||
current-thread runtime cannot use `block_in_place`"
|
||||
.to_string(),
|
||||
});
|
||||
}
|
||||
Ok(handle)
|
||||
}
|
||||
|
||||
/// Converts a JSON value expected to be an array of numbers into `Vec<f32>`.
|
||||
///
|
||||
/// Returns a typed [`Error::Runtime`] (rather than panicking) when the value is
|
||||
/// not an array or contains a non-numeric element, so malformed provider
|
||||
/// responses degrade gracefully.
|
||||
fn json_array_to_f32(value: &Value, field: &str) -> Result<Vec<f32>> {
|
||||
let arr = value.as_array().ok_or_else(|| Error::Runtime {
|
||||
message: format!("Missing or non-array '{field}' field in Bedrock response"),
|
||||
})?;
|
||||
arr.iter()
|
||||
.map(|v| {
|
||||
v.as_f64().map(|f| f as f32).ok_or_else(|| Error::Runtime {
|
||||
message: format!("Non-numeric value in Bedrock '{field}' embedding: {v}"),
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn json_array_to_f32_parses_numbers() {
|
||||
let v = json!([1.0, 2, -3.5]);
|
||||
let out = json_array_to_f32(&v, "embedding").unwrap();
|
||||
assert_eq!(out, vec![1.0_f32, 2.0, -3.5]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn json_array_to_f32_rejects_non_array() {
|
||||
// Missing field indexes to `Value::Null`; a malformed payload should be
|
||||
// a typed error, not a panic.
|
||||
let v = json!({"unexpected": "shape"});
|
||||
let err = json_array_to_f32(&v["embedding"], "embedding").unwrap_err();
|
||||
assert!(matches!(err, Error::Runtime { .. }), "got {err:?}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn json_array_to_f32_rejects_non_numeric_element() {
|
||||
let v = json!([1.0, "not-a-number", 3.0]);
|
||||
let err = json_array_to_f32(&v, "embedding").unwrap_err();
|
||||
assert!(matches!(err, Error::Runtime { .. }), "got {err:?}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_errors_without_runtime() {
|
||||
// No Tokio runtime in scope -> typed error instead of a panic.
|
||||
let err = current_multi_thread_handle().unwrap_err();
|
||||
assert!(matches!(err, Error::Runtime { .. }), "got {err:?}");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn handle_errors_on_current_thread_runtime() {
|
||||
let err = current_multi_thread_handle().unwrap_err();
|
||||
assert!(matches!(err, Error::Runtime { .. }), "got {err:?}");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn handle_ok_on_multi_thread_runtime() {
|
||||
current_multi_thread_handle().expect("multi-threaded runtime should be accepted");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -184,12 +184,13 @@ pub mod table;
|
||||
pub mod test_utils;
|
||||
pub mod utils;
|
||||
|
||||
use std::fmt::Display;
|
||||
use std::{fmt::Display, str::FromStr};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub use connection::{ConnectNamespaceBuilder, Connection};
|
||||
pub use error::{Error, Result};
|
||||
use lance_index::vector::ApproxMode as LanceApproxMode;
|
||||
use lance_linalg::distance::DistanceType as LanceDistanceType;
|
||||
pub use table::Table;
|
||||
|
||||
@@ -258,6 +259,79 @@ impl Display for DistanceType {
|
||||
}
|
||||
}
|
||||
|
||||
/// Controls the speed / accuracy tradeoff for approximate vector search.
|
||||
///
|
||||
/// This currently only affects RQ-quantized vector indexes, such as IVF_RQ.
|
||||
/// Other index types ignore this setting.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
#[non_exhaustive]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum ApproxMode {
|
||||
/// Prefer lower query latency, which can reduce recall.
|
||||
Fast,
|
||||
/// Use the default balance between query latency and recall.
|
||||
#[default]
|
||||
Normal,
|
||||
/// Prefer higher recall, which can increase query latency.
|
||||
Accurate,
|
||||
}
|
||||
|
||||
impl From<ApproxMode> for LanceApproxMode {
|
||||
fn from(value: ApproxMode) -> Self {
|
||||
match value {
|
||||
ApproxMode::Fast => Self::Fast,
|
||||
ApproxMode::Normal => Self::Normal,
|
||||
ApproxMode::Accurate => Self::Accurate,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LanceApproxMode> for ApproxMode {
|
||||
fn from(value: LanceApproxMode) -> Self {
|
||||
match value {
|
||||
LanceApproxMode::Fast => Self::Fast,
|
||||
LanceApproxMode::Normal => Self::Normal,
|
||||
LanceApproxMode::Accurate => Self::Accurate,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for ApproxMode {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(value: &str) -> std::prelude::v1::Result<Self, Self::Error> {
|
||||
Self::from_str(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for ApproxMode {
|
||||
type Err = Error;
|
||||
|
||||
fn from_str(value: &str) -> std::prelude::v1::Result<Self, Self::Err> {
|
||||
match value.to_ascii_lowercase().as_str() {
|
||||
"fast" => Ok(Self::Fast),
|
||||
"normal" => Ok(Self::Normal),
|
||||
"accurate" => Ok(Self::Accurate),
|
||||
_ => Err(Error::InvalidInput {
|
||||
message: format!(
|
||||
"approx_mode must be one of 'fast', 'normal', or 'accurate', got '{}'",
|
||||
value
|
||||
),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for ApproxMode {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Fast => write!(f, "fast"),
|
||||
Self::Normal => write!(f, "normal"),
|
||||
Self::Accurate => write!(f, "accurate"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Connect to a database
|
||||
pub use connection::connect;
|
||||
/// Connect to a namespace-backed database
|
||||
|
||||
@@ -20,12 +20,12 @@ use lance_index::scalar::FullTextSearchQuery;
|
||||
use lance_index::scalar::inverted::SCORE_COL;
|
||||
use lance_index::vector::DIST_COL;
|
||||
|
||||
use crate::DistanceType;
|
||||
use crate::error::{Error, Result};
|
||||
use crate::rerankers::rrf::RRFReranker;
|
||||
use crate::rerankers::{NormalizeMethod, Reranker, check_reranker_result};
|
||||
use crate::table::BaseTable;
|
||||
use crate::utils::{MaxBatchLengthStream, TimeoutStream};
|
||||
use crate::{ApproxMode, DistanceType};
|
||||
use crate::{
|
||||
arrow::{SendableRecordBatchStream, SimpleRecordBatchStream},
|
||||
table::AnyQuery,
|
||||
@@ -935,6 +935,8 @@ pub struct VectorQueryRequest {
|
||||
pub refine_factor: Option<u32>,
|
||||
/// The distance type to use for the search
|
||||
pub distance_type: Option<DistanceType>,
|
||||
/// The speed / accuracy tradeoff to use for approximate vector search
|
||||
pub approx_mode: Option<ApproxMode>,
|
||||
/// Default is true. Set to false to enforce a brute force search.
|
||||
pub use_index: bool,
|
||||
}
|
||||
@@ -952,6 +954,7 @@ impl Default for VectorQueryRequest {
|
||||
ef: None,
|
||||
refine_factor: None,
|
||||
distance_type: None,
|
||||
approx_mode: None,
|
||||
use_index: true,
|
||||
}
|
||||
}
|
||||
@@ -1192,6 +1195,15 @@ impl VectorQuery {
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the speed / accuracy tradeoff for approximate vector search.
|
||||
///
|
||||
/// This setting is currently only used by RQ-quantized indexes, such as
|
||||
/// IVF_RQ. Other index types ignore this setting.
|
||||
pub fn approx_mode(mut self, approx_mode: ApproxMode) -> Self {
|
||||
self.request.approx_mode = Some(approx_mode);
|
||||
self
|
||||
}
|
||||
|
||||
/// If this is called then any vector index is skipped
|
||||
///
|
||||
/// An exhaustive (flat) search will be performed. The query vector will
|
||||
@@ -1546,6 +1558,7 @@ mod tests {
|
||||
.nprobes(1000)
|
||||
.postfilter()
|
||||
.distance_type(DistanceType::Cosine)
|
||||
.approx_mode(ApproxMode::Accurate)
|
||||
.refine_factor(999);
|
||||
|
||||
assert_eq!(
|
||||
@@ -1564,9 +1577,49 @@ mod tests {
|
||||
assert_eq!(query.request.maximum_nprobes, Some(1000));
|
||||
assert!(query.request.use_index);
|
||||
assert_eq!(query.request.distance_type, Some(DistanceType::Cosine));
|
||||
assert_eq!(query.request.approx_mode, Some(ApproxMode::Accurate));
|
||||
assert_eq!(query.request.refine_factor, Some(999));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_approx_mode_serde_parse_default_and_display() {
|
||||
assert_eq!(ApproxMode::default(), ApproxMode::Normal);
|
||||
assert_eq!(
|
||||
serde_json::to_string(&ApproxMode::Fast).unwrap(),
|
||||
"\"fast\""
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::from_str::<ApproxMode>("\"accurate\"").unwrap(),
|
||||
ApproxMode::Accurate
|
||||
);
|
||||
assert_eq!("normal".parse::<ApproxMode>().unwrap(), ApproxMode::Normal);
|
||||
assert_eq!(ApproxMode::try_from("FAST").unwrap(), ApproxMode::Fast);
|
||||
assert_eq!(ApproxMode::Accurate.to_string(), "accurate");
|
||||
assert!(ApproxMode::try_from("invalid").is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_vector_query_approx_mode_builder() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let dataset_path = tmp_dir.path().join("test.lance");
|
||||
let uri = dataset_path.to_str().unwrap();
|
||||
|
||||
let conn = connect(uri).execute().await.unwrap();
|
||||
let table = conn
|
||||
.create_table("my_table", make_test_batches())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let query = table
|
||||
.query()
|
||||
.nearest_to(&[0.1, 0.2])
|
||||
.unwrap()
|
||||
.approx_mode(ApproxMode::Fast);
|
||||
|
||||
assert_eq!(query.request.approx_mode, Some(ApproxMode::Fast));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_execute() {
|
||||
// TODO: Switch back to memory://foo after https://github.com/lancedb/lancedb/issues/1051
|
||||
|
||||
@@ -706,6 +706,9 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
if let Some(distance_type) = query.distance_type {
|
||||
body["distance_type"] = serde_json::json!(distance_type);
|
||||
}
|
||||
if let Some(approx_mode) = query.approx_mode {
|
||||
body["approx_mode"] = serde_json::json!(approx_mode);
|
||||
}
|
||||
// In 0.23.1 we migrated from `nprobes` to `minimum_nprobes` and `maximum_nprobes`.
|
||||
// Old client / new server: since minimum_nprobes is missing, fallback to nprobes
|
||||
// New client / old server: old server will only see nprobes, make sure to set both
|
||||
@@ -3610,6 +3613,61 @@ mod tests {
|
||||
assert_eq!(data[0].as_ref().unwrap(), &expected_data);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query_vector_approx_mode_sent_when_set() {
|
||||
let expected_data = RecordBatch::try_new(
|
||||
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
|
||||
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
|
||||
)
|
||||
.unwrap();
|
||||
let expected_data_ref = expected_data.clone();
|
||||
|
||||
let table = Table::new_with_handler("my_table", move |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
assert_eq!(request.url().path(), "/v1/table/my_table/query/");
|
||||
assert_eq!(
|
||||
request.headers().get("Content-Type").unwrap(),
|
||||
JSON_CONTENT_TYPE
|
||||
);
|
||||
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
let mut expected_body = serde_json::json!({
|
||||
"prefilter": true,
|
||||
"nprobes": 20,
|
||||
"minimum_nprobes": 20,
|
||||
"maximum_nprobes": 20,
|
||||
"approx_mode": "accurate",
|
||||
"lower_bound": Option::<f32>::None,
|
||||
"upper_bound": Option::<f32>::None,
|
||||
"k": 10,
|
||||
"ef": Option::<usize>::None,
|
||||
"refine_factor": null,
|
||||
"version": null,
|
||||
});
|
||||
expected_body["vector"] = vec![0.1f32, 0.2, 0.3].into();
|
||||
assert_eq!(body, expected_body);
|
||||
|
||||
let response_body = write_ipc_file(&expected_data_ref);
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.header(CONTENT_TYPE, ARROW_FILE_CONTENT_TYPE)
|
||||
.body(response_body)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let data = table
|
||||
.query()
|
||||
.nearest_to(vec![0.1, 0.2, 0.3])
|
||||
.unwrap()
|
||||
.approx_mode(crate::ApproxMode::Accurate)
|
||||
.execute()
|
||||
.await;
|
||||
let data = data.unwrap().collect::<Vec<_>>().await;
|
||||
assert_eq!(data.len(), 1);
|
||||
assert_eq!(data[0].as_ref().unwrap(), &expected_data);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query_fts_default_values() {
|
||||
let expected_data = RecordBatch::try_new(
|
||||
|
||||
@@ -43,6 +43,7 @@ use crate::connection::NamespaceClientPushdownOperation;
|
||||
|
||||
use crate::data::scannable::{PeekedScannable, Scannable, estimate_write_partitions};
|
||||
use crate::database::Database;
|
||||
use crate::database::read_freshness::TableFreshness;
|
||||
use crate::embeddings::{EmbeddingDefinition, EmbeddingRegistry, MemoryRegistry};
|
||||
use crate::error::{Error, Result};
|
||||
use crate::index::IndexStatistics;
|
||||
@@ -1763,6 +1764,8 @@ pub struct NativeTable {
|
||||
// Operations to push down to the namespace server.
|
||||
// pub(crate) so query.rs can access the field for server-side query execution.
|
||||
pub(crate) pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
|
||||
// Read-freshness baseline; `Some` only for namespace-backed tables.
|
||||
freshness: Option<TableFreshness>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for NativeTable {
|
||||
@@ -1923,6 +1926,7 @@ impl NativeTable {
|
||||
read_consistency_interval,
|
||||
namespace_client,
|
||||
pushdown_operations,
|
||||
freshness: None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1934,6 +1938,12 @@ impl NativeTable {
|
||||
self
|
||||
}
|
||||
|
||||
/// Attach the read-freshness baseline handle (namespace connections only).
|
||||
pub(crate) fn with_freshness(mut self, freshness: TableFreshness) -> Self {
|
||||
self.freshness = Some(freshness);
|
||||
self
|
||||
}
|
||||
|
||||
/// Build a sibling `NativeTable` with the same identity but a different
|
||||
/// (independent) dataset wrapper — used to hand out branch-scoped handles.
|
||||
fn with_dataset(&self, dataset: dataset::DatasetConsistencyWrapper) -> Self {
|
||||
@@ -1946,6 +1956,14 @@ impl NativeTable {
|
||||
read_consistency_interval: self.read_consistency_interval,
|
||||
namespace_client: self.namespace_client.clone(),
|
||||
pushdown_operations: self.pushdown_operations.clone(),
|
||||
freshness: self.freshness.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Bump the read-freshness baseline; no-op for non-namespace tables.
|
||||
fn bump_freshness(&self) {
|
||||
if let Some(freshness) = &self.freshness {
|
||||
freshness.bump();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2045,6 +2063,7 @@ impl NativeTable {
|
||||
read_consistency_interval,
|
||||
namespace_client: stored_namespace_client,
|
||||
pushdown_operations,
|
||||
freshness: None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2134,6 +2153,7 @@ impl NativeTable {
|
||||
read_consistency_interval,
|
||||
namespace_client,
|
||||
pushdown_operations,
|
||||
freshness: None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2265,6 +2285,7 @@ impl NativeTable {
|
||||
read_consistency_interval,
|
||||
namespace_client: stored_namespace_client,
|
||||
pushdown_operations,
|
||||
freshness: None,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2424,6 +2445,8 @@ impl BaseTable for NativeTable {
|
||||
}
|
||||
|
||||
async fn checkout_latest(&self) -> Result<()> {
|
||||
// Bump before resolving "latest" so that request carries the floor.
|
||||
self.bump_freshness();
|
||||
self.dataset.as_latest().await?;
|
||||
self.dataset.reload().await
|
||||
}
|
||||
@@ -2511,6 +2534,8 @@ impl BaseTable for NativeTable {
|
||||
debug_assert_eq!(dataset.version().version, version);
|
||||
dataset.restore().await?;
|
||||
}
|
||||
// Restore moves "latest", so bump before resolving it (as RemoteTable does).
|
||||
self.bump_freshness();
|
||||
self.dataset.as_latest().await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -2591,7 +2616,13 @@ impl BaseTable for NativeTable {
|
||||
output.plan
|
||||
};
|
||||
|
||||
let insert_exec = Arc::new(InsertExec::new(ds_wrapper.clone(), ds, plan, lance_params));
|
||||
let insert_exec = Arc::new(InsertExec::new_with_tracker(
|
||||
ds_wrapper.clone(),
|
||||
ds,
|
||||
plan,
|
||||
lance_params,
|
||||
output.tracker.clone(),
|
||||
));
|
||||
|
||||
let tracker_for_tasks = output.tracker.clone();
|
||||
if let Some(ref t) = tracker_for_tasks {
|
||||
@@ -2624,6 +2655,7 @@ impl BaseTable for NativeTable {
|
||||
}
|
||||
|
||||
let version = ds_wrapper.get().await?.manifest().version;
|
||||
self.bump_freshness();
|
||||
Ok(AddResult { version })
|
||||
}
|
||||
|
||||
@@ -2674,7 +2706,9 @@ impl BaseTable for NativeTable {
|
||||
|
||||
async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult> {
|
||||
// Delegate to the submodule implementation
|
||||
update::execute_update(self, update).await
|
||||
let result = update::execute_update(self, update).await?;
|
||||
self.bump_freshness();
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn create_plan(
|
||||
@@ -2706,7 +2740,9 @@ impl BaseTable for NativeTable {
|
||||
params: MergeInsertBuilder,
|
||||
new_data: Box<dyn RecordBatchReader + Send>,
|
||||
) -> Result<MergeResult> {
|
||||
merge::execute_merge_insert(self, params, new_data).await
|
||||
let result = merge::execute_merge_insert(self, params, new_data).await?;
|
||||
self.bump_freshness();
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn set_unenforced_primary_key(&self, columns: &[&str]) -> Result<()> {
|
||||
@@ -2727,7 +2763,9 @@ impl BaseTable for NativeTable {
|
||||
|
||||
/// Delete rows from the table
|
||||
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
|
||||
delete::execute_delete(self, predicate).await
|
||||
let result = delete::execute_delete(self, predicate).await?;
|
||||
self.bump_freshness();
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
|
||||
@@ -2746,22 +2784,30 @@ impl BaseTable for NativeTable {
|
||||
transforms: NewColumnTransform,
|
||||
read_columns: Option<Vec<String>>,
|
||||
) -> Result<AddColumnsResult> {
|
||||
schema_evolution::execute_add_columns(self, transforms, read_columns).await
|
||||
let result = schema_evolution::execute_add_columns(self, transforms, read_columns).await?;
|
||||
self.bump_freshness();
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<AlterColumnsResult> {
|
||||
schema_evolution::execute_alter_columns(self, alterations).await
|
||||
let result = schema_evolution::execute_alter_columns(self, alterations).await?;
|
||||
self.bump_freshness();
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn update_field_metadata(
|
||||
&self,
|
||||
updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
schema_evolution::execute_update_field_metadata(self, updates).await
|
||||
let result = schema_evolution::execute_update_field_metadata(self, updates).await?;
|
||||
self.bump_freshness();
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
|
||||
schema_evolution::execute_drop_columns(self, columns).await
|
||||
let result = schema_evolution::execute_drop_columns(self, columns).await?;
|
||||
self.bump_freshness();
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
//! DataFusion ExecutionPlan for inserting data into LanceDB tables.
|
||||
|
||||
use std::any::Any;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, LazyLock, Mutex};
|
||||
|
||||
use arrow_array::{RecordBatch, UInt64Array};
|
||||
@@ -20,11 +21,12 @@ use datafusion_physical_plan::{
|
||||
use futures::TryStreamExt;
|
||||
use lance::Dataset;
|
||||
use lance::dataset::transaction::{Operation, Transaction};
|
||||
use lance::dataset::{CommitBuilder, InsertBuilder, WriteParams};
|
||||
use lance::dataset::{CommitBuilder, InsertBuilder, WriteParams, WriteProgressFn};
|
||||
use lance::io::exec::utils::InstrumentedRecordBatchStreamAdapter;
|
||||
use lance_table::format::Fragment;
|
||||
|
||||
use crate::table::dataset::DatasetConsistencyWrapper;
|
||||
use crate::table::write_progress::WriteProgressTracker;
|
||||
|
||||
pub(crate) static COUNT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
|
||||
Arc::new(ArrowSchema::new(vec![Field::new(
|
||||
@@ -81,6 +83,7 @@ pub struct InsertExec {
|
||||
dataset: Arc<Dataset>,
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
write_params: WriteParams,
|
||||
tracker: Option<Arc<WriteProgressTracker>>,
|
||||
properties: Arc<PlanProperties>,
|
||||
partial_transactions: Arc<Mutex<Vec<Transaction>>>,
|
||||
metrics: ExecutionPlanMetricsSet,
|
||||
@@ -92,6 +95,16 @@ impl InsertExec {
|
||||
dataset: Arc<Dataset>,
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
write_params: WriteParams,
|
||||
) -> Self {
|
||||
Self::new_with_tracker(ds_wrapper, dataset, input, write_params, None)
|
||||
}
|
||||
|
||||
pub(crate) fn new_with_tracker(
|
||||
ds_wrapper: DatasetConsistencyWrapper,
|
||||
dataset: Arc<Dataset>,
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
write_params: WriteParams,
|
||||
tracker: Option<Arc<WriteProgressTracker>>,
|
||||
) -> Self {
|
||||
let schema = COUNT_SCHEMA.clone();
|
||||
let num_partitions = input.output_partitioning().partition_count();
|
||||
@@ -107,6 +120,7 @@ impl InsertExec {
|
||||
dataset,
|
||||
input,
|
||||
write_params,
|
||||
tracker,
|
||||
properties: Arc::new(properties),
|
||||
partial_transactions: Arc::new(Mutex::new(Vec::with_capacity(num_partitions))),
|
||||
metrics: ExecutionPlanMetricsSet::new(),
|
||||
@@ -161,11 +175,12 @@ impl ExecutionPlan for InsertExec {
|
||||
"InsertExec requires exactly one child".to_string(),
|
||||
));
|
||||
}
|
||||
Ok(Arc::new(Self::new(
|
||||
Ok(Arc::new(Self::new_with_tracker(
|
||||
self.ds_wrapper.clone(),
|
||||
self.dataset.clone(),
|
||||
children[0].clone(),
|
||||
self.write_params.clone(),
|
||||
self.tracker.clone(),
|
||||
)))
|
||||
}
|
||||
|
||||
@@ -176,10 +191,11 @@ impl ExecutionPlan for InsertExec {
|
||||
) -> DataFusionResult<SendableRecordBatchStream> {
|
||||
let input_stream = self.input.execute(partition, context)?;
|
||||
let dataset = self.dataset.clone();
|
||||
let write_params = self.write_params.clone();
|
||||
let mut write_params = self.write_params.clone();
|
||||
let partial_transactions = self.partial_transactions.clone();
|
||||
let total_partitions = self.input.output_partitioning().partition_count();
|
||||
let ds_wrapper = self.ds_wrapper.clone();
|
||||
let tracker = self.tracker.clone();
|
||||
|
||||
let output_bytes = MetricBuilder::new(&self.metrics).output_bytes(partition);
|
||||
let input_schema = input_stream.schema();
|
||||
@@ -195,6 +211,20 @@ impl ExecutionPlan for InsertExec {
|
||||
));
|
||||
|
||||
let stream = futures::stream::once(async move {
|
||||
if let Some(tracker) = tracker
|
||||
&& write_params.write_progress.is_none()
|
||||
{
|
||||
let last_bytes = Arc::new(AtomicU64::new(0));
|
||||
write_params.write_progress = Some(WriteProgressFn::new(move |stats| {
|
||||
let previous = last_bytes.swap(stats.bytes_written, Ordering::Relaxed);
|
||||
if stats.bytes_written > previous {
|
||||
let delta =
|
||||
usize::try_from(stats.bytes_written - previous).unwrap_or(usize::MAX);
|
||||
tracker.record_bytes(delta);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
let transaction = InsertBuilder::new(dataset.clone())
|
||||
.with_params(&write_params)
|
||||
.execute_uncommitted_stream(input_stream)
|
||||
|
||||
@@ -518,6 +518,10 @@ mod tests {
|
||||
|
||||
let wrapper = DatasetConsistencyWrapper::new_latest(ds, Some(Duration::from_millis(200)));
|
||||
|
||||
// Freeze `cached_at` on the mock clock so a slow external write below can't
|
||||
// expire the TTL before the explicit advance_by() does (flake on loaded CI).
|
||||
clock::pin();
|
||||
|
||||
// Populate the cache
|
||||
let v1 = wrapper.get().await.unwrap().version().version;
|
||||
assert_eq!(v1, 1);
|
||||
|
||||
@@ -44,17 +44,35 @@ pub async fn execute_query(
|
||||
// QueryTable pushdown runs the query server-side, but only on the main
|
||||
// branch: the namespace request carries no branch yet, so a branch handle
|
||||
// must fall through to local execution.
|
||||
if table
|
||||
.pushdown_operations
|
||||
.contains(&NamespaceClientPushdownOperation::QueryTable)
|
||||
if can_execute_namespace_query(table, query)
|
||||
&& let Some(ref namespace_client) = table.namespace_client
|
||||
&& table.dataset.current_branch().is_none()
|
||||
{
|
||||
return execute_namespace_query(table, namespace_client.clone(), query, options).await;
|
||||
}
|
||||
execute_generic_query(table, query, options).await
|
||||
}
|
||||
|
||||
fn can_execute_namespace_query(table: &NativeTable, query: &AnyQuery) -> bool {
|
||||
table
|
||||
.pushdown_operations
|
||||
.contains(&NamespaceClientPushdownOperation::QueryTable)
|
||||
&& table.namespace_client.is_some()
|
||||
&& table.dataset.current_branch().is_none()
|
||||
&& !requires_local_namespace_execution(query)
|
||||
}
|
||||
|
||||
fn requires_local_namespace_execution(query: &AnyQuery) -> bool {
|
||||
// The namespace QueryTable request has no approx_mode field yet, so
|
||||
// pushing this query down would silently ignore the user's setting.
|
||||
matches!(
|
||||
query,
|
||||
AnyQuery::VectorQuery(VectorQueryRequest {
|
||||
approx_mode: Some(_),
|
||||
..
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn analyze_query_plan(
|
||||
table: &NativeTable,
|
||||
query: &AnyQuery,
|
||||
@@ -167,6 +185,10 @@ pub async fn create_plan(
|
||||
scanner.nearest(&column, query_vector.as_ref(), top_k)?;
|
||||
}
|
||||
|
||||
if let Some(approx_mode) = query.approx_mode {
|
||||
scanner.approx_mode(approx_mode.into());
|
||||
}
|
||||
|
||||
scanner.minimum_nprobes(query.minimum_nprobes);
|
||||
if let Some(maximum_nprobes) = query.maximum_nprobes {
|
||||
scanner.maximum_nprobes(maximum_nprobes);
|
||||
@@ -587,12 +609,20 @@ async fn parse_arrow_ipc_response(bytes: bytes::Bytes) -> Result<DatasetRecordBa
|
||||
#[cfg(test)]
|
||||
#[allow(deprecated)]
|
||||
mod tests {
|
||||
use arrow_array::Float32Array;
|
||||
use arrow_array::{ArrayRef, FixedSizeListArray, Float32Array};
|
||||
use futures::TryStreamExt;
|
||||
use std::sync::Arc;
|
||||
use lance_arrow::FixedSizeListArrayExt;
|
||||
use std::sync::{
|
||||
Arc,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
use super::*;
|
||||
use crate::query::QueryExecutionOptions;
|
||||
use crate::query::{QueryExecutionOptions, QueryRequest};
|
||||
|
||||
fn fixed_size_list_array(values: Vec<f32>, dimension: i32) -> FixedSizeListArray {
|
||||
FixedSizeListArray::try_new_from_values(Float32Array::from(values), dimension).unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_convert_to_namespace_query_vector() {
|
||||
@@ -715,6 +745,80 @@ mod tests {
|
||||
assert_eq!(count, 2); // 4 and 5
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct CountingNamespaceClient {
|
||||
query_table_calls: AtomicUsize,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl LanceNamespace for CountingNamespaceClient {
|
||||
fn namespace_id(&self) -> String {
|
||||
"counting".to_string()
|
||||
}
|
||||
|
||||
async fn query_table(&self, _request: NsQueryTableRequest) -> lance::Result<bytes::Bytes> {
|
||||
self.query_table_calls.fetch_add(1, Ordering::SeqCst);
|
||||
panic!("approx_mode queries must not be pushed down to namespace query_table");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_execute_query_approx_mode_with_namespace_pushdown_runs_locally() {
|
||||
use crate::connect;
|
||||
use crate::table::query::execute_query;
|
||||
use arrow_array::{Int32Array, RecordBatch};
|
||||
use arrow_schema::{DataType, Field, Schema};
|
||||
|
||||
let conn = connect("memory://").execute().await.unwrap();
|
||||
|
||||
let vectors = Arc::new(fixed_size_list_array(
|
||||
vec![0.0, 0.0, 10.0, 10.0, 20.0, 20.0],
|
||||
2,
|
||||
));
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int32, false),
|
||||
Field::new("vector", vectors.data_type().clone(), false),
|
||||
]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema,
|
||||
vec![Arc::new(Int32Array::from(vec![1, 2, 3])), vectors],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let table = conn
|
||||
.create_table("test_approx_mode_namespace_fallback", batch)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let namespace_client = Arc::new(CountingNamespaceClient::default());
|
||||
let mut native_table = table.as_native().unwrap().clone();
|
||||
native_table.namespace_client = Some(namespace_client.clone());
|
||||
native_table
|
||||
.pushdown_operations
|
||||
.insert(NamespaceClientPushdownOperation::QueryTable);
|
||||
|
||||
let query_vector = Arc::new(Float32Array::from(vec![0.0, 0.0]));
|
||||
let query = AnyQuery::VectorQuery(VectorQueryRequest {
|
||||
base: QueryRequest {
|
||||
limit: Some(1),
|
||||
..Default::default()
|
||||
},
|
||||
column: Some("vector".to_string()),
|
||||
query_vector: vec![query_vector as ArrayRef],
|
||||
approx_mode: Some(crate::ApproxMode::Accurate),
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
let stream = execute_query(&native_table, &query, QueryExecutionOptions::default())
|
||||
.await
|
||||
.unwrap();
|
||||
let batches = stream.try_collect::<Vec<_>>().await.unwrap();
|
||||
let count: usize = batches.iter().map(|b| b.num_rows()).sum();
|
||||
|
||||
assert_eq!(count, 1);
|
||||
assert_eq!(namespace_client.query_table_calls.load(Ordering::SeqCst), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_plan_multivector_structure() {
|
||||
use arrow_array::{Float32Array, RecordBatch};
|
||||
@@ -779,4 +883,97 @@ mod tests {
|
||||
"Plan should add query_index column"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_plan_applies_approx_mode_to_ann_query() {
|
||||
use arrow_array::RecordBatch;
|
||||
use arrow_schema::{DataType, Field, Schema};
|
||||
use datafusion_physical_plan::ExecutionPlan;
|
||||
use lance::io::exec::{ANNIvfPartitionExec, ANNIvfSubIndexExec};
|
||||
use lance_index::vector::ApproxMode;
|
||||
|
||||
use crate::connect;
|
||||
use crate::index::{Index, vector::IvfRqIndexBuilder};
|
||||
use crate::table::query::create_plan;
|
||||
|
||||
fn find_ann_approx_mode(plan: &dyn ExecutionPlan) -> Option<ApproxMode> {
|
||||
if let Some(ann) = plan.as_any().downcast_ref::<ANNIvfSubIndexExec>() {
|
||||
return Some(ann.query().approx_mode);
|
||||
}
|
||||
if let Some(ann) = plan.as_any().downcast_ref::<ANNIvfPartitionExec>() {
|
||||
return Some(ann.query.approx_mode);
|
||||
}
|
||||
plan.children()
|
||||
.into_iter()
|
||||
.find_map(|child| find_ann_approx_mode(child.as_ref()))
|
||||
}
|
||||
|
||||
let conn = connect("memory://").execute().await.unwrap();
|
||||
let dimension = 8;
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int32, false),
|
||||
Field::new(
|
||||
"vector",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, true)),
|
||||
dimension,
|
||||
),
|
||||
false,
|
||||
),
|
||||
]));
|
||||
|
||||
let vectors = Arc::new(fixed_size_list_array(
|
||||
(0..512 * dimension)
|
||||
.map(|value| value as f32 / dimension as f32)
|
||||
.collect(),
|
||||
dimension,
|
||||
));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema,
|
||||
vec![
|
||||
Arc::new(arrow_array::Int32Array::from_iter_values(0..512)),
|
||||
vectors,
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
let table = conn
|
||||
.create_table("test_approx_mode_plan", vec![batch])
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
table
|
||||
.create_index(
|
||||
&["vector"],
|
||||
Index::IvfRq(
|
||||
IvfRqIndexBuilder::default()
|
||||
.num_partitions(1)
|
||||
.sample_rate(1)
|
||||
.max_iterations(1)
|
||||
.num_bits(1),
|
||||
),
|
||||
)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let native_table = table.as_native().unwrap();
|
||||
let query_vector = Arc::new(Float32Array::from(vec![0.0; dimension as usize]));
|
||||
let query = AnyQuery::VectorQuery(VectorQueryRequest {
|
||||
column: Some("vector".to_string()),
|
||||
query_vector: vec![query_vector as ArrayRef],
|
||||
base: QueryRequest {
|
||||
limit: Some(1),
|
||||
..Default::default()
|
||||
},
|
||||
approx_mode: Some(crate::ApproxMode::Accurate),
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
let plan = create_plan(native_table, &query, QueryExecutionOptions::default())
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
find_ann_approx_mode(plan.as_ref()),
|
||||
Some(ApproxMode::Accurate)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,11 +142,21 @@ impl WriteProgressTracker {
|
||||
cb(&progress);
|
||||
}
|
||||
|
||||
/// Record wire bytes from the insert layer (e.g. IPC-encoded bytes for
|
||||
/// remote writes). When wire bytes are recorded, they take precedence over
|
||||
/// the in-memory Arrow bytes tracked by [`record_batch`].
|
||||
/// Record wire bytes from the insert layer.
|
||||
///
|
||||
/// These bytes may be IPC-encoded bytes for remote writes or bytes handed
|
||||
/// to Lance's local writer. When wire bytes are recorded, they take
|
||||
/// precedence over the in-memory Arrow bytes tracked by [`record_batch`].
|
||||
pub fn record_bytes(&self, bytes: usize) {
|
||||
self.wire_bytes.fetch_add(bytes, Ordering::Relaxed);
|
||||
let mut cb = self.callback.lock().unwrap_or_else(|e| e.into_inner());
|
||||
let guard = self
|
||||
.rows_and_bytes
|
||||
.lock()
|
||||
.unwrap_or_else(|e| e.into_inner());
|
||||
let progress = self.snapshot(guard.0, guard.1, false);
|
||||
drop(guard);
|
||||
cb(&progress);
|
||||
}
|
||||
|
||||
/// Emit the final progress callback indicating the write is complete.
|
||||
@@ -169,8 +179,6 @@ impl WriteProgressTracker {
|
||||
let wire = self.wire_bytes.load(Ordering::Relaxed);
|
||||
// Prefer wire bytes (actual I/O size) when the insert layer is
|
||||
// tracking them; fall back to in-memory Arrow size otherwise.
|
||||
// TODO: for local writes, track actual bytes written by Lance
|
||||
// instead of using in-memory Arrow size as a proxy.
|
||||
let output_bytes = if wire > 0 { wire } else { in_memory_bytes };
|
||||
WriteProgress {
|
||||
elapsed: self.start.elapsed(),
|
||||
@@ -383,6 +391,54 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_progress_uses_lance_write_bytes_for_local_tables() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let db = connect(dir.path().to_str().unwrap())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
|
||||
let table = db
|
||||
.create_table("local_write_bytes", batch)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let new_data = record_batch!(("id", Int32, [4, 5, 6])).unwrap();
|
||||
let in_memory_bytes = new_data.get_array_memory_size();
|
||||
let final_bytes = Arc::new(AtomicUsize::new(0));
|
||||
let seen_non_memory_bytes = Arc::new(std::sync::atomic::AtomicBool::new(false));
|
||||
let final_bytes_cb = final_bytes.clone();
|
||||
let seen_non_memory_bytes_cb = seen_non_memory_bytes.clone();
|
||||
|
||||
table
|
||||
.add(new_data)
|
||||
.write_parallelism(1)
|
||||
.progress(move |p| {
|
||||
if p.output_bytes() > 0 && p.output_bytes() != in_memory_bytes {
|
||||
seen_non_memory_bytes_cb.store(true, Ordering::SeqCst);
|
||||
}
|
||||
if p.done() {
|
||||
final_bytes_cb.store(p.output_bytes(), Ordering::SeqCst);
|
||||
}
|
||||
})
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(
|
||||
seen_non_memory_bytes.load(Ordering::SeqCst),
|
||||
"progress should report Lance writer bytes, not only Arrow memory bytes"
|
||||
);
|
||||
assert_ne!(
|
||||
final_bytes.load(Ordering::SeqCst),
|
||||
in_memory_bytes,
|
||||
"final progress bytes should come from Lance write stats"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_record_batch_recovers_from_poisoned_callback_lock() {
|
||||
use super::{ProgressCallback, WriteProgressTracker};
|
||||
|
||||
@@ -329,6 +329,15 @@ pub mod clock {
|
||||
});
|
||||
}
|
||||
|
||||
/// Start mock time at the current instant if not already pinned.
|
||||
pub fn pin() {
|
||||
MOCK_NOW.with(|mock| {
|
||||
if mock.get().is_none() {
|
||||
mock.set(Some(Instant::now()));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn clear_mock() {
|
||||
MOCK_NOW.with(|mock| mock.set(None));
|
||||
|
||||
Reference in New Issue
Block a user