mirror of
https://github.com/lancedb/lancedb.git
synced 2026-03-26 02:20:40 +00:00
Compare commits
8 Commits
python-v0.
...
codex/upda
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2478993501 | ||
|
|
a0228036ae | ||
|
|
d8fc071a7d | ||
|
|
e6fd8d071e | ||
|
|
670dcca551 | ||
|
|
ed7e01a58b | ||
|
|
3450ccaf7f | ||
|
|
9b229f1e7c |
@@ -1,5 +1,5 @@
|
||||
[tool.bumpversion]
|
||||
current_version = "0.27.0"
|
||||
current_version = "0.27.1"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
|
||||
16
.github/workflows/rust.yml
vendored
16
.github/workflows/rust.yml
vendored
@@ -207,14 +207,14 @@ jobs:
|
||||
- name: Downgrade dependencies
|
||||
# These packages have newer requirements for MSRV
|
||||
run: |
|
||||
cargo update -p aws-sdk-bedrockruntime --precise 1.64.0
|
||||
cargo update -p aws-sdk-dynamodb --precise 1.55.0
|
||||
cargo update -p aws-config --precise 1.5.10
|
||||
cargo update -p aws-sdk-kms --precise 1.51.0
|
||||
cargo update -p aws-sdk-s3 --precise 1.65.0
|
||||
cargo update -p aws-sdk-sso --precise 1.50.0
|
||||
cargo update -p aws-sdk-ssooidc --precise 1.51.0
|
||||
cargo update -p aws-sdk-sts --precise 1.51.0
|
||||
cargo update -p aws-sdk-bedrockruntime --precise 1.77.0
|
||||
cargo update -p aws-sdk-dynamodb --precise 1.68.0
|
||||
cargo update -p aws-config --precise 1.6.0
|
||||
cargo update -p aws-sdk-kms --precise 1.63.0
|
||||
cargo update -p aws-sdk-s3 --precise 1.79.0
|
||||
cargo update -p aws-sdk-sso --precise 1.62.0
|
||||
cargo update -p aws-sdk-ssooidc --precise 1.63.0
|
||||
cargo update -p aws-sdk-sts --precise 1.63.0
|
||||
cargo update -p home --precise 0.5.9
|
||||
- name: cargo +${{ matrix.msrv }} check
|
||||
env:
|
||||
|
||||
92
Cargo.lock
generated
92
Cargo.lock
generated
@@ -3070,9 +3070,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
|
||||
|
||||
[[package]]
|
||||
name = "fsst"
|
||||
version = "3.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a32ddfc5478379cd1782bdd9d7d1411063f563e5b338fc73bafe5916451a5b9d"
|
||||
version = "4.1.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.3#244c721504c6ef0b4c2f9700a342509976898d6e"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"rand 0.9.2",
|
||||
@@ -4242,9 +4241,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance"
|
||||
version = "3.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "95c5ce428fda0721f5c48bfde17a1921c4da2d2142b2f46a16c89abf5fce8003"
|
||||
version = "4.1.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.3#244c721504c6ef0b4c2f9700a342509976898d6e"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-arith",
|
||||
@@ -4310,9 +4308,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-arrow"
|
||||
version = "3.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c9fdaf99863fa0d631e422881e88be4837d8b82f36a87143d723a9d285acec4b"
|
||||
version = "4.1.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.3#244c721504c6ef0b4c2f9700a342509976898d6e"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4332,9 +4329,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-bitpacking"
|
||||
version = "3.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "866b1634d38d94e8ab86fbcf238ac82dc8a5f72a4a6a90525f29899772e7cc7f"
|
||||
version = "4.1.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.3#244c721504c6ef0b4c2f9700a342509976898d6e"
|
||||
dependencies = [
|
||||
"arrayref",
|
||||
"paste",
|
||||
@@ -4343,9 +4339,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-core"
|
||||
version = "3.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "977c29f4e48c201c2806fe6ae117b65d0287eda236acd07357b556a54b0d5c5a"
|
||||
version = "4.1.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.3#244c721504c6ef0b4c2f9700a342509976898d6e"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4382,9 +4377,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-datafusion"
|
||||
version = "3.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ccc72695473f4207df4c6df3b347a63e84c32c0bc36bf42a7d86e8a7c0c67e2"
|
||||
version = "4.1.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.3#244c721504c6ef0b4c2f9700a342509976898d6e"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4414,9 +4408,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-datagen"
|
||||
version = "3.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8fe84d76944acd834ded14d7562663af995556e0c6594f4b4ac69b0183f99c1a"
|
||||
version = "4.1.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.3#244c721504c6ef0b4c2f9700a342509976898d6e"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4434,9 +4427,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-encoding"
|
||||
version = "3.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "be1007242188e5d53c98717e7f2cb340dc80eb9c94c2b935587598919b3a36bd"
|
||||
version = "4.1.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.3#244c721504c6ef0b4c2f9700a342509976898d6e"
|
||||
dependencies = [
|
||||
"arrow-arith",
|
||||
"arrow-array",
|
||||
@@ -4473,9 +4465,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-file"
|
||||
version = "3.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f80088e418941f39cf5599d166ae1a6ef498cc2d967652a0692477d4871a9277"
|
||||
version = "4.1.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.3#244c721504c6ef0b4c2f9700a342509976898d6e"
|
||||
dependencies = [
|
||||
"arrow-arith",
|
||||
"arrow-array",
|
||||
@@ -4507,9 +4498,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-index"
|
||||
version = "3.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e0011daf1ddde99becffd2ae235ad324576736a526c54ffbc4d7e583872f1215"
|
||||
version = "4.1.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.3#244c721504c6ef0b4c2f9700a342509976898d6e"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-arith",
|
||||
@@ -4523,6 +4513,7 @@ dependencies = [
|
||||
"bitpacking",
|
||||
"bitvec",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"crossbeam-queue",
|
||||
"datafusion",
|
||||
"datafusion-common",
|
||||
@@ -4572,9 +4563,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-io"
|
||||
version = "3.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cfa8a74e93753d19a27ce3adaeb99e31227df13ad5926dd43572be76b43dd284"
|
||||
version = "4.1.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.3#244c721504c6ef0b4c2f9700a342509976898d6e"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-arith",
|
||||
@@ -4615,9 +4605,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-linalg"
|
||||
version = "3.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6e2d8da8f6b8dd37ab3b8199896ee265817f86232e3727c0b0eeb3c9093b64d9"
|
||||
version = "4.1.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.3#244c721504c6ef0b4c2f9700a342509976898d6e"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4633,23 +4622,22 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace"
|
||||
version = "3.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f176e427d9c35938d8a7097876114bc35dfd280b06077779753f2effe3e86aab"
|
||||
version = "4.1.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.3#244c721504c6ef0b4c2f9700a342509976898d6e"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"lance-core",
|
||||
"lance-namespace-reqwest-client",
|
||||
"serde",
|
||||
"snafu 0.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace-impls"
|
||||
version = "3.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "663c32086ecfab311acb0813c65a4bb352a5b648ccf8b513c24697ce8d412039"
|
||||
version = "4.1.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.3#244c721504c6ef0b4c2f9700a342509976898d6e"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-ipc",
|
||||
@@ -4680,9 +4668,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace-reqwest-client"
|
||||
version = "0.5.2"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3ad4c947349acd6e37e984eba0254588bd894e6128434338b9e6904e56fb4633"
|
||||
checksum = "ee2e48de899e2931afb67fcddd0a08e439bf5d8b6ea2a2ed9cb8f4df669bd5cc"
|
||||
dependencies = [
|
||||
"reqwest",
|
||||
"serde",
|
||||
@@ -4693,9 +4681,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-table"
|
||||
version = "3.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aa189b3081481a97b64cf1161297947a63b8adb941b1950989d0269858703a43"
|
||||
version = "4.1.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.3#244c721504c6ef0b4c2f9700a342509976898d6e"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4734,9 +4721,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-testing"
|
||||
version = "3.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "79a6f4ab0788ee82893bac5de4ff0d0d88bba96de87db4b6e18b1883616d4dbe"
|
||||
version = "4.1.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.3#244c721504c6ef0b4c2f9700a342509976898d6e"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-schema",
|
||||
@@ -4747,7 +4733,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb"
|
||||
version = "0.27.0"
|
||||
version = "0.27.1"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"anyhow",
|
||||
@@ -4829,7 +4815,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-nodejs"
|
||||
version = "0.27.0"
|
||||
version = "0.27.1"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-ipc",
|
||||
@@ -4849,7 +4835,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-python"
|
||||
version = "0.30.0"
|
||||
version = "0.30.1"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"async-trait",
|
||||
|
||||
28
Cargo.toml
28
Cargo.toml
@@ -15,20 +15,20 @@ categories = ["database-implementations"]
|
||||
rust-version = "1.91.0"
|
||||
|
||||
[workspace.dependencies]
|
||||
lance = { version = "=3.0.1", default-features = false }
|
||||
lance-core = { version = "=3.0.1" }
|
||||
lance-datagen = { version = "=3.0.1" }
|
||||
lance-file = { version = "=3.0.1" }
|
||||
lance-io = { version = "=3.0.1", default-features = false }
|
||||
lance-index = { version = "=3.0.1" }
|
||||
lance-linalg = { version = "=3.0.1" }
|
||||
lance-namespace = { version = "=3.0.1" }
|
||||
lance-namespace-impls = { version = "=3.0.1", default-features = false }
|
||||
lance-table = { version = "=3.0.1" }
|
||||
lance-testing = { version = "=3.0.1" }
|
||||
lance-datafusion = { version = "=3.0.1" }
|
||||
lance-encoding = { version = "=3.0.1" }
|
||||
lance-arrow = { version = "=3.0.1" }
|
||||
lance = { "version" = "=4.1.0-beta.3", default-features = false, "tag" = "v4.1.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=4.1.0-beta.3", "tag" = "v4.1.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=4.1.0-beta.3", "tag" = "v4.1.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=4.1.0-beta.3", "tag" = "v4.1.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=4.1.0-beta.3", default-features = false, "tag" = "v4.1.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=4.1.0-beta.3", "tag" = "v4.1.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=4.1.0-beta.3", "tag" = "v4.1.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=4.1.0-beta.3", "tag" = "v4.1.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=4.1.0-beta.3", default-features = false, "tag" = "v4.1.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=4.1.0-beta.3", "tag" = "v4.1.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=4.1.0-beta.3", "tag" = "v4.1.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=4.1.0-beta.3", "tag" = "v4.1.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=4.1.0-beta.3", "tag" = "v4.1.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=4.1.0-beta.3", "tag" = "v4.1.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
ahash = "0.8"
|
||||
# Note that this one does not include pyarrow
|
||||
arrow = { version = "57.2", optional = false }
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
mkdocs==1.5.3
|
||||
mkdocs==1.6.1
|
||||
mkdocs-jupyter==0.24.1
|
||||
mkdocs-material==9.5.3
|
||||
mkdocs-material==9.6.23
|
||||
mkdocs-autorefs>=0.5,<=1.0
|
||||
mkdocstrings[python]==0.25.2
|
||||
mkdocstrings[python]>=0.24,<1.0
|
||||
griffe>=0.40,<1.0
|
||||
mkdocs-render-swagger-plugin>=0.1.0
|
||||
pydantic>=2.0,<3.0
|
||||
|
||||
@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
|
||||
<dependency>
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-core</artifactId>
|
||||
<version>0.27.0</version>
|
||||
<version>0.27.1</version>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
<parent>
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-parent</artifactId>
|
||||
<version>0.27.0-final.0</version>
|
||||
<version>0.27.1-final.0</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-parent</artifactId>
|
||||
<version>0.27.0-final.0</version>
|
||||
<version>0.27.1-final.0</version>
|
||||
<packaging>pom</packaging>
|
||||
<name>${project.artifactId}</name>
|
||||
<description>LanceDB Java SDK Parent POM</description>
|
||||
@@ -28,7 +28,7 @@
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<arrow.version>15.0.0</arrow.version>
|
||||
<lance-core.version>3.1.0-beta.2</lance-core.version>
|
||||
<lance-core.version>4.1.0-beta.3</lance-core.version>
|
||||
<spotless.skip>false</spotless.skip>
|
||||
<spotless.version>2.30.0</spotless.version>
|
||||
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "lancedb-nodejs"
|
||||
edition.workspace = true
|
||||
version = "0.27.0"
|
||||
version = "0.27.1"
|
||||
license.workspace = true
|
||||
description.workspace = true
|
||||
repository.workspace = true
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-darwin-arm64",
|
||||
"version": "0.27.0",
|
||||
"version": "0.27.1",
|
||||
"os": ["darwin"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.darwin-arm64.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-arm64-gnu",
|
||||
"version": "0.27.0",
|
||||
"version": "0.27.1",
|
||||
"os": ["linux"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.linux-arm64-gnu.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-arm64-musl",
|
||||
"version": "0.27.0",
|
||||
"version": "0.27.1",
|
||||
"os": ["linux"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.linux-arm64-musl.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-x64-gnu",
|
||||
"version": "0.27.0",
|
||||
"version": "0.27.1",
|
||||
"os": ["linux"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.linux-x64-gnu.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-x64-musl",
|
||||
"version": "0.27.0",
|
||||
"version": "0.27.1",
|
||||
"os": ["linux"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.linux-x64-musl.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-win32-arm64-msvc",
|
||||
"version": "0.27.0",
|
||||
"version": "0.27.1",
|
||||
"os": [
|
||||
"win32"
|
||||
],
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-win32-x64-msvc",
|
||||
"version": "0.27.0",
|
||||
"version": "0.27.1",
|
||||
"os": ["win32"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.win32-x64-msvc.node",
|
||||
|
||||
4
nodejs/package-lock.json
generated
4
nodejs/package-lock.json
generated
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.27.0",
|
||||
"version": "0.27.1",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.27.0",
|
||||
"version": "0.27.1",
|
||||
"cpu": [
|
||||
"x64",
|
||||
"arm64"
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
"ann"
|
||||
],
|
||||
"private": false,
|
||||
"version": "0.27.0",
|
||||
"version": "0.27.1",
|
||||
"main": "dist/index.js",
|
||||
"exports": {
|
||||
".": "./dist/index.js",
|
||||
|
||||
@@ -4751,7 +4751,16 @@ class IndexStatistics:
|
||||
num_indexed_rows: int
|
||||
num_unindexed_rows: int
|
||||
index_type: Literal[
|
||||
"IVF_PQ", "IVF_HNSW_PQ", "IVF_HNSW_SQ", "FTS", "BTREE", "BITMAP", "LABEL_LIST"
|
||||
"IVF_FLAT",
|
||||
"IVF_SQ",
|
||||
"IVF_PQ",
|
||||
"IVF_RQ",
|
||||
"IVF_HNSW_SQ",
|
||||
"IVF_HNSW_PQ",
|
||||
"FTS",
|
||||
"BTREE",
|
||||
"BITMAP",
|
||||
"LABEL_LIST",
|
||||
]
|
||||
distance_type: Optional[Literal["l2", "cosine", "dot"]] = None
|
||||
num_indices: Optional[int] = None
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
from datetime import timedelta
|
||||
import random
|
||||
from typing import get_args, get_type_hints
|
||||
|
||||
import pyarrow as pa
|
||||
import pytest
|
||||
@@ -22,6 +23,7 @@ from lancedb.index import (
|
||||
HnswSq,
|
||||
FTS,
|
||||
)
|
||||
from lancedb.table import IndexStatistics
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
@@ -283,3 +285,23 @@ async def test_create_index_with_binary_vectors(binary_table: AsyncTable):
|
||||
for v in range(256):
|
||||
res = await binary_table.query().nearest_to([v] * 128).to_arrow()
|
||||
assert res["id"][0].as_py() == v
|
||||
|
||||
|
||||
def test_index_statistics_index_type_lists_all_supported_values():
|
||||
expected_index_types = {
|
||||
"IVF_FLAT",
|
||||
"IVF_SQ",
|
||||
"IVF_PQ",
|
||||
"IVF_RQ",
|
||||
"IVF_HNSW_SQ",
|
||||
"IVF_HNSW_PQ",
|
||||
"FTS",
|
||||
"BTREE",
|
||||
"BITMAP",
|
||||
"LABEL_LIST",
|
||||
}
|
||||
|
||||
assert (
|
||||
set(get_args(get_type_hints(IndexStatistics)["index_type"]))
|
||||
== expected_index_types
|
||||
)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb"
|
||||
version = "0.27.0"
|
||||
version = "0.27.1"
|
||||
edition.workspace = true
|
||||
description = "LanceDB: A serverless, low-latency vector database for AI applications"
|
||||
license.workspace = true
|
||||
|
||||
@@ -72,6 +72,10 @@ impl ServerVersion {
|
||||
pub fn support_structural_fts(&self) -> bool {
|
||||
self.0 >= semver::Version::new(0, 3, 0)
|
||||
}
|
||||
|
||||
pub fn support_multipart_write(&self) -> bool {
|
||||
self.0 >= semver::Version::new(0, 4, 0)
|
||||
}
|
||||
}
|
||||
|
||||
pub const OPT_REMOTE_PREFIX: &str = "remote_database_";
|
||||
|
||||
@@ -10,6 +10,7 @@ use super::ARROW_STREAM_CONTENT_TYPE;
|
||||
use super::client::RequestResultExt;
|
||||
use super::client::{HttpSend, RestfulLanceDbClient, Sender};
|
||||
use super::db::ServerVersion;
|
||||
use crate::data::scannable::{PeekedScannable, Scannable, estimate_write_partitions};
|
||||
use crate::index::Index;
|
||||
use crate::index::IndexStatistics;
|
||||
use crate::index::waiter::wait_for_index;
|
||||
@@ -23,7 +24,7 @@ use crate::table::MergeResult;
|
||||
use crate::table::Tags;
|
||||
use crate::table::UpdateResult;
|
||||
use crate::table::query::create_multi_vector_plan;
|
||||
use crate::table::{AnyQuery, Filter, TableStatistics};
|
||||
use crate::table::{AnyQuery, Filter, PreprocessingOutput, TableStatistics};
|
||||
use crate::utils::background_cache::BackgroundCache;
|
||||
use crate::utils::{supported_btree_data_type, supported_vector_data_type};
|
||||
use crate::{DistanceType, Error};
|
||||
@@ -43,7 +44,7 @@ use async_trait::async_trait;
|
||||
use datafusion_common::DataFusionError;
|
||||
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
|
||||
use datafusion_physical_plan::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
|
||||
use futures::TryStreamExt;
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use http::header::CONTENT_TYPE;
|
||||
use http::{HeaderName, StatusCode};
|
||||
use lance::arrow::json::{JsonDataType, JsonSchema};
|
||||
@@ -614,6 +615,66 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
Ok(bodies)
|
||||
}
|
||||
|
||||
async fn create_multipart_write(&self) -> Result<String> {
|
||||
let request = self.client.post(&format!(
|
||||
"/v1/table/{}/multipart_write/create",
|
||||
self.identifier
|
||||
));
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
let body = response.text().await.err_to_http(request_id.clone())?;
|
||||
let parsed: serde_json::Value = serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
source: format!("Failed to parse multipart create response: {}", e).into(),
|
||||
request_id,
|
||||
status_code: None,
|
||||
})?;
|
||||
parsed["upload_id"]
|
||||
.as_str()
|
||||
.map(|s| s.to_string())
|
||||
.ok_or_else(|| Error::Http {
|
||||
source: "Missing upload_id in multipart create response".into(),
|
||||
request_id: String::new(),
|
||||
status_code: None,
|
||||
})
|
||||
}
|
||||
|
||||
async fn complete_multipart_write(&self, upload_id: &str) -> Result<AddResult> {
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!(
|
||||
"/v1/table/{}/multipart_write/complete",
|
||||
self.identifier
|
||||
))
|
||||
.query(&[("upload_id", upload_id)]);
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
let body = response.text().await.err_to_http(request_id.clone())?;
|
||||
let parsed: serde_json::Value = serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
source: format!("Failed to parse multipart complete response: {}", e).into(),
|
||||
request_id,
|
||||
status_code: None,
|
||||
})?;
|
||||
let version = parsed["version"].as_u64().ok_or_else(|| Error::Http {
|
||||
source: "Missing version in multipart complete response".into(),
|
||||
request_id: String::new(),
|
||||
status_code: None,
|
||||
})?;
|
||||
Ok(AddResult { version })
|
||||
}
|
||||
|
||||
async fn abort_multipart_write(&self, upload_id: &str) -> Result<()> {
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!(
|
||||
"/v1/table/{}/multipart_write/abort",
|
||||
self.identifier
|
||||
))
|
||||
.query(&[("upload_id", upload_id)]);
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
self.check_table_response(&request_id, response).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn check_mutable(&self) -> Result<()> {
|
||||
let read_guard = self.version.read().await;
|
||||
match *read_guard {
|
||||
@@ -817,6 +878,19 @@ mod test_utils {
|
||||
}
|
||||
|
||||
pub fn new_mock_with_config<F, T>(name: String, handler: F, config: ClientConfig) -> Self
|
||||
where
|
||||
F: Fn(reqwest::Request) -> http::Response<T> + Send + Sync + 'static,
|
||||
T: Into<reqwest::Body>,
|
||||
{
|
||||
Self::new_mock_with_version_and_config(name, handler, None, config)
|
||||
}
|
||||
|
||||
pub fn new_mock_with_version_and_config<F, T>(
|
||||
name: String,
|
||||
handler: F,
|
||||
version: Option<semver::Version>,
|
||||
config: ClientConfig,
|
||||
) -> Self
|
||||
where
|
||||
F: Fn(reqwest::Request) -> http::Response<T> + Send + Sync + 'static,
|
||||
T: Into<reqwest::Body>,
|
||||
@@ -827,7 +901,7 @@ mod test_utils {
|
||||
name: name.clone(),
|
||||
namespace: vec![],
|
||||
identifier: name,
|
||||
server_version: ServerVersion::default(),
|
||||
server_version: version.map(ServerVersion).unwrap_or_default(),
|
||||
version: RwLock::new(None),
|
||||
location: RwLock::new(None),
|
||||
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
|
||||
@@ -836,6 +910,185 @@ mod test_utils {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: HttpSend + 'static> RemoteTable<S> {
|
||||
fn is_retryable_write_error(&self, err: &Error) -> bool {
|
||||
match err {
|
||||
Error::Http {
|
||||
source,
|
||||
status_code,
|
||||
..
|
||||
} => {
|
||||
// Don't retry read errors (is_body/is_decode): the
|
||||
// server may have committed the write already, and
|
||||
// without an idempotency key we'd duplicate data.
|
||||
source
|
||||
.downcast_ref::<reqwest::Error>()
|
||||
.is_some_and(|e| e.is_connect())
|
||||
|| status_code.is_some_and(|s| self.client.retry_config.statuses.contains(&s))
|
||||
}
|
||||
// send_with_retry exhausted its internal retries on a retryable
|
||||
// status. The outer loop can still retry the whole operation with
|
||||
// a fresh session.
|
||||
Error::Retry { status_code, .. } => {
|
||||
status_code.is_some_and(|s| self.client.retry_config.statuses.contains(&s))
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
async fn add_single_partition(&self, output: PreprocessingOutput) -> Result<AddResult> {
|
||||
use crate::remote::retry::RetryCounter;
|
||||
|
||||
let mut insert: Arc<dyn ExecutionPlan> = Arc::new(RemoteInsertExec::new(
|
||||
self.name.clone(),
|
||||
self.identifier.clone(),
|
||||
self.client.clone(),
|
||||
output.plan,
|
||||
output.overwrite,
|
||||
));
|
||||
|
||||
let mut retry_counter =
|
||||
RetryCounter::new(&self.client.retry_config, uuid::Uuid::new_v4().to_string());
|
||||
|
||||
loop {
|
||||
let stream = execute_plan(insert.clone(), Default::default())?;
|
||||
let result: Result<Vec<_>> = stream.try_collect().await.map_err(Error::from);
|
||||
|
||||
match result {
|
||||
Ok(_) => {
|
||||
let add_result = insert
|
||||
.as_any()
|
||||
.downcast_ref::<RemoteInsertExec<S>>()
|
||||
.and_then(|i| i.add_result())
|
||||
.unwrap_or(AddResult { version: 0 });
|
||||
|
||||
if output.overwrite {
|
||||
self.invalidate_schema_cache();
|
||||
}
|
||||
|
||||
return Ok(add_result);
|
||||
}
|
||||
Err(err) if output.rescannable && self.is_retryable_write_error(&err) => {
|
||||
retry_counter.increment_from_error(err)?;
|
||||
tokio::time::sleep(retry_counter.next_sleep_time()).await;
|
||||
insert = insert.reset_state()?;
|
||||
continue;
|
||||
}
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn add_multipart(
|
||||
&self,
|
||||
output: PreprocessingOutput,
|
||||
num_partitions: usize,
|
||||
) -> Result<AddResult> {
|
||||
use crate::remote::retry::RetryCounter;
|
||||
|
||||
let mut retry_counter =
|
||||
RetryCounter::new(&self.client.retry_config, uuid::Uuid::new_v4().to_string());
|
||||
|
||||
loop {
|
||||
let upload_id = self.create_multipart_write().await?;
|
||||
|
||||
let result = self
|
||||
.execute_multipart_inserts(&upload_id, &output, num_partitions)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(()) => match self.complete_multipart_write(&upload_id).await {
|
||||
Ok(result) => {
|
||||
if output.overwrite {
|
||||
self.invalidate_schema_cache();
|
||||
}
|
||||
return Ok(result);
|
||||
}
|
||||
Err(e) => {
|
||||
if let Err(abort_err) = self.abort_multipart_write(&upload_id).await {
|
||||
log::warn!(
|
||||
"Failed to abort multipart write {}: {}",
|
||||
upload_id,
|
||||
abort_err
|
||||
);
|
||||
}
|
||||
if output.rescannable && self.is_retryable_write_error(&e) {
|
||||
retry_counter.increment_from_error(e)?;
|
||||
tokio::time::sleep(retry_counter.next_sleep_time()).await;
|
||||
continue;
|
||||
}
|
||||
return Err(e);
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
if let Err(abort_err) = self.abort_multipart_write(&upload_id).await {
|
||||
log::warn!(
|
||||
"Failed to abort multipart write {}: {}",
|
||||
upload_id,
|
||||
abort_err
|
||||
);
|
||||
}
|
||||
if output.rescannable && self.is_retryable_write_error(&e) {
|
||||
retry_counter.increment_from_error(e)?;
|
||||
tokio::time::sleep(retry_counter.next_sleep_time()).await;
|
||||
continue;
|
||||
}
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute_multipart_inserts(
|
||||
&self,
|
||||
upload_id: &str,
|
||||
output: &PreprocessingOutput,
|
||||
num_partitions: usize,
|
||||
) -> Result<()> {
|
||||
let plan = Arc::new(
|
||||
datafusion_physical_plan::repartition::RepartitionExec::try_new(
|
||||
output.plan.clone(),
|
||||
datafusion_physical_plan::Partitioning::RoundRobinBatch(num_partitions),
|
||||
)?,
|
||||
) as Arc<dyn ExecutionPlan>;
|
||||
|
||||
let insert = Arc::new(RemoteInsertExec::new_multipart(
|
||||
self.name.clone(),
|
||||
self.identifier.clone(),
|
||||
self.client.clone(),
|
||||
plan,
|
||||
output.overwrite,
|
||||
upload_id.to_string(),
|
||||
));
|
||||
|
||||
let task_ctx = Arc::new(datafusion_execution::TaskContext::default());
|
||||
let mut join_set = tokio::task::JoinSet::new();
|
||||
for partition in 0..num_partitions {
|
||||
let exec = insert.clone();
|
||||
let ctx = task_ctx.clone();
|
||||
join_set.spawn(async move {
|
||||
let mut stream = exec
|
||||
.execute(partition, ctx)
|
||||
.map_err(|e| -> Error { e.into() })?;
|
||||
while let Some(batch) = stream.next().await {
|
||||
batch.map_err(|e| -> Error { e.into() })?;
|
||||
}
|
||||
Ok::<_, Error>(())
|
||||
});
|
||||
}
|
||||
|
||||
// JoinSet aborts all remaining tasks when dropped, so if we return
|
||||
// early on error the orphaned tasks are automatically cancelled.
|
||||
while let Some(result) = join_set.join_next().await {
|
||||
result.map_err(|e| Error::Runtime {
|
||||
message: format!("Insert task panicked: {}", e),
|
||||
})??;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
@@ -986,74 +1239,44 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
status_code: None,
|
||||
})
|
||||
}
|
||||
async fn add(&self, add: AddDataBuilder) -> Result<AddResult> {
|
||||
use crate::remote::retry::RetryCounter;
|
||||
|
||||
async fn add(&self, mut add: AddDataBuilder) -> Result<AddResult> {
|
||||
self.check_mutable().await?;
|
||||
|
||||
let table_schema = self.schema().await?;
|
||||
let table_def = TableDefinition::try_from_rich_schema(table_schema.clone())?;
|
||||
|
||||
let num_partitions = if let Some(parallelism) = add.write_parallelism {
|
||||
if parallelism > 1 && self.server_version.support_multipart_write() {
|
||||
parallelism
|
||||
} else {
|
||||
1
|
||||
}
|
||||
} else if self.server_version.support_multipart_write() {
|
||||
// Peek at the first batch to estimate write partitions, same as NativeTable.
|
||||
let mut peeked = PeekedScannable::new(add.data);
|
||||
let n = if let Some(first_batch) = peeked.peek().await {
|
||||
let max_partitions = lance_core::utils::tokio::get_num_compute_intensive_cpus();
|
||||
estimate_write_partitions(
|
||||
first_batch.get_array_memory_size(),
|
||||
first_batch.num_rows(),
|
||||
peeked.num_rows(),
|
||||
max_partitions,
|
||||
)
|
||||
} else {
|
||||
1
|
||||
};
|
||||
add.data = Box::new(peeked);
|
||||
n
|
||||
} else {
|
||||
1
|
||||
};
|
||||
|
||||
let output = add.into_plan(&table_schema, &table_def)?;
|
||||
|
||||
let mut insert: Arc<dyn ExecutionPlan> = Arc::new(RemoteInsertExec::new(
|
||||
self.name.clone(),
|
||||
self.identifier.clone(),
|
||||
self.client.clone(),
|
||||
output.plan,
|
||||
output.overwrite,
|
||||
));
|
||||
|
||||
let mut retry_counter =
|
||||
RetryCounter::new(&self.client.retry_config, uuid::Uuid::new_v4().to_string());
|
||||
|
||||
loop {
|
||||
let stream = execute_plan(insert.clone(), Default::default())?;
|
||||
let result: Result<Vec<_>> = stream.try_collect().await.map_err(Error::from);
|
||||
|
||||
match result {
|
||||
Ok(_) => {
|
||||
let add_result = insert
|
||||
.as_any()
|
||||
.downcast_ref::<RemoteInsertExec<S>>()
|
||||
.and_then(|i| i.add_result())
|
||||
.unwrap_or(AddResult { version: 0 });
|
||||
|
||||
if output.overwrite {
|
||||
self.invalidate_schema_cache();
|
||||
}
|
||||
|
||||
return Ok(add_result);
|
||||
}
|
||||
Err(err) if output.rescannable => {
|
||||
let retryable = match &err {
|
||||
Error::Http {
|
||||
source,
|
||||
status_code,
|
||||
..
|
||||
} => {
|
||||
// Don't retry read errors (is_body/is_decode): the
|
||||
// server may have committed the write already, and
|
||||
// without an idempotency key we'd duplicate data.
|
||||
source
|
||||
.downcast_ref::<reqwest::Error>()
|
||||
.is_some_and(|e| e.is_connect())
|
||||
|| status_code
|
||||
.is_some_and(|s| self.client.retry_config.statuses.contains(&s))
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if retryable {
|
||||
retry_counter.increment_from_error(err)?;
|
||||
tokio::time::sleep(retry_counter.next_sleep_time()).await;
|
||||
insert = insert.reset_state()?;
|
||||
continue;
|
||||
}
|
||||
|
||||
return Err(err);
|
||||
}
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
if num_partitions > 1 {
|
||||
self.add_multipart(output, num_partitions).await
|
||||
} else {
|
||||
self.add_single_partition(output).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1811,6 +2034,7 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
|
||||
use crate::remote::client::{ClientConfig, RetryConfig};
|
||||
use crate::table::AddDataMode;
|
||||
|
||||
use arrow::{array::AsArray, compute::concat_batches, datatypes::Int32Type};
|
||||
@@ -4831,4 +5055,516 @@ mod tests {
|
||||
assert_eq!(data.len(), 1);
|
||||
assert_eq!(data[0].as_ref().unwrap(), &expected_data);
|
||||
}
|
||||
|
||||
fn schema_json() -> &'static str {
|
||||
r#"{"fields": [{"name": "id", "type": {"type": "int32"}, "nullable": true}]}"#
|
||||
}
|
||||
|
||||
fn simple_describe_response() -> http::Response<String> {
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(format!(r#"{{"version": 1, "schema": {}}}"#, schema_json()))
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multipart_write_happy_path() {
|
||||
use std::sync::Mutex;
|
||||
|
||||
let create_count = Arc::new(AtomicUsize::new(0));
|
||||
let insert_count = Arc::new(AtomicUsize::new(0));
|
||||
let complete_count = Arc::new(AtomicUsize::new(0));
|
||||
let abort_count = Arc::new(AtomicUsize::new(0));
|
||||
let upload_ids = Arc::new(Mutex::new(Vec::<String>::new()));
|
||||
|
||||
let create_count_c = create_count.clone();
|
||||
let insert_count_c = insert_count.clone();
|
||||
let complete_count_c = complete_count.clone();
|
||||
let abort_count_c = abort_count.clone();
|
||||
let upload_ids_c = upload_ids.clone();
|
||||
|
||||
let table = Table::new_with_handler_version(
|
||||
"my_table",
|
||||
semver::Version::new(0, 4, 0),
|
||||
move |request| {
|
||||
let path = request.url().path();
|
||||
let query = request.url().query().unwrap_or("");
|
||||
|
||||
if path == "/v1/table/my_table/describe/" {
|
||||
return simple_describe_response();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/multipart_write/create" {
|
||||
create_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"upload_id": "test-upload-123"}"#.to_string())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/insert/" {
|
||||
insert_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
let uid = url::form_urlencoded::parse(query.as_bytes())
|
||||
.find(|(k, _)| k == "upload_id")
|
||||
.map(|(_, v)| v.to_string());
|
||||
upload_ids_c
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push(uid.expect("missing upload_id on insert"));
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 1}"#.to_string())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/multipart_write/complete" {
|
||||
complete_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
let uid = url::form_urlencoded::parse(query.as_bytes())
|
||||
.find(|(k, _)| k == "upload_id")
|
||||
.map(|(_, v)| v.to_string());
|
||||
upload_ids_c
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push(uid.expect("missing upload_id on complete"));
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 5}"#.to_string())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/multipart_write/abort" {
|
||||
abort_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(String::new())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
panic!("Unexpected request path: {}", path);
|
||||
},
|
||||
);
|
||||
|
||||
let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
|
||||
let result = table
|
||||
.add(vec![batch])
|
||||
.write_parallelism(2)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(result.version, 5);
|
||||
assert_eq!(create_count.load(Ordering::SeqCst), 1);
|
||||
assert!(
|
||||
insert_count.load(Ordering::SeqCst) > 1,
|
||||
"Expected multiple insert calls, got {}",
|
||||
insert_count.load(Ordering::SeqCst)
|
||||
);
|
||||
assert_eq!(complete_count.load(Ordering::SeqCst), 1);
|
||||
assert_eq!(abort_count.load(Ordering::SeqCst), 0);
|
||||
|
||||
let ids = upload_ids.lock().unwrap();
|
||||
assert!(
|
||||
ids.iter().all(|id| id == "test-upload-123"),
|
||||
"All requests should use the same upload_id, got: {:?}",
|
||||
*ids
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multipart_write_fallback_old_server() {
|
||||
let insert_count = Arc::new(AtomicUsize::new(0));
|
||||
let create_count = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let insert_count_c = insert_count.clone();
|
||||
let create_count_c = create_count.clone();
|
||||
|
||||
// Server version 0.3.0 does not support multipart writes
|
||||
let table = Table::new_with_handler_version(
|
||||
"my_table",
|
||||
semver::Version::new(0, 3, 0),
|
||||
move |request| {
|
||||
let path = request.url().path();
|
||||
|
||||
if path == "/v1/table/my_table/describe/" {
|
||||
return simple_describe_response();
|
||||
}
|
||||
|
||||
if path.contains("multipart_write") {
|
||||
create_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
panic!("Should not call multipart write endpoints on old server");
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/insert/" {
|
||||
let query = request.url().query().unwrap_or("");
|
||||
assert!(
|
||||
!query.contains("upload_id"),
|
||||
"Should not have upload_id for old server"
|
||||
);
|
||||
insert_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 2}"#.to_string())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
panic!("Unexpected request path: {}", path);
|
||||
},
|
||||
);
|
||||
|
||||
let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
|
||||
let result = table
|
||||
.add(vec![batch])
|
||||
.write_parallelism(2)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(result.version, 2);
|
||||
assert_eq!(create_count.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(insert_count.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multipart_write_small_data_single_partition() {
|
||||
let insert_count = Arc::new(AtomicUsize::new(0));
|
||||
let create_count = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let insert_count_c = insert_count.clone();
|
||||
let create_count_c = create_count.clone();
|
||||
|
||||
let table = Table::new_with_handler_version(
|
||||
"my_table",
|
||||
semver::Version::new(0, 4, 0),
|
||||
move |request| {
|
||||
let path = request.url().path();
|
||||
|
||||
if path == "/v1/table/my_table/describe/" {
|
||||
return simple_describe_response();
|
||||
}
|
||||
|
||||
if path.contains("multipart_write") {
|
||||
create_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
panic!("Should not call multipart write endpoints for small data");
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/insert/" {
|
||||
let query = request.url().query().unwrap_or("");
|
||||
assert!(
|
||||
!query.contains("upload_id"),
|
||||
"Should not have upload_id for small data"
|
||||
);
|
||||
insert_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 2}"#.to_string())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
panic!("Unexpected request path: {}", path);
|
||||
},
|
||||
);
|
||||
|
||||
// Small data: only 3 rows
|
||||
let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
|
||||
let result = table.add(vec![batch]).execute().await.unwrap();
|
||||
|
||||
assert_eq!(result.version, 2);
|
||||
assert_eq!(create_count.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(insert_count.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multipart_write_abort_on_insert_failure() {
|
||||
let create_count = Arc::new(AtomicUsize::new(0));
|
||||
let insert_count = Arc::new(AtomicUsize::new(0));
|
||||
let complete_count = Arc::new(AtomicUsize::new(0));
|
||||
let abort_count = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let create_count_c = create_count.clone();
|
||||
let insert_count_c = insert_count.clone();
|
||||
let complete_count_c = complete_count.clone();
|
||||
let abort_count_c = abort_count.clone();
|
||||
|
||||
let table = Table::new_with_handler_version(
|
||||
"my_table",
|
||||
semver::Version::new(0, 4, 0),
|
||||
move |request| {
|
||||
let path = request.url().path();
|
||||
|
||||
if path == "/v1/table/my_table/describe/" {
|
||||
return simple_describe_response();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/multipart_write/create" {
|
||||
create_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"upload_id": "test-upload-456"}"#.to_string())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/insert/" {
|
||||
let count = insert_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
// Fail on the first insert with non-retryable status
|
||||
if count == 0 {
|
||||
return http::Response::builder()
|
||||
.status(400)
|
||||
.body("Bad Request".to_string())
|
||||
.unwrap();
|
||||
}
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 1}"#.to_string())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/multipart_write/complete" {
|
||||
complete_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 5}"#.to_string())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/multipart_write/abort" {
|
||||
abort_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(String::new())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
panic!("Unexpected request path: {}", path);
|
||||
},
|
||||
);
|
||||
|
||||
let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
|
||||
let result = table.add(vec![batch]).write_parallelism(2).execute().await;
|
||||
|
||||
assert!(result.is_err());
|
||||
assert_eq!(create_count.load(Ordering::SeqCst), 1);
|
||||
assert_eq!(complete_count.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(abort_count.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multipart_write_abort_on_complete_failure() {
|
||||
let abort_count = Arc::new(AtomicUsize::new(0));
|
||||
let abort_count_c = abort_count.clone();
|
||||
|
||||
let table = Table::new_with_handler_version(
|
||||
"my_table",
|
||||
semver::Version::new(0, 4, 0),
|
||||
move |request| {
|
||||
let path = request.url().path();
|
||||
|
||||
if path == "/v1/table/my_table/describe/" {
|
||||
return simple_describe_response();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/multipart_write/create" {
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"upload_id": "test-upload-789"}"#.to_string())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/insert/" {
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 1}"#.to_string())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/multipart_write/complete" {
|
||||
return http::Response::builder()
|
||||
.status(400)
|
||||
.body("Bad Request".to_string())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/multipart_write/abort" {
|
||||
abort_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(String::new())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
panic!("Unexpected request path: {}", path);
|
||||
},
|
||||
);
|
||||
|
||||
let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
|
||||
let result = table.add(vec![batch]).write_parallelism(2).execute().await;
|
||||
|
||||
assert!(result.is_err());
|
||||
assert_eq!(abort_count.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
fn retry_config_no_backoff() -> ClientConfig {
|
||||
ClientConfig {
|
||||
retry_config: RetryConfig {
|
||||
retries: Some(3),
|
||||
connect_retries: Some(3),
|
||||
read_retries: Some(3),
|
||||
backoff_factor: Some(0.0),
|
||||
backoff_jitter: Some(0.0),
|
||||
statuses: Some(vec![502, 503]),
|
||||
},
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multipart_write_retry_on_partition_failure() {
|
||||
// All inserts for the first upload session return 503 (retryable).
|
||||
// After exhausting internal retries, the outer loop retries with a
|
||||
// new session and succeeds.
|
||||
let create_count = Arc::new(AtomicUsize::new(0));
|
||||
let complete_count = Arc::new(AtomicUsize::new(0));
|
||||
let abort_count = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let create_count_c = create_count.clone();
|
||||
let complete_count_c = complete_count.clone();
|
||||
let abort_count_c = abort_count.clone();
|
||||
|
||||
let table = Table::new_with_handler_version_and_config(
|
||||
"my_table",
|
||||
semver::Version::new(0, 4, 0),
|
||||
move |request| {
|
||||
let path = request.url().path();
|
||||
let query = request.url().query().unwrap_or("");
|
||||
|
||||
if path == "/v1/table/my_table/describe/" {
|
||||
return simple_describe_response();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/multipart_write/create" {
|
||||
let n = create_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
let body = format!(r#"{{"upload_id": "upload-{}"}}"#, n + 1);
|
||||
return http::Response::builder().status(200).body(body).unwrap();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/insert/" {
|
||||
// Fail all inserts for the first session
|
||||
if query.contains("upload_id=upload-1") {
|
||||
return http::Response::builder()
|
||||
.status(503)
|
||||
.body("Service Unavailable".to_string())
|
||||
.unwrap();
|
||||
}
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 1}"#.to_string())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/multipart_write/complete" {
|
||||
complete_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 7}"#.to_string())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/multipart_write/abort" {
|
||||
abort_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(String::new())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
panic!("Unexpected request path: {}", path);
|
||||
},
|
||||
retry_config_no_backoff(),
|
||||
);
|
||||
|
||||
let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
|
||||
let result = table
|
||||
.add(vec![batch])
|
||||
.write_parallelism(2)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(result.version, 7);
|
||||
assert_eq!(create_count.load(Ordering::SeqCst), 2);
|
||||
assert_eq!(abort_count.load(Ordering::SeqCst), 1);
|
||||
assert_eq!(complete_count.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multipart_write_retry_on_complete_failure() {
|
||||
// Complete returns 503 for the first session, succeeds for the second.
|
||||
let create_count = Arc::new(AtomicUsize::new(0));
|
||||
let abort_count = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let create_count_c = create_count.clone();
|
||||
let abort_count_c = abort_count.clone();
|
||||
|
||||
let table = Table::new_with_handler_version_and_config(
|
||||
"my_table",
|
||||
semver::Version::new(0, 4, 0),
|
||||
move |request| {
|
||||
let path = request.url().path();
|
||||
let query = request.url().query().unwrap_or("");
|
||||
|
||||
if path == "/v1/table/my_table/describe/" {
|
||||
return simple_describe_response();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/multipart_write/create" {
|
||||
let n = create_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
let body = format!(r#"{{"upload_id": "upload-{}"}}"#, n + 1);
|
||||
return http::Response::builder().status(200).body(body).unwrap();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/insert/" {
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 1}"#.to_string())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/multipart_write/complete" {
|
||||
// Fail complete for first session
|
||||
if query.contains("upload_id=upload-1") {
|
||||
return http::Response::builder()
|
||||
.status(503)
|
||||
.body("Service Unavailable".to_string())
|
||||
.unwrap();
|
||||
}
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 9}"#.to_string())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
if path == "/v1/table/my_table/multipart_write/abort" {
|
||||
abort_count_c.fetch_add(1, Ordering::SeqCst);
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(String::new())
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
panic!("Unexpected request path: {}", path);
|
||||
},
|
||||
retry_config_no_backoff(),
|
||||
);
|
||||
|
||||
let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
|
||||
let result = table
|
||||
.add(vec![batch])
|
||||
.write_parallelism(2)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(result.version, 9);
|
||||
assert_eq!(create_count.load(Ordering::SeqCst), 2);
|
||||
assert_eq!(abort_count.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,9 @@ use datafusion_common::{DataFusionError, Result as DataFusionResult};
|
||||
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
|
||||
use datafusion_physical_expr::EquivalenceProperties;
|
||||
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
|
||||
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
|
||||
use datafusion_physical_plan::{
|
||||
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use http::header::CONTENT_TYPE;
|
||||
|
||||
@@ -25,10 +27,12 @@ use crate::table::datafusion::insert::COUNT_SCHEMA;
|
||||
|
||||
/// ExecutionPlan for inserting data into a remote LanceDB table.
|
||||
///
|
||||
/// This plan:
|
||||
/// 1. Requires single partition (no parallel remote inserts yet)
|
||||
/// 2. Streams data as Arrow IPC to `/v1/table/{id}/insert/` endpoint
|
||||
/// 3. Stores AddResult for retrieval after execution
|
||||
/// Streams data as Arrow IPC to `/v1/table/{id}/insert/` endpoint.
|
||||
///
|
||||
/// When `upload_id` is set, inserts are staged as part of a multipart write
|
||||
/// session and the plan supports multiple partitions for parallel uploads.
|
||||
/// Without `upload_id`, the plan requires a single partition and commits
|
||||
/// immediately.
|
||||
#[derive(Debug)]
|
||||
pub struct RemoteInsertExec<S: HttpSend = Sender> {
|
||||
table_name: String,
|
||||
@@ -38,10 +42,11 @@ pub struct RemoteInsertExec<S: HttpSend = Sender> {
|
||||
overwrite: bool,
|
||||
properties: PlanProperties,
|
||||
add_result: Arc<Mutex<Option<AddResult>>>,
|
||||
upload_id: Option<String>,
|
||||
}
|
||||
|
||||
impl<S: HttpSend + 'static> RemoteInsertExec<S> {
|
||||
/// Create a new RemoteInsertExec.
|
||||
/// Create a new single-partition RemoteInsertExec.
|
||||
pub fn new(
|
||||
table_name: String,
|
||||
identifier: String,
|
||||
@@ -49,10 +54,49 @@ impl<S: HttpSend + 'static> RemoteInsertExec<S> {
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
overwrite: bool,
|
||||
) -> Self {
|
||||
Self::new_inner(table_name, identifier, client, input, overwrite, None)
|
||||
}
|
||||
|
||||
/// Create a multi-partition RemoteInsertExec for use with multipart writes.
|
||||
///
|
||||
/// Each partition's insert is staged under the given `upload_id` without
|
||||
/// committing. The caller is responsible for calling the complete (or abort)
|
||||
/// endpoint after all partitions finish.
|
||||
pub fn new_multipart(
|
||||
table_name: String,
|
||||
identifier: String,
|
||||
client: RestfulLanceDbClient<S>,
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
overwrite: bool,
|
||||
upload_id: String,
|
||||
) -> Self {
|
||||
Self::new_inner(
|
||||
table_name,
|
||||
identifier,
|
||||
client,
|
||||
input,
|
||||
overwrite,
|
||||
Some(upload_id),
|
||||
)
|
||||
}
|
||||
|
||||
fn new_inner(
|
||||
table_name: String,
|
||||
identifier: String,
|
||||
client: RestfulLanceDbClient<S>,
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
overwrite: bool,
|
||||
upload_id: Option<String>,
|
||||
) -> Self {
|
||||
let num_partitions = if upload_id.is_some() {
|
||||
input.output_partitioning().partition_count()
|
||||
} else {
|
||||
1
|
||||
};
|
||||
let schema = COUNT_SCHEMA.clone();
|
||||
let properties = PlanProperties::new(
|
||||
EquivalenceProperties::new(schema),
|
||||
datafusion_physical_plan::Partitioning::UnknownPartitioning(1),
|
||||
datafusion_physical_plan::Partitioning::UnknownPartitioning(num_partitions),
|
||||
datafusion_physical_plan::execution_plan::EmissionType::Final,
|
||||
datafusion_physical_plan::execution_plan::Boundedness::Bounded,
|
||||
);
|
||||
@@ -65,6 +109,7 @@ impl<S: HttpSend + 'static> RemoteInsertExec<S> {
|
||||
overwrite,
|
||||
properties,
|
||||
add_result: Arc::new(Mutex::new(None)),
|
||||
upload_id,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -174,8 +219,11 @@ impl<S: HttpSend + 'static> ExecutionPlan for RemoteInsertExec<S> {
|
||||
}
|
||||
|
||||
fn required_input_distribution(&self) -> Vec<datafusion_physical_plan::Distribution> {
|
||||
// Until we have a separate commit endpoint, we need to do all inserts in a single partition
|
||||
vec![datafusion_physical_plan::Distribution::SinglePartition]
|
||||
if self.upload_id.is_some() {
|
||||
vec![datafusion_physical_plan::Distribution::UnspecifiedDistribution]
|
||||
} else {
|
||||
vec![datafusion_physical_plan::Distribution::SinglePartition]
|
||||
}
|
||||
}
|
||||
|
||||
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
|
||||
@@ -191,12 +239,13 @@ impl<S: HttpSend + 'static> ExecutionPlan for RemoteInsertExec<S> {
|
||||
"RemoteInsertExec requires exactly one child".to_string(),
|
||||
));
|
||||
}
|
||||
Ok(Arc::new(Self::new(
|
||||
Ok(Arc::new(Self::new_inner(
|
||||
self.table_name.clone(),
|
||||
self.identifier.clone(),
|
||||
self.client.clone(),
|
||||
children[0].clone(),
|
||||
self.overwrite,
|
||||
self.upload_id.clone(),
|
||||
)))
|
||||
}
|
||||
|
||||
@@ -205,18 +254,20 @@ impl<S: HttpSend + 'static> ExecutionPlan for RemoteInsertExec<S> {
|
||||
partition: usize,
|
||||
context: Arc<TaskContext>,
|
||||
) -> DataFusionResult<SendableRecordBatchStream> {
|
||||
if partition != 0 {
|
||||
if self.upload_id.is_none() && partition != 0 {
|
||||
return Err(DataFusionError::Internal(
|
||||
"RemoteInsertExec only supports single partition execution".to_string(),
|
||||
"RemoteInsertExec only supports single partition execution without upload_id"
|
||||
.to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let input_stream = self.input.execute(0, context)?;
|
||||
let input_stream = self.input.execute(partition, context)?;
|
||||
let client = self.client.clone();
|
||||
let identifier = self.identifier.clone();
|
||||
let overwrite = self.overwrite;
|
||||
let add_result = self.add_result.clone();
|
||||
let table_name = self.table_name.clone();
|
||||
let upload_id = self.upload_id.clone();
|
||||
|
||||
let stream = futures::stream::once(async move {
|
||||
let mut request = client
|
||||
@@ -226,6 +277,9 @@ impl<S: HttpSend + 'static> ExecutionPlan for RemoteInsertExec<S> {
|
||||
if overwrite {
|
||||
request = request.query(&[("mode", "overwrite")]);
|
||||
}
|
||||
if let Some(ref uid) = upload_id {
|
||||
request = request.query(&[("upload_id", uid.as_str())]);
|
||||
}
|
||||
|
||||
let (error_tx, mut error_rx) = tokio::sync::oneshot::channel();
|
||||
let body = Self::stream_as_http_body(input_stream, error_tx)?;
|
||||
@@ -262,28 +316,30 @@ impl<S: HttpSend + 'static> ExecutionPlan for RemoteInsertExec<S> {
|
||||
|
||||
let (request_id, response) = result?;
|
||||
|
||||
let body_text = response.text().await.map_err(|e| {
|
||||
DataFusionError::External(Box::new(Error::Http {
|
||||
source: Box::new(e),
|
||||
request_id: request_id.clone(),
|
||||
status_code: None,
|
||||
}))
|
||||
})?;
|
||||
|
||||
let parsed_result = if body_text.trim().is_empty() {
|
||||
// Backward compatible with old servers
|
||||
AddResult { version: 0 }
|
||||
} else {
|
||||
serde_json::from_str(&body_text).map_err(|e| {
|
||||
// For multipart writes, the staging response is not the final
|
||||
// version. Only parse AddResult for non-multipart inserts.
|
||||
if upload_id.is_none() {
|
||||
let body_text = response.text().await.map_err(|e| {
|
||||
DataFusionError::External(Box::new(Error::Http {
|
||||
source: format!("Failed to parse add response: {}", e).into(),
|
||||
source: Box::new(e),
|
||||
request_id: request_id.clone(),
|
||||
status_code: None,
|
||||
}))
|
||||
})?
|
||||
};
|
||||
})?;
|
||||
|
||||
let parsed_result = if body_text.trim().is_empty() {
|
||||
// Backward compatible with old servers
|
||||
AddResult { version: 0 }
|
||||
} else {
|
||||
serde_json::from_str(&body_text).map_err(|e| {
|
||||
DataFusionError::External(Box::new(Error::Http {
|
||||
source: format!("Failed to parse add response: {}", e).into(),
|
||||
request_id: request_id.clone(),
|
||||
status_code: None,
|
||||
}))
|
||||
})?
|
||||
};
|
||||
|
||||
{
|
||||
let mut res_lock = add_result.lock().map_err(|_| {
|
||||
DataFusionError::Execution("Failed to acquire lock for add_result".to_string())
|
||||
})?;
|
||||
|
||||
@@ -75,6 +75,8 @@ pub mod query;
|
||||
pub mod schema_evolution;
|
||||
pub mod update;
|
||||
use crate::index::waiter::wait_for_index;
|
||||
#[cfg(feature = "remote")]
|
||||
pub(crate) use add_data::PreprocessingOutput;
|
||||
pub use add_data::{AddDataBuilder, AddDataMode, AddResult, NaNVectorBehavior};
|
||||
pub use chrono::Duration;
|
||||
pub use delete::DeleteResult;
|
||||
@@ -440,6 +442,34 @@ mod test_utils {
|
||||
embedding_registry: Arc::new(MemoryRegistry::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_handler_version_and_config<T>(
|
||||
name: impl Into<String>,
|
||||
version: semver::Version,
|
||||
handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
|
||||
config: crate::remote::ClientConfig,
|
||||
) -> Self
|
||||
where
|
||||
T: Into<reqwest::Body>,
|
||||
{
|
||||
let inner = Arc::new(
|
||||
crate::remote::table::RemoteTable::new_mock_with_version_and_config(
|
||||
name.into(),
|
||||
handler.clone(),
|
||||
Some(version),
|
||||
config.clone(),
|
||||
),
|
||||
);
|
||||
let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock_with_config(
|
||||
handler, config,
|
||||
));
|
||||
Self {
|
||||
inner,
|
||||
database: Some(database),
|
||||
// Registry is unused.
|
||||
embedding_registry: Arc::new(MemoryRegistry::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2198,21 +2228,26 @@ impl BaseTable for NativeTable {
|
||||
|
||||
let table_schema = Schema::from(&ds.schema().clone());
|
||||
|
||||
// Peek at the first batch to estimate a good partition count for
|
||||
// write parallelism.
|
||||
let mut peeked = PeekedScannable::new(add.data);
|
||||
let num_partitions = if let Some(first_batch) = peeked.peek().await {
|
||||
let max_partitions = lance_core::utils::tokio::get_num_compute_intensive_cpus();
|
||||
estimate_write_partitions(
|
||||
first_batch.get_array_memory_size(),
|
||||
first_batch.num_rows(),
|
||||
peeked.num_rows(),
|
||||
max_partitions,
|
||||
)
|
||||
let num_partitions = if let Some(parallelism) = add.write_parallelism {
|
||||
parallelism
|
||||
} else {
|
||||
1
|
||||
// Peek at the first batch to estimate a good partition count for
|
||||
// write parallelism.
|
||||
let mut peeked = PeekedScannable::new(add.data);
|
||||
let n = if let Some(first_batch) = peeked.peek().await {
|
||||
let max_partitions = lance_core::utils::tokio::get_num_compute_intensive_cpus();
|
||||
estimate_write_partitions(
|
||||
first_batch.get_array_memory_size(),
|
||||
first_batch.num_rows(),
|
||||
peeked.num_rows(),
|
||||
max_partitions,
|
||||
)
|
||||
} else {
|
||||
1
|
||||
};
|
||||
add.data = Box::new(peeked);
|
||||
n
|
||||
};
|
||||
add.data = Box::new(peeked);
|
||||
|
||||
let output = add.into_plan(&table_schema, &table_def)?;
|
||||
|
||||
|
||||
@@ -52,6 +52,7 @@ pub struct AddDataBuilder {
|
||||
pub(crate) write_options: WriteOptions,
|
||||
pub(crate) on_nan_vectors: NaNVectorBehavior,
|
||||
pub(crate) embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
|
||||
pub(crate) write_parallelism: Option<usize>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for AddDataBuilder {
|
||||
@@ -77,6 +78,7 @@ impl AddDataBuilder {
|
||||
write_options: WriteOptions::default(),
|
||||
on_nan_vectors: NaNVectorBehavior::default(),
|
||||
embedding_registry,
|
||||
write_parallelism: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,7 +103,22 @@ impl AddDataBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the number of parallel write streams.
|
||||
///
|
||||
/// By default, the number of streams is estimated from the data size.
|
||||
/// Setting this to `1` disables parallel writes.
|
||||
pub fn write_parallelism(mut self, parallelism: usize) -> Self {
|
||||
self.write_parallelism = Some(parallelism);
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn execute(self) -> Result<AddResult> {
|
||||
if self.write_parallelism.map(|p| p == 0).unwrap_or(false) {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "write_parallelism must be greater than 0".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
self.parent.clone().add(self).await
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user