Compare commits

..

1 Commits

Author SHA1 Message Date
lancedb automation
999b31b4a6 chore: update lance dependency to v8.0.0-beta.18 2026-06-17 21:38:42 +00:00
57 changed files with 188 additions and 4413 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.31.0-beta.3"
current_version = "0.30.1-beta.2"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.
@@ -23,8 +23,6 @@ allow_dirty = true
commit = true
message = "Bump version: {current_version} → {new_version}"
commit_args = ""
# bump-my-version >=1.4.0 rejects pre_commit_hooks containing shell syntax unless opted in.
allow_shell_hooks = true
# Java maven files
pre_commit_hooks = [

116
Cargo.lock generated
View File

@@ -1472,9 +1472,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.12.0"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ae3f5d315924270530207e2a68396c3cc547f6dca3fbdca317cfb1a51edb593"
checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
[[package]]
name = "bytes-utils"
@@ -3432,8 +3432,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arrow-array",
"rand 0.9.4",
@@ -4735,8 +4735,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a"
[[package]]
name = "lance"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arc-swap",
"arrow",
@@ -4771,7 +4771,7 @@ dependencies = [
"futures",
"half",
"humantime",
"itertools 0.14.0",
"itertools 0.13.0",
"lance-arrow",
"lance-core",
"lance-datafusion",
@@ -4810,8 +4810,8 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4832,7 +4832,7 @@ dependencies = [
[[package]]
name = "lance-arrow-scalar"
version = "58.0.0"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4846,7 +4846,7 @@ dependencies = [
[[package]]
name = "lance-arrow-stats"
version = "58.0.0"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -4855,8 +4855,8 @@ dependencies = [
[[package]]
name = "lance-bitpacking"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arrayref",
"paste",
@@ -4865,8 +4865,8 @@ dependencies = [
[[package]]
name = "lance-core"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4878,7 +4878,7 @@ dependencies = [
"datafusion-common",
"datafusion-sql",
"futures",
"itertools 0.14.0",
"itertools 0.13.0",
"lance-arrow",
"lance-derive",
"libc",
@@ -4904,8 +4904,8 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arrow",
"arrow-array",
@@ -4935,8 +4935,8 @@ dependencies = [
[[package]]
name = "lance-datagen"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arrow",
"arrow-array",
@@ -4953,8 +4953,8 @@ dependencies = [
[[package]]
name = "lance-derive"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"proc-macro2",
"quote",
@@ -4963,8 +4963,8 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4980,7 +4980,7 @@ dependencies = [
"futures",
"hex",
"hyperloglogplus",
"itertools 0.14.0",
"itertools 0.13.0",
"lance-arrow",
"lance-bitpacking",
"lance-core",
@@ -4999,8 +4999,8 @@ dependencies = [
[[package]]
name = "lance-file"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -5030,8 +5030,8 @@ dependencies = [
[[package]]
name = "lance-index"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arc-swap",
"arrow",
@@ -5056,7 +5056,7 @@ dependencies = [
"fst",
"futures",
"half",
"itertools 0.14.0",
"itertools 0.13.0",
"jieba-rs",
"jsonb",
"lance-arrow",
@@ -5096,8 +5096,8 @@ dependencies = [
[[package]]
name = "lance-io"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arrow",
"arrow-arith",
@@ -5138,8 +5138,8 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -5150,13 +5150,12 @@ dependencies = [
"lance-core",
"num-traits",
"rand 0.9.4",
"rayon",
]
[[package]]
name = "lance-namespace"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arrow",
"async-trait",
@@ -5168,8 +5167,8 @@ dependencies = [
[[package]]
name = "lance-namespace-impls"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arrow",
"arrow-ipc",
@@ -5223,15 +5222,15 @@ dependencies = [
[[package]]
name = "lance-select"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-schema",
"byteorder",
"bytes",
"itertools 0.14.0",
"itertools 0.13.0",
"lance-core",
"roaring",
"tracing",
@@ -5239,8 +5238,8 @@ dependencies = [
[[package]]
name = "lance-table"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arrow",
"arrow-array",
@@ -5279,8 +5278,8 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -5293,21 +5292,20 @@ dependencies = [
[[package]]
name = "lance-tokenizer"
version = "9.0.0-beta.8"
source = "git+https://github.com/lance-format/lance.git?tag=v9.0.0-beta.8#71c4aa2174971e98acb7e256fde1e1589024f5bc"
version = "8.0.0-beta.18"
source = "git+https://github.com/lance-format/lance.git?tag=v8.0.0-beta.18#909dea18b1de21a84f7574fab8335bab02dc48b8"
dependencies = [
"icu_segmenter",
"jieba-rs",
"lindera",
"rust-stemmers",
"serde",
"stop-words",
"unicode-normalization",
]
[[package]]
name = "lancedb"
version = "0.31.0-beta.3"
version = "0.30.1-beta.2"
dependencies = [
"ahash",
"anyhow",
@@ -5384,14 +5382,13 @@ dependencies = [
"tokenizers",
"tokio",
"url",
"urlencoding",
"uuid",
"walkdir",
]
[[package]]
name = "lancedb-nodejs"
version = "0.31.0-beta.3"
version = "0.30.1-beta.2"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -5416,7 +5413,7 @@ dependencies = [
[[package]]
name = "lancedb-python"
version = "0.34.0-beta.3"
version = "0.33.1-beta.2"
dependencies = [
"arrow",
"async-trait",
@@ -5959,9 +5956,9 @@ dependencies = [
[[package]]
name = "napi"
version = "3.9.3"
version = "3.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbd9f9295f3ff5921e78a71222c3361a8216f7760b1a99a6ad4e8441de18bbb9"
checksum = "ad513ff22558f1830b595ea6eb4091da48145d09a222ce157e781896f78be0b9"
dependencies = [
"bitflags 2.11.1",
"chrono",
@@ -9208,15 +9205,6 @@ version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51f1e89f093f99e7432c491c382b88a6860a5adbe6bf02574bf0a08efff1978"
[[package]]
name = "stop-words"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d68df56303396bcfb639455b3c166804aeb7994005010aab5e9e8a1277b8871d"
dependencies = [
"serde_json",
]
[[package]]
name = "str_stack"
version = "0.1.1"

View File

@@ -13,20 +13,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=9.0.0-beta.8", default-features = false, "tag" = "v9.0.0-beta.8", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=9.0.0-beta.8", "tag" = "v9.0.0-beta.8", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=9.0.0-beta.8", "tag" = "v9.0.0-beta.8", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=9.0.0-beta.8", "tag" = "v9.0.0-beta.8", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=9.0.0-beta.8", default-features = false, "tag" = "v9.0.0-beta.8", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=9.0.0-beta.8", "tag" = "v9.0.0-beta.8", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=9.0.0-beta.8", "tag" = "v9.0.0-beta.8", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=9.0.0-beta.8", "tag" = "v9.0.0-beta.8", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=9.0.0-beta.8", default-features = false, "tag" = "v9.0.0-beta.8", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=9.0.0-beta.8", "tag" = "v9.0.0-beta.8", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=9.0.0-beta.8", "tag" = "v9.0.0-beta.8", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=9.0.0-beta.8", "tag" = "v9.0.0-beta.8", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=9.0.0-beta.8", "tag" = "v9.0.0-beta.8", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=9.0.0-beta.8", "tag" = "v9.0.0-beta.8", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=8.0.0-beta.18", default-features = false, "tag" = "v8.0.0-beta.18", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=8.0.0-beta.18", "tag" = "v8.0.0-beta.18", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=8.0.0-beta.18", "tag" = "v8.0.0-beta.18", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=8.0.0-beta.18", "tag" = "v8.0.0-beta.18", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=8.0.0-beta.18", default-features = false, "tag" = "v8.0.0-beta.18", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=8.0.0-beta.18", "tag" = "v8.0.0-beta.18", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=8.0.0-beta.18", "tag" = "v8.0.0-beta.18", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=8.0.0-beta.18", "tag" = "v8.0.0-beta.18", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=8.0.0-beta.18", default-features = false, "tag" = "v8.0.0-beta.18", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=8.0.0-beta.18", "tag" = "v8.0.0-beta.18", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=8.0.0-beta.18", "tag" = "v8.0.0-beta.18", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=8.0.0-beta.18", "tag" = "v8.0.0-beta.18", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=8.0.0-beta.18", "tag" = "v8.0.0-beta.18", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=8.0.0-beta.18", "tag" = "v8.0.0-beta.18", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "58.0.0", optional = false }

View File

@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
<dependency>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-core</artifactId>
<version>0.31.0-beta.3</version>
<version>0.30.1-beta.2</version>
</dependency>
```

View File

@@ -1,29 +0,0 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / OAuthFlowType
# Enumeration: OAuthFlowType
OAuth authentication flow types.
## Enumeration Members
### AzureManagedIdentity
```ts
AzureManagedIdentity: "azure_managed_identity";
```
Azure Managed Identity via IMDS.
***
### ClientCredentials
```ts
ClientCredentials: "client_credentials";
```
Client Credentials grant (service-to-service / M2M).

View File

@@ -12,7 +12,6 @@
## Enumerations
- [FullTextQueryType](enumerations/FullTextQueryType.md)
- [OAuthFlowType](enumerations/OAuthFlowType.md)
- [Occur](enumerations/Occur.md)
- [Operator](enumerations/Operator.md)
@@ -86,8 +85,6 @@
- [ListNamespacesResponse](interfaces/ListNamespacesResponse.md)
- [LsmWriteSpec](interfaces/LsmWriteSpec.md)
- [MergeResult](interfaces/MergeResult.md)
- [NativeOAuthConfig](interfaces/NativeOAuthConfig.md)
- [OAuthConfig](interfaces/OAuthConfig.md)
- [OpenTableOptions](interfaces/OpenTableOptions.md)
- [OptimizeOptions](interfaces/OptimizeOptions.md)
- [OptimizeStats](interfaces/OptimizeStats.md)

View File

@@ -64,19 +64,6 @@ client used by manifest-enabled native connections.
***
### oauthConfig?
```ts
optional oauthConfig: NativeOAuthConfig;
```
(For LanceDB cloud only): OAuth configuration for IdP-based
authentication (e.g., Azure Entra ID). When set, token acquisition
and refresh are handled entirely in Rust. TypeScript users should pass
the public `OAuthConfig` type exported from `@lancedb/lancedb`.
***
### readConsistencyInterval?
```ts

View File

@@ -1,88 +0,0 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / NativeOAuthConfig
# Interface: NativeOAuthConfig
OAuth configuration for LanceDB authentication.
This is the generated napi-rs binding shape. TypeScript users should prefer
the public `OAuthConfig` type exported from `@lancedb/lancedb`.
All token acquisition and refresh is handled in the Rust layer.
## Properties
### clientId
```ts
clientId: string;
```
Application / Client ID.
***
### clientSecret?
```ts
optional clientSecret: string;
```
Client secret (required for client_credentials).
***
### flow?
```ts
optional flow: string;
```
Authentication flow: "client_credentials" or "azure_managed_identity"
***
### issuerUrl
```ts
issuerUrl: string;
```
OIDC issuer URL or OAuth authority URL.
For Azure: `https://login.microsoftonline.com/{tenant_id}/v2.0`
***
### managedIdentityClientId?
```ts
optional managedIdentityClientId: string;
```
Client ID for user-assigned managed identity (azure_managed_identity).
***
### refreshBufferSecs?
```ts
optional refreshBufferSecs: number;
```
Seconds before expiry to trigger proactive refresh (default: 300).
Keep this well below the token TTL; if it is greater than or equal to
the TTL, each request refreshes the token.
***
### scopes
```ts
scopes: string[];
```
OAuth scopes to request. For Azure managed identity, exactly one scope
or resource is required. For example: `["api://{app_id}/.default"]`

View File

@@ -1,111 +0,0 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / OAuthConfig
# Interface: OAuthConfig
OAuth configuration for LanceDB authentication.
This is the public TypeScript OAuth configuration type. The generated
`NativeOAuthConfig` type has the same runtime shape but is an implementation
detail of the napi-rs binding.
All token acquisition and refresh is handled in the Rust layer.
This config is passed through to Rust via napi-rs.
## Examples
```typescript
const config: OAuthConfig = {
issuerUrl: "https://login.microsoftonline.com/{tenant}/v2.0",
clientId: "app-id",
clientSecret: "secret",
scopes: ["api://lancedb-api/.default"],
};
```
```typescript
const config: OAuthConfig = {
issuerUrl: "https://login.microsoftonline.com/{tenant}/v2.0",
clientId: "app-id",
scopes: ["api://lancedb-api/.default"],
flow: OAuthFlowType.AzureManagedIdentity,
};
```
## Properties
### clientId
```ts
clientId: string;
```
Application / Client ID.
***
### clientSecret?
```ts
optional clientSecret: string;
```
Client secret (required for ClientCredentials).
***
### flow?
```ts
optional flow: OAuthFlowType;
```
Authentication flow (default: ClientCredentials).
***
### issuerUrl
```ts
issuerUrl: string;
```
OIDC issuer URL or OAuth authority URL.
For Azure: `https://login.microsoftonline.com/{tenant_id}/v2.0`
***
### managedIdentityClientId?
```ts
optional managedIdentityClientId: string;
```
Client ID for user-assigned managed identity (AzureManagedIdentity).
***
### refreshBufferSecs?
```ts
optional refreshBufferSecs: number;
```
Seconds before expiry to trigger proactive refresh (default: 300).
Keep this well below the token TTL; if it is greater than or equal to
the TTL, each request refreshes the token.
***
### scopes
```ts
scopes: string[];
```
OAuth scopes to request.
For Azure managed identity, exactly one scope or resource is required.
For example: `["api://{app_id}/.default"]`

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.31.0-beta.3</version>
<version>0.30.1-beta.2</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.31.0-beta.3</version>
<version>0.30.1-beta.2</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>LanceDB Java SDK Parent POM</description>
@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>9.0.0-beta.8</lance-core.version>
<lance-core.version>8.0.0-beta.18</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.31.0-beta.3"
version = "0.30.1-beta.2"
publish = false
license.workspace = true
description.workspace = true

View File

@@ -52,7 +52,6 @@ export {
SplitHashOptions,
SplitSequentialOptions,
ShuffleOptions,
OAuthConfig as NativeOAuthConfig,
} from "./native.js";
export {
@@ -131,8 +130,6 @@ export {
TokenResponse,
} from "./header";
export { OAuthConfig, OAuthFlowType } from "./oauth";
export { MergeInsertBuilder, WriteExecutionOptions } from "./merge";
export * as embedding from "./embedding";

View File

@@ -1,76 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
/**
* OAuth authentication flow types.
*/
export enum OAuthFlowType {
/** Client Credentials grant (service-to-service / M2M). */
ClientCredentials = "client_credentials",
/** Azure Managed Identity via IMDS. */
AzureManagedIdentity = "azure_managed_identity",
}
/**
* OAuth configuration for LanceDB authentication.
*
* This is the public TypeScript OAuth configuration type. The generated
* `NativeOAuthConfig` type has the same runtime shape but is an implementation
* detail of the napi-rs binding.
*
* All token acquisition and refresh is handled in the Rust layer.
* This config is passed through to Rust via napi-rs.
*
* @example Client Credentials (service-to-service):
* ```typescript
* const config: OAuthConfig = {
* issuerUrl: "https://login.microsoftonline.com/{tenant}/v2.0",
* clientId: "app-id",
* clientSecret: "secret",
* scopes: ["api://lancedb-api/.default"],
* };
* ```
*
* @example Azure Managed Identity:
* ```typescript
* const config: OAuthConfig = {
* issuerUrl: "https://login.microsoftonline.com/{tenant}/v2.0",
* clientId: "app-id",
* scopes: ["api://lancedb-api/.default"],
* flow: OAuthFlowType.AzureManagedIdentity,
* };
* ```
*/
export interface OAuthConfig {
/**
* OIDC issuer URL or OAuth authority URL.
* For Azure: `https://login.microsoftonline.com/{tenant_id}/v2.0`
*/
issuerUrl: string;
/** Application / Client ID. */
clientId: string;
/**
* OAuth scopes to request.
* For Azure managed identity, exactly one scope or resource is required.
* For example: `["api://{app_id}/.default"]`
*/
scopes: string[];
/** Authentication flow (default: ClientCredentials). */
flow?: OAuthFlowType;
/** Client secret (required for ClientCredentials). */
clientSecret?: string;
/** Client ID for user-assigned managed identity (AzureManagedIdentity). */
managedIdentityClientId?: string;
/**
* Seconds before expiry to trigger proactive refresh (default: 300).
* Keep this well below the token TTL; if it is greater than or equal to
* the TTL, each request refreshes the token.
*/
refreshBufferSecs?: number;
}

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.31.0-beta.3",
"version": "0.30.1-beta.2",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.31.0-beta.3",
"version": "0.30.1-beta.2",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.31.0-beta.3",
"version": "0.30.1-beta.2",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.31.0-beta.3",
"version": "0.30.1-beta.2",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.31.0-beta.3",
"version": "0.30.1-beta.2",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.31.0-beta.3",
"version": "0.30.1-beta.2",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.31.0-beta.3",
"version": "0.30.1-beta.2",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.31.0-beta.3",
"version": "0.30.1-beta.2",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.31.0-beta.3",
"version": "0.30.1-beta.2",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.31.0-beta.3",
"version": "0.30.1-beta.2",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -112,12 +112,6 @@ impl Connection {
builder = builder.client_config(rust_config);
if let Some(oauth_config) = options.oauth_config {
let config: lancedb::remote::oauth::OAuthConfig =
oauth_config.try_into().default_error()?;
builder = builder.oauth_config(config);
}
if let Some(api_key) = options.api_key {
builder = builder.api_key(&api_key);
}

View File

@@ -65,11 +65,6 @@ pub struct ConnectionOptions {
/// (For LanceDB cloud only): the host to use for LanceDB cloud. Used
/// for testing purposes.
pub host_override: Option<String>,
/// (For LanceDB cloud only): OAuth configuration for IdP-based
/// authentication (e.g., Azure Entra ID). When set, token acquisition
/// and refresh are handled entirely in Rust. TypeScript users should pass
/// the public `OAuthConfig` type exported from `@lancedb/lancedb`.
pub oauth_config: Option<remote::OAuthConfig>,
}
#[napi(object)]

View File

@@ -3,7 +3,6 @@
use std::collections::HashMap;
use lancedb::error::Error;
use napi_derive::*;
/// Timeout configuration for remote HTTP client.
@@ -141,84 +140,6 @@ impl From<TlsConfig> for lancedb::remote::TlsConfig {
}
}
/// OAuth configuration for LanceDB authentication.
///
/// This is the generated napi-rs binding shape. TypeScript users should prefer
/// the public `OAuthConfig` type exported from `@lancedb/lancedb`.
///
/// All token acquisition and refresh is handled in the Rust layer.
#[napi(object)]
#[derive(Clone)]
pub struct OAuthConfig {
/// OIDC issuer URL or OAuth authority URL.
/// For Azure: `https://login.microsoftonline.com/{tenant_id}/v2.0`
pub issuer_url: String,
/// Application / Client ID.
pub client_id: String,
/// OAuth scopes to request. For Azure managed identity, exactly one scope
/// or resource is required. For example: `["api://{app_id}/.default"]`
pub scopes: Vec<String>,
/// Authentication flow: "client_credentials" or "azure_managed_identity"
pub flow: Option<String>,
/// Client secret (required for client_credentials).
pub client_secret: Option<String>,
/// Client ID for user-assigned managed identity (azure_managed_identity).
pub managed_identity_client_id: Option<String>,
/// Seconds before expiry to trigger proactive refresh (default: 300).
/// Keep this well below the token TTL; if it is greater than or equal to
/// the TTL, each request refreshes the token.
pub refresh_buffer_secs: Option<u32>,
}
impl std::fmt::Debug for OAuthConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OAuthConfig")
.field("issuer_url", &self.issuer_url)
.field("client_id", &self.client_id)
.field("scopes", &self.scopes)
.field("flow", &self.flow)
.field(
"client_secret",
&self.client_secret.as_deref().map(|_| "<redacted>"),
)
.field(
"managed_identity_client_id",
&self.managed_identity_client_id,
)
.field("refresh_buffer_secs", &self.refresh_buffer_secs)
.finish()
}
}
impl TryFrom<OAuthConfig> for lancedb::remote::oauth::OAuthConfig {
type Error = Error;
fn try_from(config: OAuthConfig) -> Result<Self, Self::Error> {
use lancedb::remote::oauth::OAuthFlow;
let flow = match config.flow.as_deref().unwrap_or("client_credentials") {
"client_credentials" => OAuthFlow::ClientCredentials,
"azure_managed_identity" => OAuthFlow::AzureManagedIdentity {
client_id: config.managed_identity_client_id,
},
other => {
return Err(Error::InvalidInput {
message: format!("Unknown OAuth flow type: {other}"),
});
}
};
Ok(Self {
issuer_url: config.issuer_url,
client_id: config.client_id,
client_secret: config.client_secret,
scopes: config.scopes,
flow,
refresh_buffer_secs: config.refresh_buffer_secs.map(|v| v as u64),
})
}
}
impl From<ClientConfig> for lancedb::remote::ClientConfig {
fn from(config: ClientConfig) -> Self {
Self {
@@ -235,45 +156,3 @@ impl From<ClientConfig> for lancedb::remote::ClientConfig {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_unknown_oauth_flow_returns_invalid_input() {
let config = OAuthConfig {
issuer_url: "https://issuer.example.com".to_string(),
client_id: "client-id".to_string(),
scopes: vec!["scope".to_string()],
flow: Some("typo".to_string()),
client_secret: None,
managed_identity_client_id: None,
refresh_buffer_secs: None,
};
let err = lancedb::remote::oauth::OAuthConfig::try_from(config).unwrap_err();
assert!(matches!(
err,
Error::InvalidInput { message }
if message == "Unknown OAuth flow type: typo"
));
}
#[test]
fn test_oauth_config_debug_redacts_client_secret() {
let config = OAuthConfig {
issuer_url: "https://issuer.example.com".to_string(),
client_id: "client-id".to_string(),
scopes: vec!["scope".to_string()],
flow: Some("client_credentials".to_string()),
client_secret: Some("super-secret".to_string()),
managed_identity_client_id: None,
refresh_buffer_secs: None,
};
let debug = format!("{config:?}");
assert!(!debug.contains("super-secret"));
assert!(debug.contains("client_secret: Some(\"<redacted>\")"));
}
}

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.34.0-beta.3"
current_version = "0.33.1-beta.2"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.
@@ -23,8 +23,6 @@ allow_dirty = true
commit = true
message = "Bump version: {current_version} → {new_version}"
commit_args = ""
# bump-my-version >=1.4.0 rejects pre_commit_hooks containing shell syntax unless opted in.
allow_shell_hooks = true
# Update Cargo.lock after version bump
pre_commit_hooks = [

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.34.0-beta.3"
version = "0.33.1-beta.2"
publish = false
edition.workspace = true
description = "Python bindings for LanceDB"

View File

@@ -81,7 +81,6 @@ class ColPaliEmbeddings(EmbeddingFunction):
warnings.warn(
"use_token_pooling is deprecated, use pooling_strategy=None instead",
DeprecationWarning,
stacklevel=2,
)
self.pooling_strategy = None

View File

@@ -71,9 +71,6 @@ from lancedb.embeddings import EmbeddingFunctionConfig
from ._lancedb import Session
_MAX_QUERY_K = 2**31 - 1
def _query_to_namespace_request(
table_id: List[str],
query: "Query",
@@ -151,8 +148,7 @@ def _query_to_namespace_request(
if query.limit is not None:
k = query.limit
elif query.vector is None and query.full_text_query is None:
# limit k to max i32 value to avoid client overflows
k = _MAX_QUERY_K
k = sys.maxsize
else:
k = 10
@@ -373,19 +369,6 @@ def _convert_pyarrow_schema_to_json(schema: pa.Schema) -> JsonArrowSchema:
return JsonArrowSchema(fields=fields, metadata=meta)
def _builds_namespace_natively(
namespace_client_impl: Optional[str],
namespace_client_properties: Optional[Dict[str, str]],
) -> bool:
"""Whether ``connect_namespace_client`` builds the namespace client natively
in Rust (installing the read-freshness context provider) rather than wrapping
the pre-built Python client.
Must mirror Rust ``build_namespace_natively`` in ``python/src/connection.rs``.
"""
return namespace_client_impl == "rest" and bool(namespace_client_properties)
class LanceNamespaceDBConnection(DBConnection):
"""
A LanceDB connection that uses a namespace for table management.
@@ -445,13 +428,6 @@ class LanceNamespaceDBConnection(DBConnection):
)
self._namespace_client_impl = namespace_client_impl
self._namespace_client_properties = namespace_client_properties
# When the namespace client is built natively (see Rust
# ``build_namespace_natively``), the underlying Rust table performs
# QueryTable pushdown through the read-freshness context provider, which
# the pure-Python ``query_table`` path bypasses.
self._route_pushdown_to_rust = _builds_namespace_natively(
namespace_client_impl, namespace_client_properties
)
self._inner = AsyncConnection(
_connect_namespace_client(
namespace_client,
@@ -563,7 +539,6 @@ class LanceNamespaceDBConnection(DBConnection):
namespace_path=namespace_path,
namespace_client=self._namespace_client,
pushdown_operations=self._namespace_client_pushdown_operations,
route_pushdown_to_rust=self._route_pushdown_to_rust,
_async=async_table,
)
@@ -601,7 +576,6 @@ class LanceNamespaceDBConnection(DBConnection):
namespace_path=namespace_path,
namespace_client=self._namespace_client,
pushdown_operations=self._namespace_client_pushdown_operations,
route_pushdown_to_rust=self._route_pushdown_to_rust,
_async=async_table,
)
if branch is not None:
@@ -897,8 +871,6 @@ class AsyncLanceNamespaceDBConnection:
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
namespace_client_pushdown_operations: Optional[List[str]] = None,
namespace_client_impl: Optional[str] = None,
namespace_client_properties: Optional[Dict[str, str]] = None,
):
"""
Initialize an async namespace-based LanceDB connection.
@@ -924,12 +896,6 @@ class AsyncLanceNamespaceDBConnection:
namespace.create_table() instead of using declare_table + local write.
Default is None (no pushdown, all operations run locally).
namespace_client_impl : Optional[str]
The namespace implementation name used to create this connection.
Required (with ``namespace_client_properties``) for the Rust client to
be built natively and install the read-freshness provider.
namespace_client_properties : Optional[Dict[str, str]]
The namespace properties used to create this connection.
"""
self._namespace_client = namespace_client
self.read_consistency_interval = read_consistency_interval
@@ -938,14 +904,6 @@ class AsyncLanceNamespaceDBConnection:
self._namespace_client_pushdown_operations = set(
namespace_client_pushdown_operations or []
)
self._namespace_client_impl = namespace_client_impl
self._namespace_client_properties = namespace_client_properties
# See LanceNamespaceDBConnection: when built natively the Rust table runs
# QueryTable pushdown through the read-freshness provider, so defer to it
# rather than the urllib3 client (which omits x-lancedb-min-timestamp).
self._route_pushdown_to_rust = _builds_namespace_natively(
namespace_client_impl, namespace_client_properties
)
self._inner = AsyncConnection(
_connect_namespace_client(
namespace_client,
@@ -959,8 +917,8 @@ class AsyncLanceNamespaceDBConnection:
namespace_client_pushdown_operations=(
list(self._namespace_client_pushdown_operations)
),
namespace_client_impl=namespace_client_impl,
namespace_client_properties=namespace_client_properties,
namespace_client_impl=None,
namespace_client_properties=None,
)
)
@@ -1030,7 +988,6 @@ class AsyncLanceNamespaceDBConnection:
namespace_path=namespace_path,
namespace_client=self._namespace_client,
pushdown_operations=self._namespace_client_pushdown_operations,
route_pushdown_to_rust=self._route_pushdown_to_rust,
)
async def open_table(
@@ -1068,7 +1025,6 @@ class AsyncLanceNamespaceDBConnection:
namespace_path=namespace_path,
namespace_client=self._namespace_client,
pushdown_operations=self._namespace_client_pushdown_operations,
route_pushdown_to_rust=self._route_pushdown_to_rust,
)
async def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):
@@ -1427,6 +1383,4 @@ def connect_namespace_async(
storage_options=storage_options,
session=session,
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
namespace_client_impl=namespace_client_impl,
namespace_client_properties=namespace_client_properties,
)

View File

@@ -124,7 +124,6 @@ class RemoteDBConnection(DBConnection):
"request_thread_pool is no longer used and will be removed in "
"a future release.",
DeprecationWarning,
stacklevel=2,
)
if connection_timeout is not None:
@@ -133,7 +132,6 @@ class RemoteDBConnection(DBConnection):
"release. Please use client_config.timeout_config.connect_timeout "
"instead.",
DeprecationWarning,
stacklevel=2,
)
client_config.timeout_config.connect_timeout = timedelta(
seconds=connection_timeout
@@ -144,7 +142,6 @@ class RemoteDBConnection(DBConnection):
"read_timeout is deprecated and will be removed in a future release. "
"Please use client_config.timeout_config.read_timeout instead.",
DeprecationWarning,
stacklevel=2,
)
client_config.timeout_config.read_timeout = timedelta(seconds=read_timeout)

View File

@@ -845,8 +845,7 @@ class RemoteTable(Table):
"""
warnings.warn(
"cleanup_old_versions() is a no-op on LanceDB Cloud. "
"Tables are automatically cleaned up and optimized.",
stacklevel=2,
"Tables are automatically cleaned up and optimized."
)
pass
@@ -858,8 +857,7 @@ class RemoteTable(Table):
"""
warnings.warn(
"compact_files() is a no-op on LanceDB Cloud. "
"Tables are automatically compacted and optimized.",
stacklevel=2,
"Tables are automatically compacted and optimized."
)
pass
@@ -876,8 +874,7 @@ class RemoteTable(Table):
"""
warnings.warn(
"optimize() is a no-op on LanceDB Cloud. "
"Indices are optimized automatically.",
stacklevel=2,
"Indices are optimized automatically."
)
pass

View File

@@ -2022,7 +2022,6 @@ class LanceTable(Table):
namespace_client: Optional[Any] = None,
managed_versioning: Optional[bool] = None,
pushdown_operations: Optional[set] = None,
route_pushdown_to_rust: bool = False,
_async: AsyncTable = None,
):
if namespace_path is None:
@@ -2032,14 +2031,6 @@ class LanceTable(Table):
self._location = location # Store location for use in _dataset_path
self._namespace_client = namespace_client
self._pushdown_operations = pushdown_operations or set()
# When the connection built the namespace client natively (e.g. an
# enterprise "rest" connection), the underlying Rust table already
# executes QueryTable pushdown itself -- and, unlike this Python urllib3
# path, it routes through the read-freshness context provider that emits
# the ``x-lancedb-min-timestamp`` header. So we must defer pushdown to
# Rust instead of calling the Python ``namespace_client.query_table``
# directly, or reads silently bypass read-freshness (stale results).
self._route_pushdown_to_rust = route_pushdown_to_rust
if _async is not None:
self._table = _async
else:
@@ -2250,7 +2241,6 @@ class LanceTable(Table):
namespace_path=self._namespace_path,
namespace_client=self._namespace_client,
pushdown_operations=self._pushdown_operations,
route_pushdown_to_rust=self._route_pushdown_to_rust,
location=self._location,
_async=async_table,
)
@@ -2401,11 +2391,8 @@ class LanceTable(Table):
Returns
-------
pa.Table"""
if (
_should_push_down_query_table(
self._namespace_client, self._pushdown_operations
)
and not self._route_pushdown_to_rust
if _should_push_down_query_table(
self._namespace_client, self._pushdown_operations
):
return self._execute_query(Query()).read_all()
@@ -3357,7 +3344,6 @@ class LanceTable(Table):
location: Optional[str] = None,
namespace_client: Optional[Any] = None,
pushdown_operations: Optional[set] = None,
route_pushdown_to_rust: bool = False,
):
"""
Create a new table.
@@ -3420,24 +3406,21 @@ class LanceTable(Table):
self._location = location
self._namespace_client = namespace_client
self._pushdown_operations = pushdown_operations or set()
self._route_pushdown_to_rust = route_pushdown_to_rust
if data_storage_version is not None:
warnings.warn(
"setting data_storage_version directly on create_table is deprecated. "
"setting data_storage_version directly on create_table is deprecated. ",
"Use database_options instead.",
DeprecationWarning,
stacklevel=2,
)
if storage_options is None:
storage_options = {}
storage_options["new_table_data_storage_version"] = data_storage_version
if enable_v2_manifest_paths is not None:
warnings.warn(
"setting enable_v2_manifest_paths directly on create_table is "
"setting enable_v2_manifest_paths directly on create_table is ",
"deprecated. Use database_options instead.",
DeprecationWarning,
stacklevel=2,
)
if storage_options is None:
storage_options = {}
@@ -3534,7 +3517,6 @@ class LanceTable(Table):
_should_push_down_query_table(
self._namespace_client, self._pushdown_operations
)
and not self._route_pushdown_to_rust
and self.current_branch() is None
):
from lancedb.namespace import _execute_server_side_query
@@ -4276,7 +4258,6 @@ class AsyncTable:
namespace_path: Optional[List[str]] = None,
namespace_client: Optional[Any] = None,
pushdown_operations: Optional[set] = None,
route_pushdown_to_rust: bool = False,
):
"""Create a new AsyncTable object.
@@ -4289,9 +4270,6 @@ class AsyncTable:
self._namespace_path = namespace_path or []
self._namespace_client = namespace_client
self._pushdown_operations = pushdown_operations or set()
# See LanceTable.__init__: defer QueryTable pushdown to Rust (which emits
# the read-freshness header) for natively-built namespace clients.
self._route_pushdown_to_rust = route_pushdown_to_rust
def _set_namespace_context(
self,
@@ -4299,12 +4277,10 @@ class AsyncTable:
namespace_path: Optional[List[str]] = None,
namespace_client: Optional[Any] = None,
pushdown_operations: Optional[set] = None,
route_pushdown_to_rust: bool = False,
) -> "AsyncTable":
self._namespace_path = namespace_path or []
self._namespace_client = namespace_client
self._pushdown_operations = pushdown_operations or set()
self._route_pushdown_to_rust = route_pushdown_to_rust
return self
def __repr__(self):
@@ -4514,11 +4490,8 @@ class AsyncTable:
-------
pa.Table
"""
if (
_should_push_down_query_table(
self._namespace_client, self._pushdown_operations
)
and not self._route_pushdown_to_rust
if _should_push_down_query_table(
self._namespace_client, self._pushdown_operations
):
return (await self._execute_query(Query())).read_all()
@@ -5202,11 +5175,8 @@ class AsyncTable:
batch_size: Optional[int] = None,
timeout: Optional[timedelta] = None,
) -> pa.RecordBatchReader:
if (
_should_push_down_query_table(
self._namespace_client, self._pushdown_operations
)
and not self._route_pushdown_to_rust
if _should_push_down_query_table(
self._namespace_client, self._pushdown_operations
):
from lancedb.namespace import _execute_server_side_query
@@ -5692,7 +5662,6 @@ class AsyncTable:
"The 'retrain' parameter is deprecated and will be removed in a "
"future version.",
DeprecationWarning,
stacklevel=2,
)
return await self._inner.optimize(

View File

@@ -91,9 +91,7 @@ async def test_create_scalar_index(some_table: AsyncTable):
# Can recreate if replace=True
await some_table.create_index("id", replace=True)
indices = await some_table.list_indices()
assert str(indices).startswith(
'[IndexConfig(name="id_idx", index_type="BTree", columns=["id"]'
)
assert str(indices) == '[Index(BTree, columns=["id"], name="id_idx")]'
assert len(indices) == 1
assert indices[0].index_type == "BTree"
assert indices[0].columns == ["id"]
@@ -108,27 +106,6 @@ async def test_create_scalar_index(some_table: AsyncTable):
assert len(indices) == 0
@pytest.mark.asyncio
async def test_index_config_repr(db_async):
# Use >= 1000 rows so the thousands separator in the repr is exercised.
nrows = 1500
table = await db_async.create_table(
"repr_table", pa.Table.from_pydict({"id": list(range(nrows))})
)
await table.create_index("id", config=BTree())
indices = await table.list_indices()
assert len(indices) == 1
r = repr(indices[0])
assert r.startswith('IndexConfig(name="id_idx", index_type="BTree", columns=["id"]')
# Integer counts use `_` thousands separators (valid Python int syntax).
assert "num_indexed_rows=1_500" in r
assert "num_unindexed_rows=0" in r
# created_at renders as a datetime so the value round-trips.
assert "created_at=datetime.datetime(" in r
assert r.endswith(")")
@pytest.mark.asyncio
async def test_create_nested_scalar_index_lists_canonical_paths(db_async):
metadata_type = pa.struct(
@@ -221,9 +198,7 @@ async def test_create_nested_scalar_index_lists_canonical_paths(db_async):
async def test_create_fixed_size_binary_index(some_table: AsyncTable):
await some_table.create_index("fsb", config=BTree())
indices = await some_table.list_indices()
assert str(indices).startswith(
'[IndexConfig(name="fsb_idx", index_type="BTree", columns=["fsb"]'
)
assert str(indices) == '[Index(BTree, columns=["fsb"], name="fsb_idx")]'
assert len(indices) == 1
assert indices[0].index_type == "BTree"
assert indices[0].columns == ["fsb"]
@@ -272,9 +247,7 @@ async def test_create_bitmap_index(some_table: AsyncTable):
async def test_create_label_list_index(some_table: AsyncTable):
await some_table.create_index("tags", config=LabelList())
indices = await some_table.list_indices()
assert str(indices).startswith(
'[IndexConfig(name="tags_idx", index_type="LabelList", columns=["tags"]'
)
assert str(indices) == '[Index(LabelList, columns=["tags"], name="tags_idx")]'
plan = await some_table.query().where("array_has(tags, 'tag0')").explain_plan()
assert "ScalarIndexQuery" in plan
@@ -289,9 +262,7 @@ async def test_create_large_list_label_list_index(db_async):
await table.create_index("tags", config=LabelList())
indices = await table.list_indices()
assert str(indices).startswith(
'[IndexConfig(name="tags_idx", index_type="LabelList", columns=["tags"]'
)
assert str(indices) == '[Index(LabelList, columns=["tags"], name="tags_idx")]'
plan = await table.query().where("array_has(tags, 'shared')").explain_plan()
assert "ScalarIndexQuery" in plan
@@ -328,9 +299,7 @@ async def test_create_label_list_index_rejects_list_struct(db_async):
async def test_full_text_search_index(some_table: AsyncTable):
await some_table.create_index("tags", config=FTS(with_position=False))
indices = await some_table.list_indices()
assert str(indices).startswith(
'[IndexConfig(name="tags_idx", index_type="FTS", columns=["tags"]'
)
assert str(indices) == '[Index(FTS, columns=["tags"], name="tags_idx")]'
await some_table.prewarm_index("tags_idx")

View File

@@ -5,11 +5,11 @@
import tempfile
import shutil
import sys
import pytest
import pyarrow as pa
import lancedb
from lance_namespace.errors import NamespaceNotEmptyError, TableNotFoundError
from lancedb.namespace import _MAX_QUERY_K
from lancedb.table import AsyncTable, LanceTable
@@ -65,9 +65,6 @@ def _namespace_lance_table(namespace_client: _NamespaceClient) -> LanceTable:
table._namespace_path = ["geneva"]
table._namespace_client = namespace_client
table._pushdown_operations = {"QueryTable"}
# This test exercises the Python-side pushdown path (non-native client), so
# pushdown is not routed to Rust.
table._route_pushdown_to_rust = False
return table
@@ -808,37 +805,6 @@ class TestPushdownOperations:
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
assert len(db._namespace_client_pushdown_operations) == 0
def test_route_pushdown_to_rust_for_native_rest(self):
"""A natively-built rest connection must defer QueryTable pushdown to
Rust so reads carry the x-lancedb-min-timestamp read-freshness header."""
db = lancedb.connect_namespace(
"rest",
{"uri": "http://localhost:12345"},
namespace_client_pushdown_operations=["QueryTable"],
)
assert db._route_pushdown_to_rust is True
def test_route_pushdown_to_rust_false_for_dir(self):
"""A non-native (dir) connection keeps the Python pushdown path."""
db = lancedb.connect_namespace("dir", {"root": self.temp_dir})
assert db._route_pushdown_to_rust is False
def test_async_route_pushdown_to_rust_for_native_rest(self):
"""The async connection must not silently bypass the read-freshness fix:
a natively-built rest connection defers pushdown to Rust (regression test
for the async path omitting the freshness header)."""
db = lancedb.connect_namespace_async(
"rest",
{"uri": "http://localhost:12345"},
namespace_client_pushdown_operations=["QueryTable"],
)
assert db._route_pushdown_to_rust is True
def test_async_route_pushdown_to_rust_false_for_dir(self):
"""The async non-native (dir) connection keeps the Python pushdown path."""
db = lancedb.connect_namespace_async("dir", {"root": self.temp_dir})
assert db._route_pushdown_to_rust is False
def test_lance_table_to_arrow_uses_query_pushdown(self):
namespace_client = _NamespaceClient()
table = _namespace_lance_table(namespace_client)
@@ -850,13 +816,10 @@ class TestPushdownOperations:
["geneva", "hist"],
["geneva", "hist"],
]
# Unlimited reads cap k at i32::MAX (the namespace query_table `k`
# field is i32); sys.maxsize would overflow the Rust binding.
assert [request.k for request in namespace_client.requests] == [
_MAX_QUERY_K,
_MAX_QUERY_K,
sys.maxsize,
sys.maxsize,
]
assert all(r.k <= 2**31 - 1 for r in namespace_client.requests)
@pytest.mark.asyncio
@@ -911,13 +874,10 @@ class TestAsyncPushdownOperations:
["geneva", "hist"],
["geneva", "hist"],
]
# Unlimited reads cap k at i32::MAX (the namespace query_table `k`
# field is i32); sys.maxsize would overflow the Rust binding.
assert [request.k for request in namespace_client.requests] == [
_MAX_QUERY_K,
_MAX_QUERY_K,
sys.maxsize,
sys.maxsize,
]
assert all(r.k <= 2**31 - 1 for r in namespace_client.requests)
def test_local_table_to_arrow_and_to_pandas_are_unchanged(tmp_path):

View File

@@ -610,38 +610,24 @@ pub fn connect_namespace_client(
namespace_client_impl: Option<String>,
namespace_client_properties: Option<HashMap<String, String>>,
) -> PyResult<Connection> {
let namespace_client = extract_namespace_arc(py, namespace_client)?;
let read_consistency_interval = read_consistency_interval.map(Duration::from_secs_f64);
let namespace_client_pushdown_operations =
parse_namespace_client_pushdown_operations(namespace_client_pushdown_operations)?;
let ns_impl = namespace_client_impl.unwrap_or_else(|| "python".to_string());
let ns_properties = namespace_client_properties.unwrap_or_default();
let storage_options = storage_options.unwrap_or_default();
let session = session.map(|s| s.inner.clone());
// Prefer building the namespace natively from (impl, properties) so the
// read-freshness provider installed
let database = if build_namespace_natively(namespace_client_impl.as_deref(), &ns_properties) {
let ns_impl = namespace_client_impl.expect("impl present per build_namespace_natively");
crate::runtime::block_on(LanceNamespaceDatabase::connect(
&ns_impl,
ns_properties,
storage_options,
read_consistency_interval,
session,
namespace_client_pushdown_operations,
))
.infer_error()?
} else {
let namespace_client = extract_namespace_arc(py, namespace_client)?;
LanceNamespaceDatabase::from_namespace_client(
namespace_client,
namespace_client_impl.unwrap_or_else(|| "python".to_string()),
ns_properties,
storage_options,
read_consistency_interval,
session,
namespace_client_pushdown_operations,
)
};
let database = LanceNamespaceDatabase::from_namespace_client(
namespace_client,
ns_impl,
ns_properties,
storage_options,
read_consistency_interval,
session,
namespace_client_pushdown_operations,
);
Ok(Connection::new(LanceConnection::new(
Arc::new(database),
@@ -649,16 +635,6 @@ pub fn connect_namespace_client(
)))
}
/// Whether to build the namespace natively (from impl + properties) instead of
/// wrapping a pre-built client. Native construction is required for the
/// read-freshness provider to be installed
fn build_namespace_natively(
namespace_client_impl: Option<&str>,
namespace_client_properties: &HashMap<String, String>,
) -> bool {
matches!(namespace_client_impl, Some("rest")) && !namespace_client_properties.is_empty()
}
#[derive(FromPyObject)]
pub struct PyClientConfig {
user_agent: String,
@@ -757,36 +733,3 @@ impl From<PyClientConfig> for lancedb::remote::ClientConfig {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
pairs
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect()
}
#[test]
fn native_build_only_for_rest_with_properties() {
let rest = props(&[("uri", "http://localhost:10024")]);
// rest + non-empty properties -> build natively (installs the
// read-freshness provider so checkout_latest() busts the server cache).
assert!(build_namespace_natively(Some("rest"), &rest));
// dir is local (no server cache) -> wrap the pre-built client unchanged.
assert!(!build_namespace_natively(
Some("dir"),
&props(&[("root", "/tmp")])
));
// No impl: only a pre-built client was handed in -> wrap it as-is.
assert!(!build_namespace_natively(None, &rest));
// rest but no properties: nothing to build a connection from -> wrap.
assert!(!build_namespace_natively(Some("rest"), &HashMap::new()));
}
}

View File

@@ -319,53 +319,11 @@ pub struct IndexConfig {
#[pymethods]
impl IndexConfig {
pub fn __repr__(&self, py: Python<'_>) -> String {
let mut fields = vec![
format!("name={:?}", self.name),
format!("index_type={:?}", self.index_type),
format!("columns={:?}", self.columns),
];
if let Some(v) = &self.index_uuid {
fields.push(format!("index_uuid={:?}", v));
}
if let Some(v) = &self.type_url {
fields.push(format!("type_url={:?}", v));
}
if let Some(v) = self.created_at {
// Render the datetime's own Python repr so the value round-trips,
// falling back to RFC 3339 if the conversion ever fails.
let rendered = v
.into_pyobject(py)
.ok()
.and_then(|obj| obj.into_any().repr().ok())
.map(|r| r.to_string())
.unwrap_or_else(|| v.to_rfc3339());
fields.push(format!("created_at={}", rendered));
}
if let Some(v) = self.num_indexed_rows {
fields.push(format!("num_indexed_rows={}", fmt_thousands(v)));
}
if let Some(v) = self.num_unindexed_rows {
fields.push(format!("num_unindexed_rows={}", fmt_thousands(v)));
}
if let Some(v) = self.size_bytes {
fields.push(format!("size_bytes={}", fmt_thousands(v)));
}
if let Some(v) = self.num_segments {
fields.push(format!("num_segments={}", v));
}
if let Some(v) = self.index_version {
fields.push(format!("index_version={}", v));
}
if let Some(v) = &self.index_details {
let details = v
.bind(py)
.repr()
.map(|r| r.to_string())
.unwrap_or_else(|_| "<unavailable>".to_string());
fields.push(format!("index_details={}", details));
}
format!("IndexConfig({})", fields.join(", "))
pub fn __repr__(&self) -> String {
format!(
"Index({}, columns={:?}, name=\"{}\")",
self.index_type, self.columns, self.name
)
}
// For backwards-compatibility with the old sync SDK, we also support getting
@@ -394,23 +352,6 @@ impl IndexConfig {
}
}
/// Format an integer with `_` thousands separators, e.g. `24_500_213`.
///
/// Underscores are valid Python int-literal syntax, so the repr stays
/// copy-pasteable and machine-parseable while remaining readable.
fn fmt_thousands(n: u64) -> String {
let digits = n.to_string();
let bytes = digits.as_bytes();
let mut out = String::with_capacity(digits.len() + digits.len() / 3);
for (i, b) in bytes.iter().enumerate() {
if i > 0 && (bytes.len() - i).is_multiple_of(3) {
out.push('_');
}
out.push(*b as char);
}
out
}
fn parse_index_details(py: Python<'_>, s: String) -> Py<PyAny> {
let json = py.import("json").expect("json module is always available");
match json.call_method1("loads", (s.as_str(),)) {

View File

@@ -56,15 +56,6 @@ fn get_runtime() -> &'static runtime::Runtime {
unsafe { &*new_ptr }
}
/// Block the current thread on a future using the shared runtime.
///
/// For sync `#[pyfunction]`s that need to drive an async operation (e.g.
/// building a namespace client). Must not be called from within the runtime's
/// own worker threads.
pub fn block_on<F: std::future::Future>(fut: F) -> F::Output {
get_runtime().block_on(fut)
}
/// Runs in async-signal context after `fork()` in the child. We can only
/// touch atomics here; we deliberately leak the previous runtime because
/// dropping a tokio `Runtime` would try to join its (now-dead) worker

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.31.0-beta.3"
version = "0.30.1-beta.2"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true
@@ -50,7 +50,7 @@ lance-namespace = { workspace = true }
lance-namespace-impls = { workspace = true }
moka = { workspace = true }
pin-project = { workspace = true }
tokio = { version = "1.23", features = ["rt-multi-thread", "sync"] }
tokio = { version = "1.23", features = ["rt-multi-thread"] }
log.workspace = true
async-trait = "0"
bytes = "1"
@@ -75,7 +75,6 @@ reqwest = { version = "0.12.0", default-features = false, features = [
"stream",
], optional = true }
http = { version = "1", optional = true } # Matching what is in reqwest
urlencoding = { version = "2", optional = true }
uuid = { version = "1.7.0", features = ["v4", "v5"] }
polars-arrow = { version = ">=0.37,<0.40.0", optional = true }
polars = { version = ">=0.37,<0.40.0", optional = true }
@@ -94,7 +93,6 @@ semver = { workspace = true }
anyhow = "1"
tempfile = "3.5.0"
random_word = { version = "0.4.3", features = ["en"] }
tokio = { version = "1.23", features = ["io-util", "macros", "net", "rt-multi-thread", "sync"] }
uuid = { version = "1.7.0", features = ["v4"] }
walkdir = "2"
aws-sdk-dynamodb = { version = "1.55.0" }
@@ -131,13 +129,7 @@ huggingface = [
"lance-namespace-impls/dir-huggingface",
]
dynamodb = ["lance/dynamodb", "aws"]
remote = [
"dep:reqwest",
"dep:http",
"dep:urlencoding",
"lance-namespace-impls/rest",
"lance-namespace-impls/rest-adapter",
]
remote = ["dep:reqwest", "dep:http", "lance-namespace-impls/rest", "lance-namespace-impls/rest-adapter"]
fp16kernels = ["lance-linalg/fp16kernels"]
s3-test = []
bedrock = ["dep:aws-sdk-bedrockruntime"]

View File

@@ -1,435 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Lance blob v2 columns store large binary payloads out of line.
//!
//! Declare a column with [`blob`]. On write, [`crate::table::Table::add`] coerces
//! raw `Binary` / `LargeBinary` into the blob struct layout. Queries return
//! small descriptors, not bytes.
//!
//! Blob tables require Lance file format >= 2.2 and stable row ids at create.
use std::sync::Arc;
use arrow_array::builder::LargeBinaryBuilder;
use arrow_array::{Array, LargeBinaryArray, RecordBatch, StructArray, UInt8Array, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use lance::dataset::{Dataset, WriteParams};
use lance_arrow::FieldExt;
use lance_core::datatypes::parse_field_path;
use lance_encoding::version::LanceFileVersion;
use crate::error::{Error, Result};
pub use lance::dataset::BlobFile;
/// Creates an Arrow field for a Lance blob v2 column.
///
/// `Struct<data, uri>` with the `lance.blob.v2` marker. Same layout Lance
/// expects on write.
///
/// A blob column may be top-level or nested inside a struct or list. Nested
/// blobs are addressed by a dotted path (e.g. `info.blob`) in the read APIs.
///
/// ```
/// use arrow_schema::{DataType, Field, Schema};
///
/// let schema = Schema::new(vec![
/// Field::new("id", DataType::Int64, false),
/// lancedb::blob("image", true),
/// ]);
/// ```
pub fn blob(name: impl AsRef<str>, nullable: bool) -> Field {
lance::blob::blob_field(name.as_ref(), nullable)
}
/// Returns true if `field` is a blob v2 column.
///
/// ```
/// let field = lancedb::blob("image", true);
/// assert!(lancedb::blob::is_blob(&field));
/// ```
pub fn is_blob(field: &Field) -> bool {
field.is_blob_v2()
}
/// Returns true if `field`, or any field nested under it, is a blob v2 column.
fn field_tree_has_blob_v2(field: &Field) -> bool {
if field.is_blob_v2() {
return true;
}
match field.data_type() {
DataType::Struct(children) => children.iter().any(|c| field_tree_has_blob_v2(c)),
DataType::List(child) | DataType::LargeList(child) | DataType::FixedSizeList(child, _) => {
field_tree_has_blob_v2(child)
}
_ => false,
}
}
/// Collects the dotted paths of blob v2 columns under `field`, into `paths`.
fn collect_blob_paths(field: &Field, prefix: &str, paths: &mut Vec<String>) {
let path = if prefix.is_empty() {
field.name().clone()
} else {
format!("{prefix}.{}", field.name())
};
if field.is_blob_v2() {
paths.push(path);
return;
}
match field.data_type() {
DataType::Struct(children) => {
for child in children {
collect_blob_paths(child, &path, paths);
}
}
DataType::List(child) | DataType::LargeList(child) | DataType::FixedSizeList(child, _) => {
collect_blob_paths(child, &path, paths)
}
_ => {}
}
}
/// Returns true if `schema` declares any blob v2 column, including nested ones.
pub(crate) fn has_blob_columns(schema: &Schema) -> bool {
schema.fields().iter().any(|f| field_tree_has_blob_v2(f))
}
/// Blob v2 column paths in `schema`, declaration order preserved. Nested blobs
/// are dotted paths (e.g. `info.blob`).
pub(crate) fn blob_column_names(schema: &Schema) -> Vec<String> {
let mut paths = Vec::new();
for field in schema.fields() {
collect_blob_paths(field, "", &mut paths);
}
paths
}
/// Bumps storage format to at least [`LanceFileVersion::V2_2`] for blob schemas.
pub(crate) fn ensure_blob_storage_version(schema: &Schema, params: &mut WriteParams) {
if !has_blob_columns(schema) {
return;
}
let resolved = params
.data_storage_version
.unwrap_or(LanceFileVersion::Stable)
.resolve();
if resolved < LanceFileVersion::V2_2 {
params.data_storage_version = Some(LanceFileVersion::V2_2);
}
}
/// Validate that `column` exists and is a blob v2 column.
///
/// Legacy v1 columns (`lance-encoding:blob`) error with a migration hint.
pub(crate) fn ensure_blob_v2_column(
schema: &lance_core::datatypes::Schema,
column: &str,
) -> Result<()> {
match schema.field(column) {
Some(field) if field.is_blob_v2() => Ok(()),
Some(field) if field.is_blob() => Err(Error::InvalidInput {
message: format!(
"column '{column}' is a legacy blob column; blob APIs require blob v2 columns \
(ARROW:extension:name = \"lance.blob.v2\")"
),
}),
Some(_) => Err(Error::InvalidInput {
message: format!("column '{column}' is not a blob column"),
}),
None => Err(Error::InvalidInput {
message: format!("no column named '{column}' in this table"),
}),
}
}
/// Returns the leaf descriptor `StructArray` for `column` in a descriptor batch.
fn leaf_descriptor_struct<'a>(batch: &'a RecordBatch, column: &str) -> Result<&'a StructArray> {
let path = parse_field_path(column).map_err(|e| Error::InvalidInput {
message: format!("invalid blob column path '{column}': {e}"),
})?;
let not_struct = || Error::Runtime {
message: format!("blob column '{column}' did not read back as a descriptor struct"),
};
let mut current = batch
.column_by_name(&path[0])
.and_then(|c| c.as_any().downcast_ref::<StructArray>())
.ok_or_else(not_struct)?;
for segment in &path[1..] {
current = current
.column_by_name(segment)
.and_then(|c| c.as_any().downcast_ref::<StructArray>())
.ok_or_else(not_struct)?;
}
Ok(current)
}
/// Null rows in `row_ids`, from a descriptor take.
///
/// Lance `read_blobs` / `take_blobs` skip null rows (`kind == 0 && position == 0 && size == 0`).
/// TODO(lance): aligned read API would drop this pass.
async fn blob_null_mask(
dataset: &Arc<Dataset>,
column: &str,
row_ids: &[u64],
) -> Result<Vec<bool>> {
let projection = dataset.schema().project(&[column])?;
let descriptors = dataset.take_builder(row_ids, projection)?.execute().await?;
if descriptors.num_rows() != row_ids.len() {
return Err(Error::InvalidInput {
message: format!(
"blob take for column '{column}' requested {} row ids but only {} exist in the \
table; pass row ids collected from this table",
row_ids.len(),
descriptors.num_rows()
),
});
}
let descriptor_struct = leaf_descriptor_struct(&descriptors, column)?;
let child = |name: &str| {
descriptor_struct
.column_by_name(name)
.ok_or_else(|| Error::Runtime {
message: format!("blob descriptor for '{column}' is missing the '{name}' field"),
})
};
let kinds = child("kind")?
.as_any()
.downcast_ref::<UInt8Array>()
.ok_or_else(|| Error::Runtime {
message: format!("blob descriptor 'kind' for '{column}' is not a UInt8 array"),
})?;
let positions = child("position")?
.as_any()
.downcast_ref::<UInt64Array>()
.ok_or_else(|| Error::Runtime {
message: format!("blob descriptor 'position' for '{column}' is not a UInt64 array"),
})?;
let sizes = child("size")?
.as_any()
.downcast_ref::<UInt64Array>()
.ok_or_else(|| Error::Runtime {
message: format!("blob descriptor 'size' for '{column}' is not a UInt64 array"),
})?;
// Match Lance `collect_blob_entries_v2` skip condition (`BlobKind::Inline` == 0).
Ok((0..descriptor_struct.len())
.map(|i| {
descriptor_struct.is_null(i)
|| kinds.is_null(i)
|| (kinds.value(i) == 0 && positions.value(i) == 0 && sizes.value(i) == 0)
})
.collect())
}
fn non_null_row_ids(row_ids: &[u64], null_mask: &[bool]) -> Vec<u64> {
row_ids
.iter()
.zip(null_mask)
.filter_map(|(row_id, is_null)| (!is_null).then_some(*row_id))
.collect()
}
/// Materialize blob bytes for `row_ids` (same length and order, nulls preserved).
pub(crate) async fn take_blobs_aligned(
dataset: &Arc<Dataset>,
column: &str,
row_ids: &[u64],
) -> Result<LargeBinaryArray> {
ensure_blob_v2_column(dataset.schema(), column)?;
if row_ids.is_empty() {
return Ok(LargeBinaryBuilder::new().finish());
}
let null_mask = blob_null_mask(dataset, column, row_ids).await?;
let non_null_row_ids = non_null_row_ids(row_ids, &null_mask);
let non_null_count = non_null_row_ids.len();
let payloads = if non_null_count == 0 {
Vec::new()
} else {
dataset
.read_blobs(column)?
.with_row_ids(non_null_row_ids)
.preserve_order(true)
.execute()
.await?
};
if payloads.len() != non_null_count {
return Err(Error::Runtime {
message: format!(
"blob read for column '{column}' returned {} payloads for {} non-null rows",
payloads.len(),
non_null_count
),
});
}
let mut builder = LargeBinaryBuilder::new();
let mut payload_idx = 0;
for is_null in &null_mask {
if *is_null {
builder.append_null();
} else {
builder.append_value(payloads[payload_idx].data.as_ref());
payload_idx += 1;
}
}
Ok(builder.finish())
}
/// Open lazy [`BlobFile`] handles for `row_ids` (same length and order, nulls as `None`).
pub(crate) async fn take_blob_files_aligned(
dataset: &Arc<Dataset>,
column: &str,
row_ids: &[u64],
) -> Result<Vec<Option<BlobFile>>> {
ensure_blob_v2_column(dataset.schema(), column)?;
if row_ids.is_empty() {
return Ok(Vec::new());
}
let null_mask = blob_null_mask(dataset, column, row_ids).await?;
let non_null_row_ids = non_null_row_ids(row_ids, &null_mask);
let handles = if non_null_row_ids.is_empty() {
Vec::new()
} else {
dataset.take_blobs(&non_null_row_ids, column).await?
};
if handles.len() != non_null_row_ids.len() {
return Err(Error::Runtime {
message: format!(
"blob take for column '{column}' returned {} handles for {} non-null rows",
handles.len(),
non_null_row_ids.len()
),
});
}
let mut handles = handles.into_iter();
Ok(null_mask
.iter()
.map(|is_null| {
if *is_null {
None
} else {
Some(handles.next().unwrap())
}
})
.collect())
}
#[cfg(test)]
mod tests {
use super::*;
use arrow_schema::DataType;
use lance_arrow::ARROW_EXT_NAME_KEY;
fn blob_schema() -> Schema {
Schema::new(vec![
Field::new("id", DataType::Int64, false),
blob("image", true),
])
}
#[test]
fn blob_field_carries_v2_extension_marker() {
let field = blob("image", true);
assert_eq!(
field.metadata().get(ARROW_EXT_NAME_KEY).map(String::as_str),
Some("lance.blob.v2")
);
assert!(matches!(field.data_type(), DataType::Struct(_)));
}
#[test]
fn has_blob_columns_detects_blob_fields() {
assert!(has_blob_columns(&blob_schema()));
let plain = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
assert!(!has_blob_columns(&plain));
}
#[test]
fn storage_version_bumps_to_v2_2() {
let mut params = WriteParams::default();
ensure_blob_storage_version(&blob_schema(), &mut params);
assert_eq!(
params.data_storage_version.unwrap().resolve(),
LanceFileVersion::V2_2
);
}
#[test]
fn storage_version_overrides_lower_explicit_version() {
let mut params = WriteParams {
data_storage_version: Some(LanceFileVersion::V2_0),
..Default::default()
};
ensure_blob_storage_version(&blob_schema(), &mut params);
assert_eq!(
params.data_storage_version.unwrap().resolve(),
LanceFileVersion::V2_2
);
}
#[test]
fn storage_version_keeps_higher_explicit_version() {
let mut params = WriteParams {
data_storage_version: Some(LanceFileVersion::V2_3),
..Default::default()
};
ensure_blob_storage_version(&blob_schema(), &mut params);
assert_eq!(params.data_storage_version.unwrap(), LanceFileVersion::V2_3);
}
#[test]
fn legacy_v1_blob_column_is_rejected_with_migration_hint() {
let legacy = Field::new("image", DataType::LargeBinary, true).with_metadata(
std::collections::HashMap::from([(
"lance-encoding:blob".to_string(),
"true".to_string(),
)]),
);
let arrow_schema = Schema::new(vec![legacy]);
let lance_schema = lance_core::datatypes::Schema::try_from(&arrow_schema).unwrap();
let err = ensure_blob_v2_column(&lance_schema, "image").unwrap_err();
assert!(matches!(err, Error::InvalidInput { .. }));
assert!(err.to_string().contains("legacy blob column"));
assert!(err.to_string().contains("lance.blob.v2"));
}
#[test]
fn non_blob_and_unknown_columns_are_rejected_by_name() {
let arrow_schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
let lance_schema = lance_core::datatypes::Schema::try_from(&arrow_schema).unwrap();
let err = ensure_blob_v2_column(&lance_schema, "id").unwrap_err();
assert!(err.to_string().contains("'id' is not a blob column"));
let err = ensure_blob_v2_column(&lance_schema, "missing").unwrap_err();
assert!(err.to_string().contains("no column named 'missing'"));
}
#[test]
fn blob_column_names_includes_nested_path() {
let blob_field = blob("blob", true);
let info = Field::new(
"info",
DataType::Struct(vec![Field::new("name", DataType::Utf8, false), blob_field].into()),
true,
);
let schema = Schema::new(vec![Field::new("id", DataType::Int64, false), info]);
assert_eq!(blob_column_names(&schema), vec!["info.blob"]);
}
#[test]
fn storage_version_noop_without_blob_columns() {
let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
let mut params = WriteParams::default();
ensure_blob_storage_version(&schema, &mut params);
assert!(params.data_storage_version.is_none());
}
}

View File

@@ -576,9 +576,6 @@ impl Connection {
/// For LanceNamespaceDatabase, it is the underlying LanceNamespace.
/// For ListingDatabase, it is the equivalent DirectoryNamespace.
/// For RemoteDatabase, it is the equivalent RestNamespace.
///
/// Remote connections using dynamic headers forward them through the
/// namespace client's per-request context provider.
pub async fn namespace_client(&self) -> Result<Arc<dyn lance_namespace::LanceNamespace>> {
self.internal.namespace_client().await
}
@@ -587,9 +584,6 @@ impl Connection {
/// Returns (impl_type, properties) where:
/// - impl_type: "dir" for DirectoryNamespace, "rest" for RestNamespace
/// - properties: configuration properties for the namespace
///
/// Remote connections using dynamic headers cannot be exported because the
/// namespace client config only carries static headers.
pub async fn namespace_client_config(
&self,
) -> Result<(String, std::collections::HashMap<String, String>)> {
@@ -667,8 +661,6 @@ pub struct ConnectRequest {
pub struct ConnectBuilder {
request: ConnectRequest,
embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
#[cfg(feature = "remote")]
oauth_config: Option<crate::remote::OAuthConfig>,
}
#[cfg(feature = "remote")]
@@ -690,8 +682,6 @@ impl ConnectBuilder {
session: None,
},
embedding_registry: None,
#[cfg(feature = "remote")]
oauth_config: None,
}
}
@@ -780,19 +770,6 @@ impl ConnectBuilder {
self
}
/// Configure OAuth authentication for LanceDB Cloud/Enterprise.
///
/// This creates an [`OAuthHeaderProvider`](crate::remote::OAuthHeaderProvider)
/// from the given config and sets it as the header provider. OAuth cannot
/// be combined with an API key or another header provider.
///
/// Token acquisition and refresh are handled in Rust.
#[cfg(feature = "remote")]
pub fn oauth_config(mut self, config: crate::remote::OAuthConfig) -> Self {
self.oauth_config = Some(config);
self
}
/// Provide a custom [`EmbeddingRegistry`] to use for this connection.
pub fn embedding_registry(mut self, registry: Arc<dyn EmbeddingRegistry>) -> Self {
self.embedding_registry = Some(registry);
@@ -938,40 +915,9 @@ impl ConnectBuilder {
let region = options.region.ok_or_else(|| Error::InvalidInput {
message: "A region is required when connecting to LanceDb Cloud".to_string(),
})?;
let api_key = match (&self.oauth_config, &options.api_key) {
(Some(_), None) => String::new(),
(Some(_), Some(_)) => {
return Err(Error::InvalidInput {
message:
"api_key and oauth_config cannot both be set when connecting to LanceDb Cloud"
.to_string(),
});
}
(None, Some(key)) => key.clone(),
(None, None) => {
return Err(Error::InvalidInput {
message:
"An api_key or oauth_config is required when connecting to LanceDb Cloud"
.to_string(),
});
}
};
if self.oauth_config.is_some() && self.request.client_config.header_provider.is_some() {
return Err(Error::InvalidInput {
message:
"oauth_config and client_config.header_provider cannot both be set when connecting to LanceDb Cloud"
.to_string(),
});
}
let mut client_config = self.request.client_config;
if let Some(oauth_config) = self.oauth_config {
let provider = crate::remote::OAuthHeaderProvider::new(oauth_config)?;
client_config.header_provider =
Some(Arc::new(provider) as Arc<dyn crate::remote::HeaderProvider>);
}
let api_key = options.api_key.ok_or_else(|| Error::InvalidInput {
message: "An api_key is required when connecting to LanceDb Cloud".to_string(),
})?;
let storage_options = StorageOptions(options.storage_options.clone());
let internal = Arc::new(crate::remote::db::RemoteDatabase::try_new(
@@ -979,7 +925,7 @@ impl ConnectBuilder {
&api_key,
&region,
options.host_override,
client_config,
self.request.client_config,
storage_options.into(),
self.request.read_consistency_interval,
)?);
@@ -1288,83 +1234,6 @@ mod tests {
assert_eq!(Some(&"EXPLICIT-VALUE".to_string()), options.get(opts_key));
}
#[cfg(feature = "remote")]
#[tokio::test]
async fn test_connect_rejects_api_key_with_oauth_config() {
let oauth_config = crate::remote::OAuthConfig {
issuer_url: "https://issuer.example.com".to_string(),
client_id: "client-id".to_string(),
client_secret: Some("secret".to_string()),
scopes: vec!["scope".to_string()],
flow: crate::remote::OAuthFlow::ClientCredentials,
refresh_buffer_secs: None,
};
let result = ConnectBuilder::new("db://my-container/my-prefix")
.region("us-east-1")
.api_key("my-api-key")
.oauth_config(oauth_config)
.execute()
.await;
match result {
Err(Error::InvalidInput { message })
if message
== "api_key and oauth_config cannot both be set when connecting to LanceDb Cloud" =>
{}
Err(err) => panic!("expected InvalidInput, got {err:?}"),
Ok(_) => panic!("expected api_key and oauth_config to be rejected"),
}
}
#[cfg(feature = "remote")]
#[tokio::test]
async fn test_connect_rejects_header_provider_with_oauth_config() {
#[derive(Debug)]
struct TestHeaderProvider;
#[async_trait::async_trait]
impl crate::remote::HeaderProvider for TestHeaderProvider {
async fn get_headers(&self) -> Result<HashMap<String, String>> {
Ok(HashMap::from([(
"authorization".to_string(),
"Bearer token".to_string(),
)]))
}
}
let oauth_config = crate::remote::OAuthConfig {
issuer_url: "https://issuer.example.com".to_string(),
client_id: "client-id".to_string(),
client_secret: Some("secret".to_string()),
scopes: vec!["scope".to_string()],
flow: crate::remote::OAuthFlow::ClientCredentials,
refresh_buffer_secs: None,
};
let client_config = crate::remote::ClientConfig {
header_provider: Some(
Arc::new(TestHeaderProvider) as Arc<dyn crate::remote::HeaderProvider>
),
..Default::default()
};
let result = ConnectBuilder::new("db://my-container/my-prefix")
.region("us-east-1")
.client_config(client_config)
.oauth_config(oauth_config)
.execute()
.await;
match result {
Err(Error::InvalidInput { message })
if message
== "oauth_config and client_config.header_provider cannot both be set when connecting to LanceDb Cloud" =>
{}
Err(err) => panic!("expected InvalidInput, got {err:?}"),
Ok(_) => panic!("expected header_provider and oauth_config to be rejected"),
}
}
#[cfg(not(windows))]
#[tokio::test]
async fn test_connect_relative() {

View File

@@ -18,7 +18,6 @@ use lance_table::io::commit::commit_handler_from_url;
use object_store::local::LocalFileSystem;
use snafu::ResultExt;
use crate::blob::{ensure_blob_storage_version, has_blob_columns};
use crate::connection::ConnectRequest;
use crate::database::ReadConsistency;
use crate::database::namespace::LanceNamespaceDatabase;
@@ -839,16 +838,13 @@ impl ListingDatabase {
write_params.enable_v2_manifest_paths = enable_v2_manifest_paths;
}
let data_schema = request.data.arrow_schema();
if let Some(enable_stable_row_ids) = stable_row_ids_override
.or(self.new_table_config.enable_stable_row_ids)
.or(has_blob_columns(&data_schema).then_some(true))
// Apply enable_stable_row_ids: table-level override takes precedence over connection config
if let Some(enable_stable_row_ids) =
stable_row_ids_override.or(self.new_table_config.enable_stable_row_ids)
{
write_params.enable_stable_row_ids = enable_stable_row_ids;
}
ensure_blob_storage_version(&data_schema, &mut write_params);
if matches!(&request.mode, CreateTableMode::Overwrite) {
write_params.mode = WriteMode::Overwrite;
}

View File

@@ -23,7 +23,6 @@ use lance_namespace_impls::ConnectBuilder;
use lance_table::io::commit::CommitHandler;
use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
use crate::blob::{ensure_blob_storage_version, has_blob_columns};
use crate::connection::NamespaceClientPushdownOperation;
use crate::database::ReadConsistency;
use crate::database::listing::{
@@ -258,16 +257,12 @@ impl LanceNamespaceDatabase {
params.enable_v2_manifest_paths = enable_v2_manifest_paths;
}
let data_schema = request.data.schema();
if let Some(enable_stable_row_ids) = stable_row_ids_override
.or(self.new_table_config.enable_stable_row_ids)
.or(has_blob_columns(data_schema.as_ref()).then_some(true))
if let Some(enable_stable_row_ids) =
stable_row_ids_override.or(self.new_table_config.enable_stable_row_ids)
{
params.enable_stable_row_ids = enable_stable_row_ids;
}
ensure_blob_storage_version(data_schema.as_ref(), params);
Ok(())
}
}

View File

@@ -13,7 +13,7 @@ use serde_json::{Value, json};
use super::EmbeddingFunction;
use crate::{Error, Result};
use tokio::runtime::{Handle, RuntimeFlavor};
use tokio::runtime::Handle;
use tokio::task::block_in_place;
#[derive(Debug)]
@@ -148,12 +148,6 @@ impl BedrockEmbeddingFunction {
_ => unreachable!(),
};
// Bedrock's SDK is async but this trait method is synchronous, so we
// bridge with `block_in_place` + `block_on`. That requires a
// multi-threaded Tokio runtime; return a typed error instead of
// panicking when no compatible runtime is available.
let handle = current_multi_thread_handle()?;
for text in texts {
let request_body = match self.model {
BedrockEmbeddingModel::TitanEmbedding => {
@@ -169,28 +163,24 @@ impl BedrockEmbeddingFunction {
}
};
// Serialize before entering the blocking section so a serialization
// failure surfaces as a typed error rather than an `unwrap` panic.
let body = serde_json::to_vec(&request_body).map_err(|e| Error::Runtime {
message: format!("Failed to serialize Bedrock request: {e}"),
})?;
let client = self.client.clone();
let model_id = self.model.model_id().to_string();
let request_body = request_body.clone();
let response = block_in_place(|| {
handle.block_on(async move {
let response = block_in_place(move || {
Handle::current().block_on(async move {
client
.invoke_model()
.model_id(model_id)
.body(aws_sdk_bedrockruntime::primitives::Blob::new(body))
.body(aws_sdk_bedrockruntime::primitives::Blob::new(
serde_json::to_vec(&request_body).unwrap(),
))
.send()
.await
.map_err(|e| Error::Runtime {
message: format!("Bedrock invoke_model request failed: {e}"),
})
.map_err(Box::new)
})
})?;
})
.unwrap();
let response_json: Value =
serde_json::from_slice(response.body.as_ref()).map_err(|e| Error::Runtime {
@@ -198,12 +188,22 @@ impl BedrockEmbeddingFunction {
})?;
let embedding = match self.model {
BedrockEmbeddingModel::TitanEmbedding => {
json_array_to_f32(&response_json["embedding"], "embedding")?
}
BedrockEmbeddingModel::CohereLarge => {
json_array_to_f32(&response_json["embeddings"][0], "embeddings")?
}
BedrockEmbeddingModel::TitanEmbedding => response_json["embedding"]
.as_array()
.ok_or_else(|| Error::Runtime {
message: "Missing embedding in response".to_string(),
})?
.iter()
.map(|v| v.as_f64().unwrap() as f32)
.collect::<Vec<f32>>(),
BedrockEmbeddingModel::CohereLarge => response_json["embeddings"][0]
.as_array()
.ok_or_else(|| Error::Runtime {
message: "Missing embeddings in response".to_string(),
})?
.iter()
.map(|v| v.as_f64().unwrap() as f32)
.collect::<Vec<f32>>(),
};
builder.append_slice(&embedding);
@@ -212,86 +212,3 @@ impl BedrockEmbeddingFunction {
Ok(builder.finish())
}
}
/// Returns a handle to the current multi-threaded Tokio runtime, or a typed
/// [`Error::Runtime`] when called outside a runtime or on the current-thread
/// runtime. This keeps the synchronous-over-async bridge in
/// [`BedrockEmbeddingFunction::compute_inner`] from panicking on runtime
/// configurations that cannot support `block_in_place`.
fn current_multi_thread_handle() -> Result<Handle> {
let handle = Handle::try_current().map_err(|e| Error::Runtime {
message: format!("Bedrock embedding must be called from within a Tokio runtime: {e}"),
})?;
if handle.runtime_flavor() == RuntimeFlavor::CurrentThread {
return Err(Error::Runtime {
message: "Bedrock embedding requires a multi-threaded Tokio runtime; the \
current-thread runtime cannot use `block_in_place`"
.to_string(),
});
}
Ok(handle)
}
/// Converts a JSON value expected to be an array of numbers into `Vec<f32>`.
///
/// Returns a typed [`Error::Runtime`] (rather than panicking) when the value is
/// not an array or contains a non-numeric element, so malformed provider
/// responses degrade gracefully.
fn json_array_to_f32(value: &Value, field: &str) -> Result<Vec<f32>> {
let arr = value.as_array().ok_or_else(|| Error::Runtime {
message: format!("Missing or non-array '{field}' field in Bedrock response"),
})?;
arr.iter()
.map(|v| {
v.as_f64().map(|f| f as f32).ok_or_else(|| Error::Runtime {
message: format!("Non-numeric value in Bedrock '{field}' embedding: {v}"),
})
})
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn json_array_to_f32_parses_numbers() {
let v = json!([1.0, 2, -3.5]);
let out = json_array_to_f32(&v, "embedding").unwrap();
assert_eq!(out, vec![1.0_f32, 2.0, -3.5]);
}
#[test]
fn json_array_to_f32_rejects_non_array() {
// Missing field indexes to `Value::Null`; a malformed payload should be
// a typed error, not a panic.
let v = json!({"unexpected": "shape"});
let err = json_array_to_f32(&v["embedding"], "embedding").unwrap_err();
assert!(matches!(err, Error::Runtime { .. }), "got {err:?}");
}
#[test]
fn json_array_to_f32_rejects_non_numeric_element() {
let v = json!([1.0, "not-a-number", 3.0]);
let err = json_array_to_f32(&v, "embedding").unwrap_err();
assert!(matches!(err, Error::Runtime { .. }), "got {err:?}");
}
#[test]
fn handle_errors_without_runtime() {
// No Tokio runtime in scope -> typed error instead of a panic.
let err = current_multi_thread_handle().unwrap_err();
assert!(matches!(err, Error::Runtime { .. }), "got {err:?}");
}
#[tokio::test(flavor = "current_thread")]
async fn handle_errors_on_current_thread_runtime() {
let err = current_multi_thread_handle().unwrap_err();
assert!(matches!(err, Error::Runtime { .. }), "got {err:?}");
}
#[tokio::test(flavor = "multi_thread")]
async fn handle_ok_on_multi_thread_runtime() {
current_multi_thread_handle().expect("multi-threaded runtime should be accepted");
}
}

View File

@@ -163,7 +163,6 @@
//! ```
pub mod arrow;
pub mod blob;
pub mod connection;
pub mod data;
pub mod database;
@@ -189,7 +188,6 @@ use std::{fmt::Display, str::FromStr};
use serde::{Deserialize, Serialize};
pub use blob::{blob, is_blob};
pub use connection::{ConnectNamespaceBuilder, Connection};
pub use error::{Error, Result};
use lance_index::vector::ApproxMode as LanceApproxMode;

View File

@@ -8,7 +8,6 @@
pub(crate) mod client;
pub(crate) mod db;
pub mod oauth;
mod retry;
pub(crate) mod table;
pub(crate) mod util;
@@ -21,4 +20,3 @@ const JSON_CONTENT_TYPE: &str = "application/json";
pub use client::{ClientConfig, HeaderProvider, RetryConfig, TimeoutConfig, TlsConfig};
pub use db::{RemoteDatabaseOptions, RemoteDatabaseOptionsBuilder};
pub use oauth::{OAuthConfig, OAuthFlow, OAuthHeaderProvider};

View File

@@ -459,14 +459,12 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
config: &ClientConfig,
) -> Result<HeaderMap> {
let mut headers = HeaderMap::new();
if !api_key.is_empty() {
headers.insert(
HeaderName::from_static("x-api-key"),
HeaderValue::from_str(api_key).map_err(|_| Error::InvalidInput {
message: "non-ascii api key provided".to_string(),
})?,
);
}
headers.insert(
HeaderName::from_static("x-api-key"),
HeaderValue::from_str(api_key).map_err(|_| Error::InvalidInput {
message: "non-ascii api key provided".to_string(),
})?,
);
if region == "local" {
let host = format!("{}.local.api.lancedb.com", db_name);
headers.insert(
@@ -1007,33 +1005,6 @@ mod tests {
assert!(!config_tls.assert_hostname);
}
#[test]
fn test_default_headers_skip_empty_api_key() {
let headers = RestfulLanceDbClient::<Sender>::default_headers(
"",
"us-east-1",
"db-name",
false,
&RemoteOptions::default(),
None,
&ClientConfig::default(),
)
.unwrap();
assert!(!headers.contains_key("x-api-key"));
let headers = RestfulLanceDbClient::<Sender>::default_headers(
"api-key",
"us-east-1",
"db-name",
false,
&RemoteOptions::default(),
None,
&ClientConfig::default(),
)
.unwrap();
assert_eq!(headers.get("x-api-key").unwrap(), "api-key");
}
// Test implementation of HeaderProvider
#[derive(Debug, Clone)]
struct TestHeaderProvider {

View File

@@ -7,7 +7,6 @@ use std::sync::Arc;
use async_trait::async_trait;
use http::StatusCode;
use lance_io::object_store::StorageOptions;
use lance_namespace_impls::{DynamicContextProvider, OperationInfo};
use moka::future::Cache;
use reqwest::header::CONTENT_TYPE;
@@ -27,9 +26,7 @@ use crate::remote::util::stream_as_body;
use crate::table::BaseTable;
use super::ARROW_STREAM_CONTENT_TYPE;
use super::client::{
ClientConfig, HeaderProvider, HttpSend, RequestResultExt, RestfulLanceDbClient, Sender,
};
use super::client::{ClientConfig, HttpSend, RequestResultExt, RestfulLanceDbClient, Sender};
use super::table::RemoteTable;
use super::util::parse_server_version;
@@ -197,66 +194,10 @@ pub struct RemoteDatabase<S: HttpSend = Sender> {
uri: String,
/// Headers to pass to the namespace client for authentication
namespace_headers: HashMap<String, String>,
namespace_context_provider: Option<Arc<dyn DynamicContextProvider>>,
/// TLS configuration for mTLS support
tls_config: Option<super::client::TlsConfig>,
}
#[derive(Clone)]
struct NamespaceHeaderProviderContext {
header_provider: Arc<dyn HeaderProvider>,
}
impl std::fmt::Debug for NamespaceHeaderProviderContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NamespaceHeaderProviderContext")
.field("header_provider", &"Some(...)")
.finish()
}
}
impl DynamicContextProvider for NamespaceHeaderProviderContext {
fn provide_context(&self, _info: &OperationInfo) -> HashMap<String, String> {
let header_provider = Arc::clone(&self.header_provider);
let handle = match std::thread::Builder::new()
.name("lancedb-namespace-headers".to_string())
.spawn(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| Error::Runtime {
message: format!(
"Failed to create runtime for namespace header provider: {e}"
),
})?
.block_on(header_provider.get_headers())
}) {
Ok(handle) => handle,
Err(err) => {
log::warn!("Failed to spawn dynamic namespace header provider thread: {err}");
return HashMap::new();
}
};
let headers = handle.join();
match headers {
Ok(Ok(headers)) => headers
.into_iter()
.map(|(key, value)| (format!("headers.{key}"), value))
.collect(),
Ok(Err(err)) => {
log::warn!("Failed to get dynamic namespace headers: {err}");
HashMap::new()
}
Err(_) => {
log::warn!("Dynamic namespace header provider panicked");
HashMap::new()
}
}
}
}
impl RemoteDatabase {
pub fn try_new(
uri: &str,
@@ -287,16 +228,6 @@ impl RemoteDatabase {
})
.collect();
let namespace_context_provider =
client_config
.header_provider
.as_ref()
.map(|header_provider| {
Arc::new(NamespaceHeaderProviderContext {
header_provider: Arc::clone(header_provider),
}) as Arc<dyn DynamicContextProvider>
});
let client = RestfulLanceDbClient::try_new(
&parsed,
region,
@@ -316,7 +247,6 @@ impl RemoteDatabase {
table_cache,
uri: uri.to_owned(),
namespace_headers,
namespace_context_provider,
tls_config: client_config.tls_config,
})
}
@@ -341,7 +271,6 @@ mod test_utils {
table_cache: Cache::new(0),
uri: "http://localhost".to_string(),
namespace_headers: HashMap::new(),
namespace_context_provider: None,
tls_config: None,
}
}
@@ -352,18 +281,11 @@ mod test_utils {
T: Into<reqwest::Body>,
{
let client = client_with_handler_and_config(handler, config.clone());
let namespace_context_provider =
config.header_provider.as_ref().map(|header_provider| {
Arc::new(NamespaceHeaderProviderContext {
header_provider: Arc::clone(header_provider),
}) as Arc<dyn DynamicContextProvider>
});
Self {
client,
table_cache: Cache::new(0),
uri: "http://localhost".to_string(),
namespace_headers: config.extra_headers.clone(),
namespace_context_provider,
tls_config: config.tls_config.clone(),
}
}
@@ -837,12 +759,9 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
// Create a RestNamespace pointing to the same remote host with the same authentication headers
let mut builder = lance_namespace_impls::RestNamespaceBuilder::new(self.client.host())
.delimiter(&self.client.id_delimiter)
// TODO: support header provider
.headers(self.namespace_headers.clone());
if let Some(context_provider) = &self.namespace_context_provider {
builder = builder.context_provider(Arc::clone(context_provider));
}
// Apply mTLS configuration if present
if let Some(tls_config) = &self.tls_config {
if let Some(cert_file) = &tls_config.cert_file {
@@ -862,14 +781,6 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
}
async fn namespace_client_config(&self) -> Result<(String, HashMap<String, String>)> {
if self.namespace_context_provider.is_some() {
return Err(Error::NotSupported {
message:
"Cannot export a namespace client config when dynamic headers are configured; use LanceDB connection namespace methods instead"
.to_string(),
});
}
let mut properties = HashMap::new();
properties.insert("uri".to_string(), self.client.host().to_string());
properties.insert("delimiter".to_string(), self.client.id_delimiter.clone());
@@ -921,13 +832,12 @@ impl From<StorageOptions> for RemoteOptions {
#[cfg(test)]
mod tests {
use super::{NamespaceHeaderProviderContext, build_cache_key};
use super::build_cache_key;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use lance_namespace_impls::{DynamicContextProvider, OperationInfo};
use crate::connection::ConnectBuilder;
use crate::{
@@ -1792,75 +1702,6 @@ mod tests {
assert!(namespace_client.is_ok());
}
#[test]
fn test_namespace_header_provider_context_maps_headers() {
#[derive(Debug)]
struct TestHeaderProvider;
#[async_trait::async_trait]
impl HeaderProvider for TestHeaderProvider {
async fn get_headers(&self) -> crate::Result<HashMap<String, String>> {
Ok(HashMap::from([(
"authorization".to_string(),
"Bearer token".to_string(),
)]))
}
}
let context_provider = NamespaceHeaderProviderContext {
header_provider: Arc::new(TestHeaderProvider) as Arc<dyn HeaderProvider>,
};
let context =
context_provider.provide_context(&OperationInfo::new("list_tables", "namespace"));
assert_eq!(
context.get("headers.authorization"),
Some(&"Bearer token".to_string())
);
}
#[tokio::test]
async fn test_namespace_client_supports_dynamic_headers() {
#[derive(Debug)]
struct TestHeaderProvider;
#[async_trait::async_trait]
impl HeaderProvider for TestHeaderProvider {
async fn get_headers(&self) -> crate::Result<HashMap<String, String>> {
Ok(HashMap::from([(
"authorization".to_string(),
"Bearer token".to_string(),
)]))
}
}
let client_config = ClientConfig {
header_provider: Some(Arc::new(TestHeaderProvider) as Arc<dyn HeaderProvider>),
..Default::default()
};
let conn = Connection::new_with_handler_and_config(
|_| {
http::Response::builder()
.status(200)
.body(r#"{"tables": []}"#)
.unwrap()
},
client_config,
);
let namespace_client = conn.namespace_client().await;
assert!(namespace_client.is_ok());
match conn.namespace_client_config().await {
Err(Error::NotSupported { message })
if message.contains("dynamic headers are configured") => {}
Err(err) => panic!("expected NotSupported, got {err:?}"),
Ok(_) => panic!("expected namespace_client_config to reject dynamic headers"),
}
}
/// Integration tests using RestAdapter to run RemoteDatabase against a real namespace server
mod rest_adapter_integration {
use super::*;

View File

@@ -1,907 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use log::debug;
use reqwest::Client;
use serde::Deserialize;
use tokio::sync::RwLock;
use crate::error::{Error, Result};
use crate::remote::client::HeaderProvider;
const DEFAULT_REFRESH_BUFFER_SECS: u64 = 300;
const DEFAULT_TOKEN_TTL_SECS: u64 = 3600;
const AZURE_IMDS_ENDPOINT: &str = "http://169.254.169.254/metadata/identity/oauth2/token";
const AZURE_IMDS_API_VERSION: &str = "2018-02-01";
/// OAuth authentication flow configuration.
#[derive(Debug, Clone)]
pub enum OAuthFlow {
/// Client Credentials grant (service-to-service / M2M).
/// Requires `client_secret` in [`OAuthConfig`].
ClientCredentials,
/// Azure Managed Identity via IMDS.
/// Works on Azure VMs, AKS, App Service, and Azure Functions.
/// IMDS requests bypass proxy settings because the endpoint is link-local.
AzureManagedIdentity {
/// Client ID for user-assigned managed identity.
/// Omit for system-assigned managed identity.
client_id: Option<String>,
},
}
/// OAuth configuration for LanceDB authentication.
///
/// All token acquisition and refresh is handled in the Rust layer.
/// Python and TypeScript bindings expose this as a plain config object.
#[derive(Clone)]
pub struct OAuthConfig {
/// OIDC issuer URL or OAuth authority URL.
/// For Azure: `https://login.microsoftonline.com/{tenant_id}/v2.0`
pub issuer_url: String,
/// Application / Client ID.
pub client_id: String,
/// Client secret (required for `ClientCredentials`, optional for others).
pub client_secret: Option<String>,
/// OAuth scopes to request.
/// For Azure managed identity, exactly one scope or resource is required.
/// For example: `["api://{app_id}/.default"]`
pub scopes: Vec<String>,
/// Authentication flow to use.
pub flow: OAuthFlow,
/// Seconds before token expiry to trigger proactive refresh (default: 300).
/// Keep this well below the token TTL; if it is greater than or equal to
/// the TTL, each request refreshes the token.
pub refresh_buffer_secs: Option<u64>,
}
impl std::fmt::Debug for OAuthConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OAuthConfig")
.field("issuer_url", &self.issuer_url)
.field("client_id", &self.client_id)
.field(
"client_secret",
&self.client_secret.as_deref().map(|_| "<redacted>"),
)
.field("scopes", &self.scopes)
.field("flow", &self.flow)
.field("refresh_buffer_secs", &self.refresh_buffer_secs)
.finish()
}
}
// -- OIDC Discovery --
#[derive(Clone, Debug, Deserialize)]
struct OidcDiscovery {
token_endpoint: String,
}
// -- Token Response --
#[derive(Deserialize)]
struct TokenResponse {
access_token: String,
/// Token lifetime in seconds.
/// Some providers (Azure IMDS) return this as a string, so we accept both.
#[serde(default, deserialize_with = "deserialize_optional_u64_or_string")]
expires_in: Option<u64>,
#[serde(default)]
#[allow(dead_code)]
token_type: Option<String>,
}
impl std::fmt::Debug for TokenResponse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TokenResponse")
.field("access_token", &"<redacted>")
.field("expires_in", &self.expires_in)
.field("token_type", &self.token_type)
.finish()
}
}
fn deserialize_optional_u64_or_string<'de, D>(
deserializer: D,
) -> std::result::Result<Option<u64>, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de;
struct U64OrString;
impl<'de> de::Visitor<'de> for U64OrString {
type Value = Option<u64>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("an integer, an integer-valued float, a numeric string, or null")
}
fn visit_u64<E: de::Error>(self, v: u64) -> std::result::Result<Self::Value, E> {
Ok(Some(v))
}
fn visit_i64<E: de::Error>(self, v: i64) -> std::result::Result<Self::Value, E> {
if v < 0 {
return Err(E::custom(format!("invalid expires_in value: {v}")));
}
Ok(Some(v as u64))
}
fn visit_f64<E: de::Error>(self, v: f64) -> std::result::Result<Self::Value, E> {
if !v.is_finite() || v < 0.0 || v.fract() != 0.0 || v > u64::MAX as f64 {
return Err(E::custom(format!("invalid expires_in value: {v}")));
}
Ok(Some(v as u64))
}
fn visit_str<E: de::Error>(self, v: &str) -> std::result::Result<Self::Value, E> {
v.parse::<u64>().map(Some).map_err(de::Error::custom)
}
fn visit_none<E: de::Error>(self) -> std::result::Result<Self::Value, E> {
Ok(None)
}
fn visit_unit<E: de::Error>(self) -> std::result::Result<Self::Value, E> {
Ok(None)
}
}
deserializer.deserialize_any(U64OrString)
}
// -- Internal Token State --
struct TokenState {
access_token: Option<String>,
expires_at: Option<Instant>,
}
impl TokenState {
fn new() -> Self {
Self {
access_token: None,
expires_at: None,
}
}
fn is_expired(&self, buffer: Duration) -> bool {
match (self.access_token.as_ref(), self.expires_at) {
(Some(_), Some(expires_at)) => Instant::now() + buffer >= expires_at,
(None, _) => true,
(Some(_), None) => true,
}
}
fn update(&mut self, resp: &TokenResponse) {
self.access_token = Some(resp.access_token.clone());
let expires_in = resp.expires_in.unwrap_or(DEFAULT_TOKEN_TTL_SECS);
self.expires_at = Some(Instant::now() + Duration::from_secs(expires_in));
}
}
#[async_trait]
trait TokenSource: Send + Sync + std::fmt::Debug {
async fn fetch_token(&self) -> Result<TokenResponse>;
}
struct ClientCredentialsSource {
issuer_url: String,
client_id: String,
client_secret: String,
scopes: Vec<String>,
http_client: Client,
discovery: RwLock<Option<OidcDiscovery>>,
}
impl std::fmt::Debug for ClientCredentialsSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ClientCredentialsSource")
.field("issuer_url", &self.issuer_url)
.field("client_id", &self.client_id)
.field("client_secret", &"<redacted>")
.field("scopes", &self.scopes)
.finish()
}
}
impl ClientCredentialsSource {
fn new(
issuer_url: String,
client_id: String,
client_secret: Option<String>,
scopes: Vec<String>,
) -> Result<Self> {
let client_secret = client_secret.ok_or(Error::InvalidInput {
message: "client_secret is required for ClientCredentials flow".to_string(),
})?;
Self::validate_issuer_transport(&issuer_url)?;
let http_client = Client::builder()
.timeout(Duration::from_secs(30))
.build()
.map_err(|e| Error::Runtime {
message: format!("Failed to create HTTP client for OAuth: {e}"),
})?;
Ok(Self {
issuer_url,
client_id,
client_secret,
scopes,
http_client,
discovery: RwLock::new(None),
})
}
fn validate_issuer_transport(issuer_url: &str) -> Result<()> {
let issuer = url::Url::parse(issuer_url).map_err(|e| Error::InvalidInput {
message: format!("Invalid OAuth issuer_url: {e}"),
})?;
match issuer.scheme() {
"https" => Ok(()),
"http" if Self::is_loopback_issuer(&issuer) => Ok(()),
_ => Err(Error::InvalidInput {
message:
"ClientCredentials OAuth issuer_url must use https, except for loopback hosts"
.to_string(),
}),
}
}
fn is_loopback_issuer(issuer: &url::Url) -> bool {
let Some(host) = issuer.host_str() else {
return false;
};
host.eq_ignore_ascii_case("localhost")
|| host
.parse::<IpAddr>()
.map(|addr| addr.is_loopback())
.unwrap_or(false)
}
async fn get_discovery(&self) -> Result<OidcDiscovery> {
{
let cached = self.discovery.read().await;
if let Some(ref disc) = *cached {
return Ok(disc.clone());
}
}
let mut cache = self.discovery.write().await;
// Double-check
if let Some(ref disc) = *cache {
return Ok(disc.clone());
}
let discovery_url = format!(
"{}/.well-known/openid-configuration",
self.issuer_url.trim_end_matches('/')
);
debug!("Fetching OIDC discovery from {}", discovery_url);
let resp = self
.http_client
.get(&discovery_url)
.send()
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to fetch OIDC discovery document: {e}"),
})?;
if !resp.status().is_success() {
return Err(Error::Runtime {
message: format!(
"OIDC discovery failed with status {}: {}",
resp.status(),
resp.text().await.unwrap_or_default()
),
});
}
let disc: OidcDiscovery = resp.json().await.map_err(|e| Error::Runtime {
message: format!("Failed to parse OIDC discovery document: {e}"),
})?;
let result = disc.clone();
*cache = Some(disc);
Ok(result)
}
async fn get_token_endpoint(&self) -> Result<String> {
self.get_discovery().await.map(|disc| disc.token_endpoint)
}
fn scopes_string(&self) -> String {
self.scopes.join(" ")
}
async fn post_token_request(
&self,
endpoint: &str,
params: &[(&str, &str)],
) -> Result<TokenResponse> {
let resp = self
.http_client
.post(endpoint)
.form(params)
.send()
.await
.map_err(|e| Error::Runtime {
message: format!("Token request to {endpoint} failed: {e}"),
})?;
if !resp.status().is_success() {
return Err(Error::Runtime {
message: format!(
"Token request failed with status {}: {}",
resp.status(),
resp.text().await.unwrap_or_default()
),
});
}
resp.json().await.map_err(|e| Error::Runtime {
message: format!("Failed to parse token response: {e}"),
})
}
}
#[async_trait]
impl TokenSource for ClientCredentialsSource {
async fn fetch_token(&self) -> Result<TokenResponse> {
let token_endpoint = self.get_token_endpoint().await?;
let scope = self.scopes_string();
let params = [
("grant_type", "client_credentials"),
("client_id", self.client_id.as_str()),
("client_secret", self.client_secret.as_str()),
("scope", scope.as_str()),
];
self.post_token_request(&token_endpoint, &params).await
}
}
struct AzureImdsSource {
client_id: Option<String>,
resource: String,
http_client: Client,
}
impl std::fmt::Debug for AzureImdsSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AzureImdsSource")
.field("client_id", &self.client_id)
.field("resource", &self.resource)
.finish()
}
}
impl AzureImdsSource {
fn new(scopes: Vec<String>, client_id: Option<String>) -> Result<Self> {
let resource = Self::resource_from_scopes(&scopes)?;
let http_client = Client::builder()
.timeout(Duration::from_secs(30))
.no_proxy()
.build()
.map_err(|e| Error::Runtime {
message: format!("Failed to create HTTP client for Azure IMDS OAuth: {e}"),
})?;
Ok(Self {
client_id,
resource,
http_client,
})
}
fn resource_from_scopes(scopes: &[String]) -> Result<String> {
let [scope] = scopes else {
return Err(Error::InvalidInput {
message: "AzureManagedIdentity flow requires exactly one OAuth scope or resource"
.to_string(),
});
};
Ok(scope.strip_suffix("/.default").unwrap_or(scope).to_string())
}
}
#[async_trait]
impl TokenSource for AzureImdsSource {
async fn fetch_token(&self) -> Result<TokenResponse> {
let mut url = format!(
"{AZURE_IMDS_ENDPOINT}?api-version={AZURE_IMDS_API_VERSION}&resource={}",
urlencoding::encode(&self.resource),
);
if let Some(cid) = self.client_id.as_deref() {
url.push_str(&format!("&client_id={}", urlencoding::encode(cid)));
}
let resp = self
.http_client
.get(&url)
.header("Metadata", "true")
.send()
.await
.map_err(|e| Error::Runtime {
message: format!("Azure IMDS request failed: {e}"),
})?;
if !resp.status().is_success() {
return Err(Error::Runtime {
message: format!(
"Azure IMDS returned status {}: {}",
resp.status(),
resp.text().await.unwrap_or_default()
),
});
}
resp.json().await.map_err(|e| Error::Runtime {
message: format!("Failed to parse IMDS token response: {e}"),
})
}
}
/// OAuth header provider that manages the full token lifecycle.
///
/// Implements [`HeaderProvider`] to inject `Authorization: Bearer <token>`
/// headers into every LanceDB request, with automatic token refresh.
pub struct OAuthHeaderProvider {
token_source: Box<dyn TokenSource>,
token_state: Arc<RwLock<TokenState>>,
refresh_buffer: Duration,
}
impl std::fmt::Debug for OAuthHeaderProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OAuthHeaderProvider")
.field("token_source", &self.token_source)
.finish()
}
}
impl OAuthHeaderProvider {
/// Create a new OAuth header provider from configuration.
pub fn new(config: OAuthConfig) -> Result<Self> {
let OAuthConfig {
issuer_url,
client_id,
client_secret,
scopes,
flow,
refresh_buffer_secs,
} = config;
if scopes.is_empty() {
return Err(Error::InvalidInput {
message: "At least one OAuth scope is required".to_string(),
});
}
let refresh_buffer =
Duration::from_secs(refresh_buffer_secs.unwrap_or(DEFAULT_REFRESH_BUFFER_SECS));
let token_source: Box<dyn TokenSource> = match flow {
OAuthFlow::ClientCredentials => Box::new(ClientCredentialsSource::new(
issuer_url,
client_id,
client_secret,
scopes,
)?),
OAuthFlow::AzureManagedIdentity { client_id } => {
Box::new(AzureImdsSource::new(scopes, client_id)?)
}
};
Ok(Self {
token_source,
token_state: Arc::new(RwLock::new(TokenState::new())),
refresh_buffer,
})
}
/// Get a valid access token, refreshing if necessary.
async fn get_valid_token(&self) -> Result<String> {
// Fast path: check if current token is still valid
{
let state = self.token_state.read().await;
if !state.is_expired(self.refresh_buffer)
&& let Some(ref token) = state.access_token
{
return Ok(token.clone());
}
}
// Slow path: acquire or refresh token
let mut state = self.token_state.write().await;
// Double-check after acquiring write lock
if !state.is_expired(self.refresh_buffer)
&& let Some(ref token) = state.access_token
{
return Ok(token.clone());
}
debug!("Acquiring new OAuth token via {:?}", self.token_source);
let resp = self.token_source.fetch_token().await?;
state.update(&resp);
Ok(resp.access_token)
}
}
#[async_trait]
impl HeaderProvider for OAuthHeaderProvider {
async fn get_headers(&self) -> Result<HashMap<String, String>> {
let token = self.get_valid_token().await?;
Ok(HashMap::from([(
"authorization".to_string(),
format!("Bearer {token}"),
)]))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::task::JoinHandle;
#[test]
fn test_token_state_expiry() {
let mut state = TokenState::new();
assert!(state.is_expired(Duration::from_secs(0)));
state.access_token = Some("tok".to_string());
state.expires_at = Some(Instant::now() + Duration::from_secs(600));
assert!(!state.is_expired(Duration::from_secs(300)));
assert!(state.is_expired(Duration::from_secs(601)));
state.expires_at = None;
assert!(state.is_expired(Duration::from_secs(0)));
}
#[test]
fn test_token_state_uses_default_expiry() {
let mut state = TokenState::new();
let response = TokenResponse {
access_token: "tok".to_string(),
expires_in: None,
token_type: None,
};
state.update(&response);
assert!(!state.is_expired(Duration::from_secs(DEFAULT_TOKEN_TTL_SECS - 1)));
assert!(state.is_expired(Duration::from_secs(DEFAULT_TOKEN_TTL_SECS + 1)));
}
#[test]
fn test_token_response_accepts_float_expires_in() {
let response: TokenResponse =
serde_json::from_str(r#"{"access_token":"tok","expires_in":3600.0}"#).unwrap();
assert_eq!(response.expires_in, Some(3600));
}
#[test]
fn test_token_response_rejects_negative_expires_in() {
let err =
serde_json::from_str::<TokenResponse>(r#"{"access_token":"tok","expires_in":-1}"#)
.unwrap_err();
assert!(err.to_string().contains("invalid expires_in value: -1"));
}
#[test]
fn test_token_response_debug_redacts_access_token() {
let response = TokenResponse {
access_token: "secret-token".to_string(),
expires_in: Some(3600),
token_type: Some("Bearer".to_string()),
};
let debug = format!("{response:?}");
assert!(!debug.contains("secret-token"));
assert!(debug.contains("access_token: \"<redacted>\""));
}
#[test]
fn test_scopes_string() {
let source = ClientCredentialsSource::new(
"https://login.microsoftonline.com/tenant/v2.0".to_string(),
"app-id".to_string(),
Some("secret".to_string()),
vec!["scope1".to_string(), "scope2".to_string()],
)
.unwrap();
assert_eq!(source.scopes_string(), "scope1 scope2");
}
#[test]
fn test_oauth_config_debug_redacts_client_secret() {
let config = OAuthConfig {
issuer_url: "https://issuer.example.com".to_string(),
client_id: "client-id".to_string(),
client_secret: Some("super-secret".to_string()),
scopes: vec!["scope".to_string()],
flow: OAuthFlow::ClientCredentials,
refresh_buffer_secs: None,
};
let debug = format!("{config:?}");
assert!(!debug.contains("super-secret"));
assert!(debug.contains("client_secret: Some(\"<redacted>\")"));
}
#[test]
fn test_oauth_header_provider_debug_redacts_client_secret() {
let config = OAuthConfig {
issuer_url: "https://issuer.example.com".to_string(),
client_id: "client-id".to_string(),
client_secret: Some("super-secret".to_string()),
scopes: vec!["scope".to_string()],
flow: OAuthFlow::ClientCredentials,
refresh_buffer_secs: None,
};
let provider = OAuthHeaderProvider::new(config).unwrap();
let debug = format!("{provider:?}");
assert!(!debug.contains("super-secret"));
assert!(debug.contains("client_secret: \"<redacted>\""));
}
#[test]
fn test_managed_identity_resource_from_default_scope() {
assert_eq!(
AzureImdsSource::resource_from_scopes(&["api://test/.default".to_string()]).unwrap(),
"api://test"
);
}
#[test]
fn test_managed_identity_resource_without_default_suffix() {
assert_eq!(
AzureImdsSource::resource_from_scopes(&["api://test".to_string()]).unwrap(),
"api://test"
);
}
#[test]
fn test_managed_identity_rejects_multiple_scopes() {
let config = OAuthConfig {
issuer_url: "https://login.microsoftonline.com/tenant/v2.0".to_string(),
client_id: "app-id".to_string(),
client_secret: None,
scopes: vec![
"api://test-a/.default".to_string(),
"api://test-b/.default".to_string(),
],
flow: OAuthFlow::AzureManagedIdentity { client_id: None },
refresh_buffer_secs: None,
};
assert!(OAuthHeaderProvider::new(config).is_err());
}
#[tokio::test]
async fn test_token_endpoint_requires_discovery_success() {
let (issuer_url, server) = spawn_discovery_error_server().await;
let source = ClientCredentialsSource::new(
issuer_url,
"client-id".to_string(),
Some("secret".to_string()),
vec!["scope".to_string()],
)
.unwrap();
let err = source.get_token_endpoint().await.unwrap_err();
assert!(matches!(
err,
Error::Runtime { message }
if message.contains("OIDC discovery failed with status 503")
));
server.await.unwrap();
}
#[test]
fn test_client_credentials_requires_secret() {
let config = OAuthConfig {
issuer_url: "https://login.microsoftonline.com/tenant/v2.0".to_string(),
client_id: "app-id".to_string(),
client_secret: None,
scopes: vec!["scope".to_string()],
flow: OAuthFlow::ClientCredentials,
refresh_buffer_secs: None,
};
assert!(OAuthHeaderProvider::new(config).is_err());
}
#[test]
fn test_client_credentials_rejects_insecure_non_loopback_issuer() {
let config = OAuthConfig {
issuer_url: "http://issuer.example.com".to_string(),
client_id: "app-id".to_string(),
client_secret: Some("secret".to_string()),
scopes: vec!["scope".to_string()],
flow: OAuthFlow::ClientCredentials,
refresh_buffer_secs: None,
};
let err = OAuthHeaderProvider::new(config).unwrap_err();
assert!(matches!(
err,
Error::InvalidInput { message }
if message == "ClientCredentials OAuth issuer_url must use https, except for loopback hosts"
));
}
#[test]
fn test_empty_scopes_rejected() {
let config = OAuthConfig {
issuer_url: "https://login.microsoftonline.com/tenant/v2.0".to_string(),
client_id: "app-id".to_string(),
client_secret: None,
scopes: vec![],
flow: OAuthFlow::AzureManagedIdentity { client_id: None },
refresh_buffer_secs: None,
};
assert!(OAuthHeaderProvider::new(config).is_err());
}
#[tokio::test]
async fn test_client_credentials_token_lifecycle() {
let (issuer_url, token_requests, server) = spawn_oauth_server().await;
let config = OAuthConfig {
issuer_url,
client_id: "client-id".to_string(),
client_secret: Some("secret".to_string()),
scopes: vec!["scope".to_string()],
flow: OAuthFlow::ClientCredentials,
refresh_buffer_secs: Some(0),
};
let provider = OAuthHeaderProvider::new(config).unwrap();
let headers = provider.get_headers().await.unwrap();
assert_eq!(headers.get("authorization").unwrap(), "Bearer token-1");
assert_eq!(token_requests.load(Ordering::SeqCst), 1);
let headers = provider.get_headers().await.unwrap();
assert_eq!(headers.get("authorization").unwrap(), "Bearer token-1");
assert_eq!(token_requests.load(Ordering::SeqCst), 1);
provider.token_state.write().await.expires_at =
Some(Instant::now() - Duration::from_secs(1));
let headers = provider.get_headers().await.unwrap();
assert_eq!(headers.get("authorization").unwrap(), "Bearer token-2");
assert_eq!(token_requests.load(Ordering::SeqCst), 2);
server.await.unwrap();
}
async fn spawn_oauth_server() -> (String, Arc<AtomicUsize>, JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let issuer_url = format!("http://{addr}");
let token_requests = Arc::new(AtomicUsize::new(0));
let server_token_requests = Arc::clone(&token_requests);
let server = tokio::spawn(async move {
for _ in 0..3 {
let (mut stream, _) = listener.accept().await.unwrap();
let (request_line, body) = read_http_request(&mut stream).await;
if request_line.starts_with("GET /.well-known/openid-configuration ") {
let discovery = format!(r#"{{"token_endpoint":"http://{addr}/token"}}"#);
write_json_response(&mut stream, "200 OK", &discovery).await;
} else if request_line.starts_with("POST /token ") {
assert!(body.contains("grant_type=client_credentials"));
assert!(body.contains("client_id=client-id"));
assert!(body.contains("client_secret=secret"));
assert!(body.contains("scope=scope"));
let token_num = server_token_requests.fetch_add(1, Ordering::SeqCst) + 1;
let token = format!(
r#"{{"access_token":"token-{token_num}","expires_in":3600,"token_type":"Bearer"}}"#
);
write_json_response(&mut stream, "200 OK", &token).await;
} else {
write_json_response(&mut stream, "404 Not Found", "{}").await;
}
}
});
(issuer_url, token_requests, server)
}
async fn spawn_discovery_error_server() -> (String, JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let issuer_url = format!("http://{addr}");
let server = tokio::spawn(async move {
let (mut stream, _) = listener.accept().await.unwrap();
let (request_line, _) = read_http_request(&mut stream).await;
assert!(request_line.starts_with("GET /.well-known/openid-configuration "));
write_json_response(&mut stream, "503 Service Unavailable", "{}").await;
});
(issuer_url, server)
}
async fn read_http_request(stream: &mut TcpStream) -> (String, String) {
let mut buffer = Vec::new();
let mut header_end = None;
while header_end.is_none() {
let mut chunk = [0; 1024];
let read = stream.read(&mut chunk).await.unwrap();
assert_ne!(read, 0, "connection closed before request headers");
buffer.extend_from_slice(&chunk[..read]);
header_end = find_subsequence(&buffer, b"\r\n\r\n").map(|pos| pos + 4);
}
let header_end = header_end.unwrap();
let headers = String::from_utf8_lossy(&buffer[..header_end]).to_string();
let request_line = headers.lines().next().unwrap_or_default().to_string();
let content_length = headers
.lines()
.find_map(|line| {
let (name, value) = line.split_once(':')?;
name.eq_ignore_ascii_case("content-length")
.then(|| value.trim().parse::<usize>().ok())
.flatten()
})
.unwrap_or(0);
while buffer.len() < header_end + content_length {
let mut chunk = [0; 1024];
let read = stream.read(&mut chunk).await.unwrap();
assert_ne!(read, 0, "connection closed before request body");
buffer.extend_from_slice(&chunk[..read]);
}
let body =
String::from_utf8_lossy(&buffer[header_end..header_end + content_length]).to_string();
(request_line, body)
}
fn find_subsequence(haystack: &[u8], needle: &[u8]) -> Option<usize> {
haystack
.windows(needle.len())
.position(|window| window == needle)
}
async fn write_json_response(stream: &mut TcpStream, status: &str, body: &str) {
let response = format!(
"HTTP/1.1 {status}\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{body}",
body.len()
);
stream.write_all(response.as_bytes()).await.unwrap();
}
}

View File

@@ -1352,35 +1352,6 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
}
}
/// Deserialize an index's `created_at` field.
///
/// The server returns this as an RFC 3339 string (e.g. `"2026-06-18T21:37:36.637Z"`),
/// but older deployments sent a unix timestamp in milliseconds. Accept both so the
/// client works against any server version.
fn deserialize_created_at<'de, D>(
deserializer: D,
) -> std::result::Result<Option<DateTime<Utc>>, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error as _;
#[derive(Deserialize)]
#[serde(untagged)]
enum CreatedAt {
Rfc3339(String),
Millis(i64),
}
match Option::<CreatedAt>::deserialize(deserializer)? {
None => Ok(None),
Some(CreatedAt::Rfc3339(s)) => DateTime::parse_from_rfc3339(&s)
.map(|dt| Some(dt.with_timezone(&Utc)))
.map_err(D::Error::custom),
Some(CreatedAt::Millis(ms)) => Ok(DateTime::from_timestamp_millis(ms)),
}
}
impl<S: HttpSend + 'static> RemoteTable<S> {
/// Parse the response from `/index/list/` into `IndexConfig` entries.
///
@@ -1409,7 +1380,7 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
// Used as the sentinel to decide whether to skip the stats call.
index_type: Option<IndexType>,
index_uuid: Option<String>,
#[serde(default, deserialize_with = "deserialize_created_at")]
#[serde(default, with = "chrono::serde::ts_milliseconds_option")]
created_at: Option<DateTime<Utc>>,
num_indexed_rows: Option<u64>,
num_unindexed_rows: Option<u64>,
@@ -4707,7 +4678,7 @@ mod tests {
"num_segments": 2,
"index_version": 1,
"index_details": "{\"num_partitions\":16}",
"created_at": "2026-06-18T21:37:36.637Z",
"created_at": 1700000000000i64,
"type_url": "type.googleapis.com/lance.index.vector.IvfPq",
},
{
@@ -4757,10 +4728,7 @@ mod tests {
vec_idx.type_url,
Some("type.googleapis.com/lance.index.vector.IvfPq".to_string())
);
assert_eq!(
vec_idx.created_at,
Some("2026-06-18T21:37:36.637Z".parse::<DateTime<Utc>>().unwrap())
);
assert!(vec_idx.created_at.is_some());
let text_idx = &indices[1];
assert_eq!(text_idx.name, "text_idx");
@@ -4781,36 +4749,6 @@ mod tests {
assert_eq!(text_idx.created_at, None);
}
#[test]
fn test_deserialize_created_at() {
#[derive(Deserialize)]
struct Wrapper {
#[serde(default, deserialize_with = "deserialize_created_at")]
created_at: Option<DateTime<Utc>>,
}
// RFC 3339 string (current server format).
let w: Wrapper =
serde_json::from_str(r#"{"created_at": "2026-06-18T21:37:36.637Z"}"#).unwrap();
assert_eq!(
w.created_at,
Some("2026-06-18T21:37:36.637Z".parse::<DateTime<Utc>>().unwrap())
);
// Unix milliseconds (legacy server format).
let w: Wrapper = serde_json::from_str(r#"{"created_at": 1700000000000}"#).unwrap();
assert_eq!(w.created_at, DateTime::from_timestamp_millis(1700000000000));
// Null and missing both yield None.
let w: Wrapper = serde_json::from_str(r#"{"created_at": null}"#).unwrap();
assert_eq!(w.created_at, None);
let w: Wrapper = serde_json::from_str(r#"{}"#).unwrap();
assert_eq!(w.created_at, None);
// A malformed string is rejected rather than silently dropped to None.
assert!(serde_json::from_str::<Wrapper>(r#"{"created_at": "not-a-date"}"#).is_err());
}
#[tokio::test]
async fn test_list_versions() {
let table = Table::new_with_handler("my_table", |request| {

View File

@@ -3,7 +3,7 @@
//! LanceDB Table APIs
use arrow_array::{LargeBinaryArray, RecordBatch, RecordBatchReader};
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_execution::TaskContext;
@@ -12,7 +12,6 @@ use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::display::DisplayableExecutionPlan;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use lance::dataset::BlobFile;
pub use lance::dataset::ColumnAlteration;
pub use lance::dataset::NewColumnTransform;
pub use lance::dataset::ReadParams;
@@ -588,28 +587,6 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
async fn close_lsm_writers(&self) -> Result<()> {
Ok(())
}
/// Names of the blob v2 columns in this table, in declaration order.
async fn blob_columns(&self) -> Result<Vec<String>> {
Err(Error::NotSupported {
message: "blob_columns is not supported on this table type".into(),
})
}
/// Materialize blob bytes for the given row ids. See [`Table::fetch_blobs`].
async fn fetch_blobs(&self, _column: &str, _row_ids: &[u64]) -> Result<LargeBinaryArray> {
Err(Error::NotSupported {
message: "fetch_blobs is not supported on this table type".into(),
})
}
/// Open lazy blob handles for the given row ids. See [`Table::fetch_blob_files`].
async fn fetch_blob_files(
&self,
_column: &str,
_row_ids: &[u64],
) -> Result<Vec<Option<BlobFile>>> {
Err(Error::NotSupported {
message: "fetch_blob_files is not supported on this table type".into(),
})
}
/// Gets the table tag manager.
async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
/// Optimize the dataset.
@@ -950,76 +927,6 @@ impl Table {
self.inner.count_rows(filter.map(Filter::Sql)).await
}
/// Names of the blob v2 columns in this table, in declaration order.
///
/// Nested blobs use dotted paths (e.g. `info.blob`). Returns
/// [`Error::NotSupported`] on table types without blob support.
pub async fn blob_columns(&self) -> Result<Vec<String>> {
self.inner.blob_columns().await
}
/// Materialize blob bytes for the given row ids.
///
/// Output matches `row_ids` in length and order. Null and zero-length rows
/// are null. Prefer [`Self::fetch_blob_files`] for large selections.
///
/// ```
/// use arrow_array::UInt64Array;
/// use futures::TryStreamExt;
/// use lancedb::query::{ExecutableQuery, QueryBase};
///
/// # use lancedb::Table;
/// # async fn materialize(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
/// let mut stream = table.query().with_row_id().limit(10).execute().await?;
/// while let Some(batch) = stream.try_next().await? {
/// let row_ids = batch
/// .column_by_name("_rowid")
/// .unwrap()
/// .as_any()
/// .downcast_ref::<UInt64Array>()
/// .unwrap();
/// let images = table.fetch_blobs("image", row_ids.values()).await?;
/// let _ = images;
/// }
/// # Ok(())
/// # }
/// ```
///
/// Returns [`Error::InvalidInput`] when the column does not exist or is
/// not a blob v2 column, and [`Error::NotSupported`] on table types
/// without blob support.
pub async fn fetch_blobs(
&self,
column: impl AsRef<str>,
row_ids: &[u64],
) -> Result<LargeBinaryArray> {
self.inner.fetch_blobs(column.as_ref(), row_ids).await
}
/// Open lazy [`BlobFile`] handles for the given row ids.
///
/// Same length and order as `row_ids`. Null rows are `None`. Bytes are not
/// read from disk until a call to [`BlobFile::read`].
///
/// ```
/// # use lancedb::Table;
/// # async fn lazy_read(table: &Table, row_ids: &[u64]) -> Result<(), Box<dyn std::error::Error>> {
/// let handles = table.fetch_blob_files("image", row_ids).await?;
/// if let Some(Some(first)) = handles.first() {
/// let bytes = first.read().await?;
/// println!("first blob is {} bytes", bytes.len());
/// }
/// # Ok(())
/// # }
/// ```
pub async fn fetch_blob_files(
&self,
column: impl AsRef<str>,
row_ids: &[u64],
) -> Result<Vec<Option<BlobFile>>> {
self.inner.fetch_blob_files(column.as_ref(), row_ids).await
}
/// Insert new records into this Table
///
/// # Arguments
@@ -2854,25 +2761,6 @@ impl BaseTable for NativeTable {
merge::lsm::close_lsm_writers(self).await
}
async fn blob_columns(&self) -> Result<Vec<String>> {
let schema = self.schema().await?;
Ok(crate::blob::blob_column_names(schema.as_ref()))
}
async fn fetch_blobs(&self, column: &str, row_ids: &[u64]) -> Result<LargeBinaryArray> {
let dataset = self.dataset.get().await?;
crate::blob::take_blobs_aligned(&dataset, column, row_ids).await
}
async fn fetch_blob_files(
&self,
column: &str,
row_ids: &[u64],
) -> Result<Vec<Option<BlobFile>>> {
let dataset = self.dataset.get().await?;
crate::blob::take_blob_files_aligned(&dataset, column, row_ids).await
}
/// Delete rows from the table
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
let result = delete::execute_delete(self, predicate).await?;

View File

@@ -26,9 +26,6 @@ pub enum AddDataMode {
#[default]
Append,
/// The existing table will be overwritten with the new data
///
/// On overwrite, raw binary is not coerced into a blob struct. The input
/// must declare blob v2 for the column to stay a blob column.
Overwrite,
}

View File

@@ -3,7 +3,6 @@
//! This module contains adapters to allow LanceDB tables to be used as DataFusion table providers.
mod blob_coerce;
pub mod cast;
pub mod insert;
pub mod reject_nan;

View File

@@ -1,495 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Coerces write-path input into blob v2 struct columns.
//!
//! [`super::cast::cast_to_table_schema`] calls [`coerce_blob_expr`].
use std::sync::Arc;
use arrow_schema::{DataType, Field, FieldRef};
use datafusion::functions::core::{get_field, named_struct};
use datafusion_common::ScalarValue;
use datafusion_common::config::ConfigOptions;
use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_expr::expressions::{CastExpr, Literal};
use datafusion_physical_plan::PhysicalExpr;
use crate::error::{Error, Result};
/// Build a projection expression coercing `input_expr` into the blob struct
/// declared by `table_field`, composing `named_struct` / `get_field` / `cast`.
pub(super) fn coerce_blob_expr(
input_expr: Arc<dyn PhysicalExpr>,
input_field: &Field,
table_field: &FieldRef,
config: &Arc<ConfigOptions>,
) -> Result<(Arc<dyn PhysicalExpr>, FieldRef)> {
let DataType::Struct(declared_fields) = table_field.data_type() else {
return Err(Error::InvalidInput {
message: format!(
"blob v2 column '{}' must be a struct, table declares {}",
table_field.name(),
table_field.data_type()
),
});
};
let input_struct_children = match input_field.data_type() {
DataType::Binary | DataType::LargeBinary | DataType::BinaryView => None,
DataType::Struct(children) => {
if !children
.iter()
.any(|c| c.name() == "data" || c.name() == "uri")
{
return Err(Error::InvalidInput {
message: format!(
"blob struct input for column '{}' must contain a 'data' or 'uri' child",
table_field.name()
),
});
}
Some(children)
}
other => {
return Err(Error::InvalidInput {
message: format!(
"cannot coerce column '{}' with type {} into a blob v2 struct. \
expected Binary, LargeBinary, BinaryView, or a Struct with a 'data' or 'uri' child",
table_field.name(),
other,
),
});
}
};
let mut ns_args: Vec<Arc<dyn PhysicalExpr>> = Vec::with_capacity(declared_fields.len() * 2);
for declared in declared_fields.iter() {
ns_args.push(Arc::new(Literal::new(ScalarValue::from(
declared.name().as_str(),
))));
let value: Arc<dyn PhysicalExpr> = match input_struct_children {
// Raw binary lands in `data` and everything else is a typed null.
None => {
if declared.name() == "data" {
Arc::new(CastExpr::new(
input_expr.clone(),
declared.data_type().clone(),
None,
))
} else {
typed_null(declared.data_type())?
}
}
Some(children) => match children.iter().find(|c| c.name() == declared.name()) {
Some(child) => {
let field_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
&format!("get_field({})", declared.name()),
get_field(),
vec![
input_expr.clone(),
Arc::new(Literal::new(ScalarValue::from(declared.name().as_str()))),
],
Arc::new(child.as_ref().clone()),
config.clone(),
));
if child.data_type() == declared.data_type() {
field_expr
} else {
Arc::new(CastExpr::new(
field_expr,
declared.data_type().clone(),
None,
))
}
}
None => typed_null(declared.data_type())?,
},
};
ns_args.push(value);
}
let expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
&format!("named_struct({})", table_field.name()),
named_struct(),
ns_args,
table_field.clone(),
config.clone(),
));
Ok((expr, table_field.clone()))
}
fn typed_null(data_type: &DataType) -> Result<Arc<dyn PhysicalExpr>> {
let scalar = ScalarValue::try_from(data_type).map_err(|e| Error::InvalidInput {
message: format!("cannot build null literal for blob child type {data_type}: {e}"),
})?;
Ok(Arc::new(Literal::new(scalar)))
}
#[cfg(test)]
mod tests {
use super::super::cast::cast_to_table_schema;
use super::*;
use crate::blob::blob;
use arrow_array::{
Array, ArrayRef, BinaryArray, BinaryViewArray, Int32Array, Int64Array, LargeBinaryArray,
RecordBatch, StringArray, StructArray, UInt8Array, UInt64Array,
};
use arrow_schema::Schema;
use datafusion::prelude::SessionContext;
use datafusion_catalog::MemTable;
use datafusion_physical_plan::ExecutionPlan;
use futures::TryStreamExt;
use lance_arrow::FieldExt;
use std::collections::HashMap;
fn wide_blob_field(name: &str) -> Field {
Field::new(
name,
DataType::Struct(
vec![
Field::new("data", DataType::LargeBinary, true),
Field::new("uri", DataType::Utf8, true),
Field::new("position", DataType::UInt64, true),
Field::new("size", DataType::UInt64, true),
]
.into(),
),
true,
)
.with_metadata(HashMap::from([(
"ARROW:extension:name".to_string(),
"lance.blob.v2".to_string(),
)]))
}
fn blob_table_schema() -> Schema {
Schema::new(vec![
Field::new("id", DataType::Int64, false),
blob("image", true),
])
}
fn batch_with_image(image_field: Field, image: ArrayRef) -> RecordBatch {
let len = image.len();
RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
image_field,
])),
vec![Arc::new(Int64Array::from_iter_values(0..len as i64)), image],
)
.unwrap()
}
fn image_struct(batch: &RecordBatch) -> &StructArray {
batch
.column_by_name("image")
.unwrap()
.as_any()
.downcast_ref::<StructArray>()
.unwrap()
}
async fn plan_from_batch(batch: RecordBatch) -> Arc<dyn ExecutionPlan> {
let schema = batch.schema();
let table = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
let ctx = SessionContext::new();
ctx.register_table("t", Arc::new(table)).unwrap();
let df = ctx.table("t").await.unwrap();
df.create_physical_plan().await.unwrap()
}
async fn coerce(batch: RecordBatch, table_schema: &Schema) -> RecordBatch {
let plan = plan_from_batch(batch).await;
let plan = cast_to_table_schema(plan, table_schema).unwrap();
let ctx = SessionContext::new();
let stream = plan.execute(0, ctx.task_ctx()).unwrap();
let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
arrow_select::concat::concat_batches(&plan.schema(), &batches).unwrap()
}
async fn coerce_err(batch: RecordBatch, table_schema: &Schema) -> Error {
let plan = plan_from_batch(batch).await;
cast_to_table_schema(plan, table_schema).unwrap_err()
}
#[tokio::test]
async fn large_binary_coerces_to_declared_blob_struct() {
let batch = batch_with_image(
Field::new("image", DataType::LargeBinary, true),
Arc::new(LargeBinaryArray::from_iter_values([b"hello".as_slice()])),
);
let coerced = coerce(batch, &blob_table_schema()).await;
let image_field = coerced.schema().field_with_name("image").unwrap().clone();
assert!(image_field.is_blob_v2());
assert!(matches!(image_field.data_type(), DataType::Struct(_)));
let data = image_struct(&coerced).column_by_name("data").unwrap();
let data: &LargeBinaryArray = data.as_any().downcast_ref().unwrap();
assert_eq!(data.value(0), b"hello");
}
#[tokio::test]
async fn binary_coerces_to_declared_blob_struct() {
let batch = batch_with_image(
Field::new("image", DataType::Binary, true),
Arc::new(BinaryArray::from_iter_values([b"hi".as_slice()])),
);
let coerced = coerce(batch, &blob_table_schema()).await;
assert!(
coerced
.schema()
.field_with_name("image")
.unwrap()
.is_blob_v2()
);
}
#[tokio::test]
async fn binary_view_coerces_to_declared_blob_struct() {
let batch = batch_with_image(
Field::new("image", DataType::BinaryView, true),
Arc::new(BinaryViewArray::from_iter_values([b"view".as_slice()])),
);
let coerced = coerce(batch, &blob_table_schema()).await;
let data = image_struct(&coerced).column_by_name("data").unwrap();
let data: &LargeBinaryArray = data.as_any().downcast_ref().unwrap();
assert_eq!(data.value(0), b"view");
}
#[tokio::test]
async fn binary_nulls_stay_null_after_coercion() {
let batch = batch_with_image(
Field::new("image", DataType::Binary, true),
Arc::new(BinaryArray::from_iter(vec![
Some(b"present".as_slice()),
None,
])),
);
let coerced = coerce(batch, &blob_table_schema()).await;
let image = image_struct(&coerced);
let data = image.column_by_name("data").unwrap();
assert!(!data.is_null(0));
assert!(data.is_null(1));
}
#[tokio::test]
async fn binary_coerces_into_four_child_blob_layout() {
let table_schema = Schema::new(vec![
Field::new("id", DataType::Int64, false),
wide_blob_field("image"),
]);
let batch = batch_with_image(
Field::new("image", DataType::LargeBinary, true),
Arc::new(LargeBinaryArray::from_iter(vec![
Some(b"alpha".as_slice()),
None,
])),
);
let coerced = coerce(batch, &table_schema).await;
let image = image_struct(&coerced);
assert_eq!(
image.num_columns(),
4,
"coerced struct keeps the declared layout"
);
assert!(image.column_by_name("position").unwrap().is_null(0));
assert!(image.column_by_name("size").unwrap().is_null(0));
assert!(!image.column_by_name("data").unwrap().is_null(0));
assert!(image.column_by_name("data").unwrap().is_null(1));
}
#[tokio::test]
async fn prebuilt_struct_gains_blob_field_metadata() {
let DataType::Struct(children) = blob("image", true).data_type().clone() else {
unreachable!("blob field is a struct")
};
let prebuilt = StructArray::new(
children,
vec![
Arc::new(LargeBinaryArray::from_iter_values([b"prebuilt".as_slice()])),
Arc::new(StringArray::from(vec![None::<&str>])),
],
None,
);
let batch = batch_with_image(
Field::new("image", prebuilt.data_type().clone(), true),
Arc::new(prebuilt),
);
let coerced = coerce(batch, &blob_table_schema()).await;
assert!(
coerced
.schema()
.field_with_name("image")
.unwrap()
.is_blob_v2()
);
}
#[tokio::test]
async fn prebuilt_narrow_struct_widens_to_declared_layout() {
let DataType::Struct(narrow_children) = blob("image", true).data_type().clone() else {
unreachable!("blob field is a struct")
};
let prebuilt = StructArray::new(
narrow_children,
vec![
Arc::new(LargeBinaryArray::from_iter_values([b"prebuilt".as_slice()])),
Arc::new(StringArray::from(vec![None::<&str>])),
],
None,
);
let table_schema = Schema::new(vec![
Field::new("id", DataType::Int64, false),
wide_blob_field("image"),
]);
let batch = batch_with_image(
Field::new("image", prebuilt.data_type().clone(), true),
Arc::new(prebuilt),
);
let coerced = coerce(batch, &table_schema).await;
let image = image_struct(&coerced);
assert_eq!(image.num_columns(), 4);
assert!(image.column_by_name("position").unwrap().is_null(0));
assert!(image.column_by_name("size").unwrap().is_null(0));
}
#[tokio::test]
async fn external_reference_struct_preserves_uri_position_and_size() {
let prebuilt = StructArray::new(
vec![
Field::new("data", DataType::LargeBinary, true),
Field::new("uri", DataType::Utf8, true),
Field::new("position", DataType::UInt64, true),
Field::new("size", DataType::UInt64, true),
]
.into(),
vec![
Arc::new(LargeBinaryArray::from(vec![None::<&[u8]>])) as ArrayRef,
Arc::new(StringArray::from(vec![Some("s3://bucket/blob.bin")])) as ArrayRef,
Arc::new(UInt64Array::from(vec![Some(7)])) as ArrayRef,
Arc::new(UInt64Array::from(vec![Some(6)])) as ArrayRef,
],
None,
);
let table_schema = Schema::new(vec![
Field::new("id", DataType::Int64, false),
wide_blob_field("image"),
]);
let batch = batch_with_image(
Field::new("image", prebuilt.data_type().clone(), true),
Arc::new(prebuilt),
);
let coerced = coerce(batch, &table_schema).await;
let image = image_struct(&coerced);
let uri: &StringArray = image
.column_by_name("uri")
.unwrap()
.as_any()
.downcast_ref()
.unwrap();
assert_eq!(uri.value(0), "s3://bucket/blob.bin");
let position: &UInt64Array = image
.column_by_name("position")
.unwrap()
.as_any()
.downcast_ref()
.unwrap();
assert_eq!(position.value(0), 7);
let size: &UInt64Array = image
.column_by_name("size")
.unwrap()
.as_any()
.downcast_ref()
.unwrap();
assert_eq!(size.value(0), 6);
assert!(image.column_by_name("data").unwrap().is_null(0));
}
#[tokio::test]
async fn descriptor_struct_without_value_child_is_rejected() {
let descriptor = StructArray::new(
vec![
Field::new("kind", DataType::UInt8, false),
Field::new("position", DataType::UInt64, false),
Field::new("size", DataType::UInt64, false),
]
.into(),
vec![
Arc::new(UInt8Array::from(vec![0])),
Arc::new(UInt64Array::from(vec![0])),
Arc::new(UInt64Array::from(vec![0])),
],
None,
);
let batch = batch_with_image(
Field::new("image", descriptor.data_type().clone(), true),
Arc::new(descriptor),
);
let err = coerce_err(batch, &blob_table_schema()).await;
assert!(err.to_string().contains("'data' or 'uri'"));
assert!(err.to_string().contains("image"));
}
#[tokio::test]
async fn unsupported_input_type_is_rejected_with_column_name() {
let batch = batch_with_image(
Field::new("image", DataType::Utf8, true),
Arc::new(StringArray::from(vec!["not bytes"])),
);
let err = coerce_err(batch, &blob_table_schema()).await;
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
assert!(err.to_string().contains("image"));
}
#[tokio::test]
async fn blob_metadata_survives_cast_of_sibling_column() {
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("image", DataType::LargeBinary, true),
])),
vec![
Arc::new(Int32Array::from(vec![1])),
Arc::new(LargeBinaryArray::from_iter_values([b"x".as_slice()])),
],
)
.unwrap();
let coerced = coerce(batch, &blob_table_schema()).await;
let image_field = coerced.schema().field_with_name("image").unwrap().clone();
assert!(
image_field.is_blob_v2(),
"expected blob marker on image field, got {:?}",
image_field.metadata()
);
assert_eq!(
coerced.schema().field_with_name("id").unwrap().data_type(),
&DataType::Int64
);
}
#[tokio::test]
async fn exact_blob_input_passes_through_unchanged() {
let DataType::Struct(children) = blob("image", true).data_type().clone() else {
unreachable!("blob field is a struct")
};
let image = StructArray::new(
children,
vec![
Arc::new(LargeBinaryArray::from_iter_values([b"exact".as_slice()])),
Arc::new(StringArray::from(vec![None::<&str>])),
],
None,
);
let batch = batch_with_image(blob("image", true), Arc::new(image));
let table_schema = blob_table_schema();
let input = plan_from_batch(batch).await;
let input_ptr = Arc::as_ptr(&input);
let plan = cast_to_table_schema(input, &table_schema).unwrap();
assert_eq!(Arc::as_ptr(&plan), input_ptr, "no projection inserted");
}
}

View File

@@ -13,10 +13,8 @@ use datafusion_physical_expr::expressions::{CastExpr, Literal};
use datafusion_physical_plan::expressions::Column;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};
use lance_arrow::FieldExt;
use lance_arrow::json::{is_arrow_json_field, is_json_field};
use super::blob_coerce::coerce_blob_expr;
use crate::{Error, Result};
pub fn cast_to_table_schema(
@@ -79,17 +77,6 @@ fn build_field_exprs(
continue;
}
// Blob columns accept raw binary on write; exact matches pass through below.
if table_field.is_blob_v2() && input_field.as_ref() != table_field.as_ref() {
result.push(coerce_blob_expr(
input_expr,
input_field,
table_field,
&config,
)?);
continue;
}
let expr = match (input_field.data_type(), table_field.data_type()) {
// Both are structs: recurse into sub-fields to handle subschemas and casts.
(DataType::Struct(in_children), DataType::Struct(tbl_children))

View File

@@ -579,45 +579,24 @@ fn array_to_f32_vec(arr: &Arc<dyn arrow_array::Array>) -> Result<Vec<f32>> {
})
}
/// Magic bytes that prefix (and suffix) the Arrow IPC *file* format.
const ARROW_IPC_FILE_MAGIC: &[u8] = b"ARROW1";
/// Parse Arrow IPC response from the namespace server.
///
/// The server may return either the Arrow IPC *file* format or the *stream*
/// format. REST/phalanx returns the file format (it begins with the `ARROW1`
/// magic); reading that with a `StreamReader` fails with "failed to fill whole
/// buffer". Detect the magic and pick the matching reader so both are handled.
async fn parse_arrow_ipc_response(bytes: bytes::Bytes) -> Result<DatasetRecordBatchStream> {
use arrow_ipc::reader::{FileReader, StreamReader};
use arrow_ipc::reader::StreamReader;
use std::io::Cursor;
let (schema, batches) = if bytes.starts_with(ARROW_IPC_FILE_MAGIC) {
let reader = FileReader::try_new(Cursor::new(bytes), None).map_err(|e| Error::Runtime {
message: format!("Failed to parse Arrow IPC file response: {}", e),
let cursor = Cursor::new(bytes);
let reader = StreamReader::try_new(cursor, None).map_err(|e| Error::Runtime {
message: format!("Failed to parse Arrow IPC response: {}", e),
})?;
// Collect all record batches
let schema = reader.schema();
let batches: Vec<_> = reader
.into_iter()
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| Error::Runtime {
message: format!("Failed to read Arrow IPC batches: {}", e),
})?;
let schema = reader.schema();
let batches = reader
.into_iter()
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| Error::Runtime {
message: format!("Failed to read Arrow IPC file batches: {}", e),
})?;
(schema, batches)
} else {
let reader =
StreamReader::try_new(Cursor::new(bytes), None).map_err(|e| Error::Runtime {
message: format!("Failed to parse Arrow IPC response: {}", e),
})?;
let schema = reader.schema();
let batches = reader
.into_iter()
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| Error::Runtime {
message: format!("Failed to read Arrow IPC batches: {}", e),
})?;
(schema, batches)
};
// Create a stream from the batches
let stream = futures::stream::iter(batches.into_iter().map(Ok));
@@ -645,59 +624,6 @@ mod tests {
FixedSizeListArray::try_new_from_values(Float32Array::from(values), dimension).unwrap()
}
#[tokio::test]
async fn test_parse_arrow_ipc_response_handles_file_and_stream() {
use arrow_array::{Int32Array, RecordBatch};
use arrow_ipc::writer::{FileWriter, StreamWriter};
use arrow_schema::{DataType, Field, Schema};
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
)
.unwrap();
// Arrow IPC *file* format -- what REST/phalanx returns. Previously this
// failed with "failed to fill whole buffer" because we used a StreamReader.
let mut file_buf = Vec::new();
{
let mut writer = FileWriter::try_new(&mut file_buf, &schema).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
}
assert!(file_buf.starts_with(ARROW_IPC_FILE_MAGIC));
let rows: usize = parse_arrow_ipc_response(bytes::Bytes::from(file_buf))
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap()
.iter()
.map(|b| b.num_rows())
.sum();
assert_eq!(rows, 3);
// Arrow IPC *stream* format must still parse.
let mut stream_buf = Vec::new();
{
let mut writer = StreamWriter::try_new(&mut stream_buf, &schema).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
}
assert!(!stream_buf.starts_with(ARROW_IPC_FILE_MAGIC));
let rows: usize = parse_arrow_ipc_response(bytes::Bytes::from(stream_buf))
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap()
.iter()
.map(|b| b.num_rows())
.sum();
assert_eq!(rows, 3);
}
#[test]
fn test_convert_to_namespace_query_vector() {
let query_vector = Arc::new(Float32Array::from(vec![1.0, 2.0, 3.0, 4.0]));

View File

@@ -1,949 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::sync::Arc;
use arrow_array::{
Array, ArrayRef, BinaryArray, Int64Array, LargeBinaryArray, RecordBatch, StringArray,
StructArray, UInt64Array,
};
use arrow_schema::{DataType, Field, Fields, Schema};
use futures::TryStreamExt;
use lance_encoding::version::LanceFileVersion;
use lancedb::{
Connection, Error, Result, Table,
blob::blob,
connect, connect_namespace,
database::listing::OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS,
query::{ExecutableQuery, QueryBase},
table::{AddDataMode, CompactionOptions, OptimizeAction},
};
use tempfile::tempdir;
fn blob_table_schema() -> Arc<Schema> {
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
blob("image", true),
]))
}
fn binary_input_batch(ids: &[i64], payloads: &[Option<&[u8]>]) -> RecordBatch {
RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("image", DataType::LargeBinary, true),
])),
vec![
Arc::new(Int64Array::from(ids.to_vec())),
Arc::new(LargeBinaryArray::from_iter(payloads.iter().copied())),
],
)
.unwrap()
}
async fn create_inline_blob_table(
db: &Connection,
name: &str,
ids: &[i64],
payloads: &[Option<&[u8]>],
) -> Result<Table> {
let table = db
.create_empty_table(name, blob_table_schema())
.execute()
.await?;
table
.add(binary_input_batch(ids, payloads))
.execute()
.await?;
Ok(table)
}
async fn storage_format_version(table: &Table) -> LanceFileVersion {
table
.as_native()
.unwrap()
.manifest()
.await
.unwrap()
.data_storage_format
.lance_file_version()
.unwrap()
.resolve()
}
async fn uses_stable_row_ids(table: &Table) -> bool {
table
.as_native()
.unwrap()
.manifest()
.await
.unwrap()
.uses_stable_row_ids()
}
async fn query_image_struct(table: &Table) -> StructArray {
let batches = table
.query()
.execute()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let batch = arrow_select::concat::concat_batches(&batches[0].schema(), &batches).unwrap();
batch
.column_by_name("image")
.expect("image column present")
.as_any()
.downcast_ref::<StructArray>()
.expect("image column is a descriptor struct")
.clone()
}
#[tokio::test]
async fn declaring_blob_column_bumps_format_and_enables_stable_row_ids() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = db
.create_empty_table("t", blob_table_schema())
.execute()
.await?;
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
assert!(uses_stable_row_ids(&table).await);
Ok(())
}
#[tokio::test]
async fn explicit_stable_row_id_setting_wins_over_blob_default() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = db
.create_empty_table("t", blob_table_schema())
.storage_option(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, "false")
.execute()
.await?;
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
assert!(!uses_stable_row_ids(&table).await);
Ok(())
}
#[tokio::test]
async fn non_blob_table_keeps_default_format_and_row_id_setting() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
let table = db.create_empty_table("t", schema).execute().await?;
assert!(storage_format_version(&table).await < LanceFileVersion::V2_2);
assert!(!uses_stable_row_ids(&table).await);
Ok(())
}
#[tokio::test]
async fn creating_with_blob_data_bumps_format() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let blob_field = blob("image", true);
let DataType::Struct(children) = blob_field.data_type().clone() else {
unreachable!("blob field is a struct")
};
let image = StructArray::new(
children,
vec![
Arc::new(LargeBinaryArray::from_iter_values([b"payload".as_slice()])),
Arc::new(StringArray::from(vec![None::<&str>])),
],
None,
);
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
blob_field,
])),
vec![Arc::new(Int64Array::from(vec![1])), Arc::new(image)],
)
.unwrap();
let table = db.create_table("t", batch).execute().await?;
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
assert!(uses_stable_row_ids(&table).await);
assert_eq!(table.count_rows(None).await?, 1);
Ok(())
}
#[tokio::test]
async fn add_coerces_large_binary_into_blob_column() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table =
create_inline_blob_table(&db, "t", &[1, 2], &[Some(b"cat".as_slice()), Some(b"dog")])
.await?;
assert_eq!(table.count_rows(None).await?, 2);
let image = query_image_struct(&table).await;
assert_eq!(image.len(), 2);
let schema = table.schema().await?;
let field = schema.field_with_name("image").unwrap();
assert_eq!(
field
.metadata()
.get("ARROW:extension:name")
.map(String::as_str),
Some("lance.blob.v2")
);
Ok(())
}
#[tokio::test]
async fn add_coerces_binary_into_blob_column() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = db
.create_empty_table("t", blob_table_schema())
.execute()
.await?;
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("image", DataType::Binary, true),
])),
vec![
Arc::new(Int64Array::from(vec![1])),
Arc::new(BinaryArray::from_iter_values([b"small".as_slice()])),
],
)
.unwrap();
table.add(batch).execute().await?;
assert_eq!(table.count_rows(None).await?, 1);
Ok(())
}
#[tokio::test]
async fn add_accepts_null_blob_rows() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(
&db,
"t",
&[1, 2, 3],
&[Some(b"first".as_slice()), None, Some(b"third")],
)
.await?;
assert_eq!(table.count_rows(None).await?, 3);
let image = query_image_struct(&table).await;
assert_eq!(image.len(), 3);
Ok(())
}
#[tokio::test]
async fn add_rejects_uncoercible_blob_input() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = db
.create_empty_table("t", blob_table_schema())
.execute()
.await?;
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("image", DataType::Utf8, true),
])),
vec![
Arc::new(Int64Array::from(vec![1])),
Arc::new(StringArray::from(vec!["not bytes"])),
],
)
.unwrap();
let err = table.add(batch).execute().await.unwrap_err();
assert!(err.to_string().contains("image"));
Ok(())
}
#[tokio::test]
async fn connection_level_stable_row_id_setting_wins_over_blob_default() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap())
.storage_option(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, "false")
.execute()
.await?;
let table = db
.create_empty_table("t", blob_table_schema())
.execute()
.await?;
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
assert!(!uses_stable_row_ids(&table).await);
Ok(())
}
#[tokio::test]
async fn namespace_create_applies_blob_defaults() -> Result<()> {
let tmp = tempdir().unwrap();
let mut properties = std::collections::HashMap::new();
properties.insert("root".to_string(), tmp.path().to_str().unwrap().to_string());
let db = connect_namespace("dir", properties).execute().await?;
let table = db
.create_empty_table("t", blob_table_schema())
.execute()
.await?;
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
assert!(uses_stable_row_ids(&table).await);
Ok(())
}
// Overwrite takes the input schema as-is. A raw-binary overwrite drops the blob
// marker; re-declaring blob v2 in the input restores it.
#[tokio::test]
async fn overwrite_replaces_blob_schema_with_input_schema() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"blob".as_slice())]).await?;
let raw_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("image", DataType::LargeBinary, true),
]));
let raw_batch = RecordBatch::try_new(
raw_schema.clone(),
vec![
Arc::new(Int64Array::from(vec![2])),
Arc::new(LargeBinaryArray::from_iter_values([b"plain".as_slice()])),
],
)
.unwrap();
table
.add(raw_batch)
.mode(AddDataMode::Overwrite)
.execute()
.await?;
let schema = table.schema().await?;
assert_eq!(schema, raw_schema);
assert!(
!schema
.field_with_name("image")
.unwrap()
.metadata()
.contains_key("ARROW:extension:name")
);
let blob_field = blob("image", true);
let DataType::Struct(children) = blob_field.data_type().clone() else {
unreachable!("blob field is a struct")
};
let image = StructArray::new(
children,
vec![
Arc::new(LargeBinaryArray::from_iter_values([b"declared".as_slice()])),
Arc::new(StringArray::from(vec![None::<&str>])),
],
None,
);
let declared_batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
blob_field,
])),
vec![Arc::new(Int64Array::from(vec![3])), Arc::new(image)],
)
.unwrap();
table
.add(declared_batch)
.mode(AddDataMode::Overwrite)
.execute()
.await?;
let schema = table.schema().await?;
assert_eq!(
schema
.field_with_name("image")
.unwrap()
.metadata()
.get("ARROW:extension:name")
.map(String::as_str),
Some("lance.blob.v2")
);
Ok(())
}
async fn collect_row_ids(table: &Table) -> Result<Vec<u64>> {
let batches = table
.query()
.with_row_id()
.execute()
.await?
.try_collect::<Vec<_>>()
.await?;
let batch = arrow_select::concat::concat_batches(&batches[0].schema(), &batches).unwrap();
Ok(batch
.column_by_name("_rowid")
.unwrap()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.values()
.to_vec())
}
async fn collect_id_rowid(table: &Table) -> Result<Vec<(i64, u64)>> {
let batches = table
.query()
.with_row_id()
.execute()
.await?
.try_collect::<Vec<_>>()
.await?;
let batch = arrow_select::concat::concat_batches(&batches[0].schema(), &batches).unwrap();
let ids = batch
.column_by_name("id")
.unwrap()
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
let row_ids = batch
.column_by_name("_rowid")
.unwrap()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
Ok(ids
.values()
.iter()
.copied()
.zip(row_ids.values().iter().copied())
.collect())
}
#[tokio::test]
async fn fetch_blobs_round_trips_bytes() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let payload: &[u8] = b"blob-round-trip-payload";
let table = create_inline_blob_table(&db, "t", &[1], &[Some(payload)]).await?;
let ids = collect_row_ids(&table).await?;
let bytes = table.fetch_blobs("image", &ids).await?;
assert_eq!(bytes.len(), 1);
assert_eq!(bytes.value(0), payload);
Ok(())
}
#[tokio::test]
async fn fetch_blobs_round_trips_nested_blob_column() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let blob_field = blob("blob", true);
let DataType::Struct(blob_children) = blob_field.data_type().clone() else {
unreachable!("blob field is a struct")
};
let blob_array = StructArray::new(
blob_children,
vec![
Arc::new(LargeBinaryArray::from_iter_values([
b"hello".as_slice(),
b"world".as_slice(),
])) as ArrayRef,
Arc::new(StringArray::from(vec![None::<&str>, None::<&str>])) as ArrayRef,
],
None,
);
let info_fields: Fields = vec![Field::new("name", DataType::Utf8, false), blob_field].into();
let info_array = StructArray::new(
info_fields.clone(),
vec![
Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
Arc::new(blob_array) as ArrayRef,
],
None,
);
let schema = Arc::new(Schema::new(vec![Field::new(
"info",
DataType::Struct(info_fields),
true,
)]));
let batch = RecordBatch::try_new(schema, vec![Arc::new(info_array) as ArrayRef]).unwrap();
let table = db.create_table("t", batch).execute().await?;
assert!(storage_format_version(&table).await >= LanceFileVersion::V2_2);
assert!(uses_stable_row_ids(&table).await);
let ids = collect_row_ids(&table).await?;
let bytes = table.fetch_blobs("info.blob", &ids).await?;
assert_eq!(bytes.len(), 2);
let values: std::collections::HashSet<&[u8]> =
(0..bytes.len()).map(|i| bytes.value(i)).collect();
assert!(values.contains(b"hello".as_slice()));
assert!(values.contains(b"world".as_slice()));
Ok(())
}
#[tokio::test]
async fn blob_columns_lists_nested_dotted_paths() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let blob_field = blob("blob", true);
let info = Field::new(
"info",
DataType::Struct(vec![Field::new("name", DataType::Utf8, false), blob_field].into()),
true,
);
let schema = Arc::new(Schema::new(vec![
blob("thumbnail", true),
Field::new("id", DataType::Int64, false),
info,
]));
let table = db.create_empty_table("t", schema).execute().await?;
assert_eq!(table.blob_columns().await?, vec!["thumbnail", "info.blob"]);
Ok(())
}
#[tokio::test]
async fn blob_columns_lists_blob_fields_in_order() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let schema = Arc::new(Schema::new(vec![
blob("thumbnail", true),
Field::new("id", DataType::Int64, false),
blob("image", true),
]));
let table = db.create_empty_table("t", schema).execute().await?;
assert_eq!(table.blob_columns().await?, vec!["thumbnail", "image"]);
let plain = db
.create_empty_table(
"plain",
Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)])),
)
.execute()
.await?;
assert!(plain.blob_columns().await?.is_empty());
Ok(())
}
#[tokio::test]
async fn fetch_blobs_preserves_null_alignment() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(
&db,
"t",
&[1, 2, 3, 4],
&[Some(b"a".as_slice()), None, Some(b"c"), None],
)
.await?;
let pairs = collect_id_rowid(&table).await?;
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let bytes = table.fetch_blobs("image", &ids).await?;
assert_eq!(bytes.len(), ids.len());
for (i, (id, _)) in pairs.iter().enumerate() {
match id {
1 => assert_eq!(bytes.value(i), b"a"),
2 | 4 => assert!(bytes.is_null(i)),
3 => assert_eq!(bytes.value(i), b"c"),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_all_null_column_returns_all_nulls() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1, 2], &[None, None]).await?;
let ids = collect_row_ids(&table).await?;
let bytes = table.fetch_blobs("image", &ids).await?;
assert_eq!(bytes.len(), 2);
assert_eq!(bytes.null_count(), 2);
let files = table.fetch_blob_files("image", &ids).await?;
assert_eq!(files.len(), 2);
assert!(files.iter().all(Option::is_none));
Ok(())
}
#[tokio::test]
async fn fetch_blobs_aligns_with_reordered_and_duplicate_ids() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(
&db,
"t",
&[1, 2, 3],
&[Some(b"one".as_slice()), Some(b"two"), Some(b"three")],
)
.await?;
let pairs = collect_id_rowid(&table).await?;
let by_id = |want: i64| pairs.iter().find(|(id, _)| *id == want).unwrap().1;
let request = vec![by_id(3), by_id(1), by_id(3), by_id(2)];
let bytes = table.fetch_blobs("image", &request).await?;
assert_eq!(bytes.len(), 4);
assert_eq!(bytes.value(0), b"three");
assert_eq!(bytes.value(1), b"one");
assert_eq!(bytes.value(2), b"three");
assert_eq!(bytes.value(3), b"two");
Ok(())
}
#[tokio::test]
async fn fetch_blobs_empty_ids_returns_empty() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
assert_eq!(table.fetch_blobs("image", &[]).await?.len(), 0);
assert!(table.fetch_blob_files("image", &[]).await?.is_empty());
Ok(())
}
#[tokio::test]
async fn fetch_blobs_out_of_range_id_errors_without_panic() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
let err = table.fetch_blobs("image", &[u64::MAX]).await.unwrap_err();
assert!(err.to_string().contains("row ids"));
Ok(())
}
#[tokio::test]
async fn fetch_blobs_rejects_non_blob_column() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
let err = table.fetch_blobs("id", &[0]).await.unwrap_err();
assert!(matches!(err, Error::InvalidInput { .. }));
assert!(err.to_string().contains("'id' is not a blob column"));
let err = table.fetch_blob_files("id", &[0]).await.unwrap_err();
assert!(err.to_string().contains("'id' is not a blob column"));
Ok(())
}
#[tokio::test]
async fn fetch_blobs_rejects_unknown_column() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"x".as_slice())]).await?;
let err = table.fetch_blobs("missing", &[0]).await.unwrap_err();
assert!(err.to_string().contains("no column named 'missing'"));
Ok(())
}
#[tokio::test]
async fn fetch_blobs_rejects_legacy_v1_blob_column() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let legacy = Field::new("image", DataType::LargeBinary, true).with_metadata(
std::collections::HashMap::from([("lance-encoding:blob".to_string(), "true".to_string())]),
);
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
legacy,
]));
let table = db.create_empty_table("t", schema).execute().await?;
let err = table.fetch_blobs("image", &[0]).await.unwrap_err();
assert!(err.to_string().contains("legacy blob column"));
Ok(())
}
#[tokio::test]
async fn fetch_blob_files_reads_lazily_and_aligns_nulls() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table =
create_inline_blob_table(&db, "t", &[1, 2], &[Some(b"lazy-bytes".as_slice()), None])
.await?;
let pairs = collect_id_rowid(&table).await?;
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let files = table.fetch_blob_files("image", &ids).await?;
assert_eq!(files.len(), 2);
for ((id, _), file) in pairs.iter().zip(&files) {
match id {
1 => {
let handle = file.as_ref().unwrap();
assert_eq!(handle.read().await.unwrap().as_ref(), b"lazy-bytes");
}
2 => assert!(file.is_none()),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_reads_multiple_blob_columns_independently() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
blob("image", true),
blob("thumbnail", true),
]));
let table = db.create_empty_table("t", schema).execute().await?;
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("image", DataType::LargeBinary, true),
Field::new("thumbnail", DataType::LargeBinary, true),
])),
vec![
Arc::new(Int64Array::from(vec![1, 2])),
Arc::new(LargeBinaryArray::from_iter(vec![
Some(b"image-1".as_slice()),
None,
])),
Arc::new(LargeBinaryArray::from_iter(vec![
None,
Some(b"thumb-2".as_slice()),
])),
],
)
.unwrap();
table.add(batch).execute().await?;
let pairs = collect_id_rowid(&table).await?;
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let images = table.fetch_blobs("image", &ids).await?;
let thumbs = table.fetch_blobs("thumbnail", &ids).await?;
for (i, (id, _)) in pairs.iter().enumerate() {
match id {
1 => {
assert_eq!(images.value(i), b"image-1");
assert!(thumbs.is_null(i));
}
2 => {
assert!(images.is_null(i));
assert_eq!(thumbs.value(i), b"thumb-2");
}
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_spans_fragments() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"frag-one".as_slice())]).await?;
table
.add(binary_input_batch(&[2], &[Some(b"frag-two".as_slice())]))
.execute()
.await?;
let pairs = collect_id_rowid(&table).await?;
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let bytes = table.fetch_blobs("image", &ids).await?;
for (i, (id, _)) in pairs.iter().enumerate() {
match id {
1 => assert_eq!(bytes.value(i), b"frag-one"),
2 => assert_eq!(bytes.value(i), b"frag-two"),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_packed_payload_round_trip() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let big = vec![0xAB_u8; 100 * 1024];
let small = b"small".to_vec();
let table = create_inline_blob_table(
&db,
"t",
&[1, 2],
&[Some(big.as_slice()), Some(small.as_slice())],
)
.await?;
let pairs = collect_id_rowid(&table).await?;
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let bytes = table.fetch_blobs("image", &ids).await?;
for (i, (id, _)) in pairs.iter().enumerate() {
match id {
1 => assert_eq!(bytes.value(i), big.as_slice()),
2 => assert_eq!(bytes.value(i), small.as_slice()),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_after_delete() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(
&db,
"t",
&[1, 2, 3],
&[Some(b"one".as_slice()), Some(b"two"), Some(b"three")],
)
.await?;
table.delete("id = 2").await?;
let pairs = collect_id_rowid(&table).await?;
assert_eq!(pairs.len(), 2);
let ids: Vec<u64> = pairs.iter().map(|(_, rowid)| *rowid).collect();
let bytes = table.fetch_blobs("image", &ids).await?;
for (i, (id, _)) in pairs.iter().enumerate() {
match id {
1 => assert_eq!(bytes.value(i), b"one"),
3 => assert_eq!(bytes.value(i), b"three"),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blobs_with_precompaction_row_ids_survives_compaction() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"frag-one".as_slice())]).await?;
table
.add(binary_input_batch(&[2], &[Some(b"frag-two".as_slice())]))
.execute()
.await?;
let pairs_before = collect_id_rowid(&table).await?;
let ids_before: Vec<u64> = pairs_before.iter().map(|(_, rowid)| *rowid).collect();
table
.optimize(OptimizeAction::Compact {
options: CompactionOptions::default(),
remap_options: None,
})
.await?;
let bytes_after = table.fetch_blobs("image", &ids_before).await?;
assert_eq!(bytes_after.len(), 2);
for (i, (id, _)) in pairs_before.iter().enumerate() {
match id {
1 => assert_eq!(bytes_after.value(i), b"frag-one"),
2 => assert_eq!(bytes_after.value(i), b"frag-two"),
_ => unreachable!(),
}
}
Ok(())
}
#[tokio::test]
async fn zero_length_blob_reads_back_as_null() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = create_inline_blob_table(&db, "t", &[1], &[Some(b"".as_slice())]).await?;
let ids = collect_row_ids(&table).await?;
let bytes = table.fetch_blobs("image", &ids).await?;
assert_eq!(bytes.len(), 1);
assert!(bytes.is_null(0));
Ok(())
}
const DEDICATED_BLOB_LEN: usize = 64 * 1024;
const SCRAMBLED_LOGICAL_IDS: [i64; 7] = [6, 3, 1, 4, 6, 2, 5];
fn dedicated_blob_bytes(tag: u8) -> Vec<u8> {
vec![tag; DEDICATED_BLOB_LEN]
}
async fn multi_fragment_dedicated_blob_table(db: &Connection) -> Result<Table> {
let rows: [(i64, Option<u8>); 6] = [
(1, Some(1)),
(2, Some(2)),
(3, None),
(4, Some(4)),
(5, None),
(6, Some(6)),
];
let mut table: Option<Table> = None;
for (logical_id, blob_tag) in rows {
let bytes = blob_tag.map(dedicated_blob_bytes);
let image = [bytes.as_deref()];
table = Some(match table {
None => create_inline_blob_table(db, "t", &[logical_id], &image).await?,
Some(t) => {
t.add(binary_input_batch(&[logical_id], &image))
.execute()
.await?;
t
}
});
}
Ok(table.unwrap())
}
async fn row_ids_for_logical(table: &Table, logical_ids: &[i64]) -> Result<Vec<u64>> {
let id_rowid = collect_id_rowid(table).await?;
Ok(logical_ids
.iter()
.map(|logical_id| {
id_rowid
.iter()
.find(|(id, _)| id == logical_id)
.map(|(_, row_id)| *row_id)
.unwrap()
})
.collect())
}
#[tokio::test]
async fn fetch_blobs_aligns_across_fragments_with_nulls_and_dups() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = multi_fragment_dedicated_blob_table(&db).await?;
let row_ids = row_ids_for_logical(&table, &SCRAMBLED_LOGICAL_IDS).await?;
let bytes = table.fetch_blobs("image", &row_ids).await?;
assert_eq!(bytes.len(), SCRAMBLED_LOGICAL_IDS.len());
for (slot, logical_id) in SCRAMBLED_LOGICAL_IDS.iter().enumerate() {
match logical_id {
3 | 5 => assert!(bytes.is_null(slot)),
id => assert_eq!(
bytes.value(slot),
dedicated_blob_bytes(*id as u8).as_slice()
),
}
}
Ok(())
}
#[tokio::test]
async fn fetch_blob_files_aligns_across_fragments_with_nulls_and_dups() -> Result<()> {
let tmp = tempdir().unwrap();
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
let table = multi_fragment_dedicated_blob_table(&db).await?;
let row_ids = row_ids_for_logical(&table, &SCRAMBLED_LOGICAL_IDS).await?;
let files = table.fetch_blob_files("image", &row_ids).await?;
assert_eq!(files.len(), SCRAMBLED_LOGICAL_IDS.len());
for (slot, logical_id) in SCRAMBLED_LOGICAL_IDS.iter().enumerate() {
match logical_id {
3 | 5 => assert!(files[slot].is_none()),
id => {
let payload = files[slot].as_ref().unwrap().read().await?;
assert_eq!(payload.as_ref(), dedicated_blob_bytes(*id as u8).as_slice());
}
}
}
Ok(())
}