Compare commits

...

6 Commits

Author SHA1 Message Date
lancedb automation
108442a1a1 chore: update lance dependency to v4.1.0-beta.1 2026-03-22 22:34:57 +00:00
Will Jones
e6fd8d071e feat(rust): parallel inserts for remote tables via multipart write (#3071)
Similar to https://github.com/lancedb/lancedb/pull/3062, we can write in
parallel to remote tables if the input data source is large enough.

We take advantage of new endpoints coming in server version 0.4.0, which
allow writing data in multiple requests, and then committing at the end
in a single request.

To make testing easier, I also introduce a `write_parallelism`
parameter. In the future, we can expose that in Python and NodeJS so
users can manually specify the parallelism they get.

Closes #2861

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 13:19:07 -07:00
LanceDB Robot
670dcca551 feat: update lance dependency to v3.0.1 (#3168)
## Summary
- Updated Lance Rust workspace dependencies to `3.0.1` using
`ci/set_lance_version.py`.
- Updated Java `lance-core` dependency property in `java/pom.xml` to
`3.0.1`.
- Refreshed `Cargo.lock` entries for Lance crates at `3.0.1`.

## Verification
- `cargo clippy --workspace --tests --all-features -- -D warnings`
- `cargo fmt --all`

## Trigger
- Tag:
[`refs/tags/v3.0.1`](https://github.com/lancedb/lance/tree/v3.0.1)

Co-authored-by: Esteban Gutierrez <estebangtz@gmail.com>
2026-03-20 09:53:20 -07:00
Prashanth Rao
ed7e01a58b docs: fix rendering issues with missing index types in API docs (#3143)
## Problem

The generated Python API docs for
`lancedb.table.IndexStatistics.index_type` were misleading because
mkdocstrings renders that field’s type annotation directly, and the
existing `Literal[...]` listed only a subset of the actual canonical SDK
index type strings.

Current (missing index types):
<img width="823" height="83" alt="image"
src="https://github.com/user-attachments/assets/f6f29fe3-4c16-4d00-a4e9-28a7cd6e19ec"
/>


## Fix

- Update the `IndexStatistics.index_type` annotation in
`python/python/lancedb/table.py` to include the full supported set of
canonical values, so the generated docs show all valid index_type
strings inline.
- Add a small regression test in `python/python/tests/test_index.py` to
ensure the docs-facing annotation does not drift silently again in case
we add a new index/quantization type in the future.
- Bumps mkdocs and material theme versions to mkdocs 1.6 to allow access
to more features like hooks

After fix (all index types are included and tested for in the
annotations):
<img width="1017" height="93" alt="image"
src="https://github.com/user-attachments/assets/66c74d5c-34b3-4b44-8173-3ee23e3648ac"
/>
2026-03-20 09:34:42 -07:00
Lance Release
3450ccaf7f Bump version: 0.27.1-beta.0 → 0.27.1 2026-03-20 00:35:36 +00:00
Lance Release
9b229f1e7c Bump version: 0.27.0 → 0.27.1-beta.0 2026-03-20 00:35:19 +00:00
25 changed files with 1060 additions and 196 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.27.0"
current_version = "0.27.1"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

92
Cargo.lock generated
View File

@@ -3070,9 +3070,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a32ddfc5478379cd1782bdd9d7d1411063f563e5b338fc73bafe5916451a5b9d"
version = "4.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.1#88c627bb093c0fb078e80fdd9140a6c5f7f94d5b"
dependencies = [
"arrow-array",
"rand 0.9.2",
@@ -4242,9 +4241,8 @@ dependencies = [
[[package]]
name = "lance"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95c5ce428fda0721f5c48bfde17a1921c4da2d2142b2f46a16c89abf5fce8003"
version = "4.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.1#88c627bb093c0fb078e80fdd9140a6c5f7f94d5b"
dependencies = [
"arrow",
"arrow-arith",
@@ -4310,9 +4308,8 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9fdaf99863fa0d631e422881e88be4837d8b82f36a87143d723a9d285acec4b"
version = "4.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.1#88c627bb093c0fb078e80fdd9140a6c5f7f94d5b"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4332,9 +4329,8 @@ dependencies = [
[[package]]
name = "lance-bitpacking"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "866b1634d38d94e8ab86fbcf238ac82dc8a5f72a4a6a90525f29899772e7cc7f"
version = "4.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.1#88c627bb093c0fb078e80fdd9140a6c5f7f94d5b"
dependencies = [
"arrayref",
"paste",
@@ -4343,9 +4339,8 @@ dependencies = [
[[package]]
name = "lance-core"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "977c29f4e48c201c2806fe6ae117b65d0287eda236acd07357b556a54b0d5c5a"
version = "4.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.1#88c627bb093c0fb078e80fdd9140a6c5f7f94d5b"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4382,9 +4377,8 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ccc72695473f4207df4c6df3b347a63e84c32c0bc36bf42a7d86e8a7c0c67e2"
version = "4.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.1#88c627bb093c0fb078e80fdd9140a6c5f7f94d5b"
dependencies = [
"arrow",
"arrow-array",
@@ -4414,9 +4408,8 @@ dependencies = [
[[package]]
name = "lance-datagen"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fe84d76944acd834ded14d7562663af995556e0c6594f4b4ac69b0183f99c1a"
version = "4.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.1#88c627bb093c0fb078e80fdd9140a6c5f7f94d5b"
dependencies = [
"arrow",
"arrow-array",
@@ -4434,9 +4427,8 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be1007242188e5d53c98717e7f2cb340dc80eb9c94c2b935587598919b3a36bd"
version = "4.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.1#88c627bb093c0fb078e80fdd9140a6c5f7f94d5b"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4473,9 +4465,8 @@ dependencies = [
[[package]]
name = "lance-file"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f80088e418941f39cf5599d166ae1a6ef498cc2d967652a0692477d4871a9277"
version = "4.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.1#88c627bb093c0fb078e80fdd9140a6c5f7f94d5b"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4507,9 +4498,8 @@ dependencies = [
[[package]]
name = "lance-index"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0011daf1ddde99becffd2ae235ad324576736a526c54ffbc4d7e583872f1215"
version = "4.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.1#88c627bb093c0fb078e80fdd9140a6c5f7f94d5b"
dependencies = [
"arrow",
"arrow-arith",
@@ -4523,6 +4513,7 @@ dependencies = [
"bitpacking",
"bitvec",
"bytes",
"chrono",
"crossbeam-queue",
"datafusion",
"datafusion-common",
@@ -4572,9 +4563,8 @@ dependencies = [
[[package]]
name = "lance-io"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfa8a74e93753d19a27ce3adaeb99e31227df13ad5926dd43572be76b43dd284"
version = "4.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.1#88c627bb093c0fb078e80fdd9140a6c5f7f94d5b"
dependencies = [
"arrow",
"arrow-arith",
@@ -4615,9 +4605,8 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e2d8da8f6b8dd37ab3b8199896ee265817f86232e3727c0b0eeb3c9093b64d9"
version = "4.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.1#88c627bb093c0fb078e80fdd9140a6c5f7f94d5b"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4633,23 +4622,22 @@ dependencies = [
[[package]]
name = "lance-namespace"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f176e427d9c35938d8a7097876114bc35dfd280b06077779753f2effe3e86aab"
version = "4.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.1#88c627bb093c0fb078e80fdd9140a6c5f7f94d5b"
dependencies = [
"arrow",
"async-trait",
"bytes",
"lance-core",
"lance-namespace-reqwest-client",
"serde",
"snafu 0.9.0",
]
[[package]]
name = "lance-namespace-impls"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "663c32086ecfab311acb0813c65a4bb352a5b648ccf8b513c24697ce8d412039"
version = "4.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.1#88c627bb093c0fb078e80fdd9140a6c5f7f94d5b"
dependencies = [
"arrow",
"arrow-ipc",
@@ -4680,9 +4668,9 @@ dependencies = [
[[package]]
name = "lance-namespace-reqwest-client"
version = "0.5.2"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ad4c947349acd6e37e984eba0254588bd894e6128434338b9e6904e56fb4633"
checksum = "ee2e48de899e2931afb67fcddd0a08e439bf5d8b6ea2a2ed9cb8f4df669bd5cc"
dependencies = [
"reqwest",
"serde",
@@ -4693,9 +4681,8 @@ dependencies = [
[[package]]
name = "lance-table"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa189b3081481a97b64cf1161297947a63b8adb941b1950989d0269858703a43"
version = "4.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.1#88c627bb093c0fb078e80fdd9140a6c5f7f94d5b"
dependencies = [
"arrow",
"arrow-array",
@@ -4734,9 +4721,8 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79a6f4ab0788ee82893bac5de4ff0d0d88bba96de87db4b6e18b1883616d4dbe"
version = "4.1.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v4.1.0-beta.1#88c627bb093c0fb078e80fdd9140a6c5f7f94d5b"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -4747,7 +4733,7 @@ dependencies = [
[[package]]
name = "lancedb"
version = "0.27.0"
version = "0.27.1"
dependencies = [
"ahash",
"anyhow",
@@ -4829,7 +4815,7 @@ dependencies = [
[[package]]
name = "lancedb-nodejs"
version = "0.27.0"
version = "0.27.1"
dependencies = [
"arrow-array",
"arrow-ipc",
@@ -4849,7 +4835,7 @@ dependencies = [
[[package]]
name = "lancedb-python"
version = "0.30.0"
version = "0.30.1"
dependencies = [
"arrow",
"async-trait",

View File

@@ -15,20 +15,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { version = "=3.0.1", default-features = false }
lance-core = { version = "=3.0.1" }
lance-datagen = { version = "=3.0.1" }
lance-file = { version = "=3.0.1" }
lance-io = { version = "=3.0.1", default-features = false }
lance-index = { version = "=3.0.1" }
lance-linalg = { version = "=3.0.1" }
lance-namespace = { version = "=3.0.1" }
lance-namespace-impls = { version = "=3.0.1", default-features = false }
lance-table = { version = "=3.0.1" }
lance-testing = { version = "=3.0.1" }
lance-datafusion = { version = "=3.0.1" }
lance-encoding = { version = "=3.0.1" }
lance-arrow = { version = "=3.0.1" }
lance = { "version" = "=4.1.0-beta.1", default-features = false, "tag" = "v4.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=4.1.0-beta.1", "tag" = "v4.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=4.1.0-beta.1", "tag" = "v4.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=4.1.0-beta.1", "tag" = "v4.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=4.1.0-beta.1", default-features = false, "tag" = "v4.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=4.1.0-beta.1", "tag" = "v4.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=4.1.0-beta.1", "tag" = "v4.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=4.1.0-beta.1", "tag" = "v4.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=4.1.0-beta.1", default-features = false, "tag" = "v4.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=4.1.0-beta.1", "tag" = "v4.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=4.1.0-beta.1", "tag" = "v4.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=4.1.0-beta.1", "tag" = "v4.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=4.1.0-beta.1", "tag" = "v4.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=4.1.0-beta.1", "tag" = "v4.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "57.2", optional = false }

View File

@@ -1,8 +1,8 @@
mkdocs==1.5.3
mkdocs==1.6.1
mkdocs-jupyter==0.24.1
mkdocs-material==9.5.3
mkdocs-material==9.6.23
mkdocs-autorefs>=0.5,<=1.0
mkdocstrings[python]==0.25.2
mkdocstrings[python]>=0.24,<1.0
griffe>=0.40,<1.0
mkdocs-render-swagger-plugin>=0.1.0
pydantic>=2.0,<3.0

View File

@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
<dependency>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-core</artifactId>
<version>0.27.0</version>
<version>0.27.1</version>
</dependency>
```

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.27.0-final.0</version>
<version>0.27.1-final.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.27.0-final.0</version>
<version>0.27.1-final.0</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>LanceDB Java SDK Parent POM</description>
@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>3.1.0-beta.2</lance-core.version>
<lance-core.version>4.1.0-beta.1</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.27.0"
version = "0.27.1"
license.workspace = true
description.workspace = true
repository.workspace = true

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.27.0",
"version": "0.27.1",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.27.0",
"version": "0.27.1",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.27.0",
"version": "0.27.1",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.27.0",
"version": "0.27.1",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.27.0",
"version": "0.27.1",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.27.0",
"version": "0.27.1",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.27.0",
"version": "0.27.1",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.27.0",
"version": "0.27.1",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.27.0",
"version": "0.27.1",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.27.0",
"version": "0.27.1",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -4751,7 +4751,16 @@ class IndexStatistics:
num_indexed_rows: int
num_unindexed_rows: int
index_type: Literal[
"IVF_PQ", "IVF_HNSW_PQ", "IVF_HNSW_SQ", "FTS", "BTREE", "BITMAP", "LABEL_LIST"
"IVF_FLAT",
"IVF_SQ",
"IVF_PQ",
"IVF_RQ",
"IVF_HNSW_SQ",
"IVF_HNSW_PQ",
"FTS",
"BTREE",
"BITMAP",
"LABEL_LIST",
]
distance_type: Optional[Literal["l2", "cosine", "dot"]] = None
num_indices: Optional[int] = None

View File

@@ -3,6 +3,7 @@
from datetime import timedelta
import random
from typing import get_args, get_type_hints
import pyarrow as pa
import pytest
@@ -22,6 +23,7 @@ from lancedb.index import (
HnswSq,
FTS,
)
from lancedb.table import IndexStatistics
@pytest_asyncio.fixture
@@ -283,3 +285,23 @@ async def test_create_index_with_binary_vectors(binary_table: AsyncTable):
for v in range(256):
res = await binary_table.query().nearest_to([v] * 128).to_arrow()
assert res["id"][0].as_py() == v
def test_index_statistics_index_type_lists_all_supported_values():
expected_index_types = {
"IVF_FLAT",
"IVF_SQ",
"IVF_PQ",
"IVF_RQ",
"IVF_HNSW_SQ",
"IVF_HNSW_PQ",
"FTS",
"BTREE",
"BITMAP",
"LABEL_LIST",
}
assert (
set(get_args(get_type_hints(IndexStatistics)["index_type"]))
== expected_index_types
)

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.27.0"
version = "0.27.1"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true

View File

@@ -72,6 +72,10 @@ impl ServerVersion {
pub fn support_structural_fts(&self) -> bool {
self.0 >= semver::Version::new(0, 3, 0)
}
pub fn support_multipart_write(&self) -> bool {
self.0 >= semver::Version::new(0, 4, 0)
}
}
pub const OPT_REMOTE_PREFIX: &str = "remote_database_";

View File

@@ -10,6 +10,7 @@ use super::ARROW_STREAM_CONTENT_TYPE;
use super::client::RequestResultExt;
use super::client::{HttpSend, RestfulLanceDbClient, Sender};
use super::db::ServerVersion;
use crate::data::scannable::{PeekedScannable, Scannable, estimate_write_partitions};
use crate::index::Index;
use crate::index::IndexStatistics;
use crate::index::waiter::wait_for_index;
@@ -23,7 +24,7 @@ use crate::table::MergeResult;
use crate::table::Tags;
use crate::table::UpdateResult;
use crate::table::query::create_multi_vector_plan;
use crate::table::{AnyQuery, Filter, TableStatistics};
use crate::table::{AnyQuery, Filter, PreprocessingOutput, TableStatistics};
use crate::utils::background_cache::BackgroundCache;
use crate::utils::{supported_btree_data_type, supported_vector_data_type};
use crate::{DistanceType, Error};
@@ -43,7 +44,7 @@ use async_trait::async_trait;
use datafusion_common::DataFusionError;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
use futures::TryStreamExt;
use futures::{StreamExt, TryStreamExt};
use http::header::CONTENT_TYPE;
use http::{HeaderName, StatusCode};
use lance::arrow::json::{JsonDataType, JsonSchema};
@@ -614,6 +615,66 @@ impl<S: HttpSend> RemoteTable<S> {
Ok(bodies)
}
async fn create_multipart_write(&self) -> Result<String> {
let request = self.client.post(&format!(
"/v1/table/{}/multipart_write/create",
self.identifier
));
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?;
let parsed: serde_json::Value = serde_json::from_str(&body).map_err(|e| Error::Http {
source: format!("Failed to parse multipart create response: {}", e).into(),
request_id,
status_code: None,
})?;
parsed["upload_id"]
.as_str()
.map(|s| s.to_string())
.ok_or_else(|| Error::Http {
source: "Missing upload_id in multipart create response".into(),
request_id: String::new(),
status_code: None,
})
}
async fn complete_multipart_write(&self, upload_id: &str) -> Result<AddResult> {
let request = self
.client
.post(&format!(
"/v1/table/{}/multipart_write/complete",
self.identifier
))
.query(&[("upload_id", upload_id)]);
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?;
let parsed: serde_json::Value = serde_json::from_str(&body).map_err(|e| Error::Http {
source: format!("Failed to parse multipart complete response: {}", e).into(),
request_id,
status_code: None,
})?;
let version = parsed["version"].as_u64().ok_or_else(|| Error::Http {
source: "Missing version in multipart complete response".into(),
request_id: String::new(),
status_code: None,
})?;
Ok(AddResult { version })
}
async fn abort_multipart_write(&self, upload_id: &str) -> Result<()> {
let request = self
.client
.post(&format!(
"/v1/table/{}/multipart_write/abort",
self.identifier
))
.query(&[("upload_id", upload_id)]);
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
async fn check_mutable(&self) -> Result<()> {
let read_guard = self.version.read().await;
match *read_guard {
@@ -817,6 +878,19 @@ mod test_utils {
}
pub fn new_mock_with_config<F, T>(name: String, handler: F, config: ClientConfig) -> Self
where
F: Fn(reqwest::Request) -> http::Response<T> + Send + Sync + 'static,
T: Into<reqwest::Body>,
{
Self::new_mock_with_version_and_config(name, handler, None, config)
}
pub fn new_mock_with_version_and_config<F, T>(
name: String,
handler: F,
version: Option<semver::Version>,
config: ClientConfig,
) -> Self
where
F: Fn(reqwest::Request) -> http::Response<T> + Send + Sync + 'static,
T: Into<reqwest::Body>,
@@ -827,7 +901,7 @@ mod test_utils {
name: name.clone(),
namespace: vec![],
identifier: name,
server_version: ServerVersion::default(),
server_version: version.map(ServerVersion).unwrap_or_default(),
version: RwLock::new(None),
location: RwLock::new(None),
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
@@ -836,6 +910,185 @@ mod test_utils {
}
}
impl<S: HttpSend + 'static> RemoteTable<S> {
fn is_retryable_write_error(&self, err: &Error) -> bool {
match err {
Error::Http {
source,
status_code,
..
} => {
// Don't retry read errors (is_body/is_decode): the
// server may have committed the write already, and
// without an idempotency key we'd duplicate data.
source
.downcast_ref::<reqwest::Error>()
.is_some_and(|e| e.is_connect())
|| status_code.is_some_and(|s| self.client.retry_config.statuses.contains(&s))
}
// send_with_retry exhausted its internal retries on a retryable
// status. The outer loop can still retry the whole operation with
// a fresh session.
Error::Retry { status_code, .. } => {
status_code.is_some_and(|s| self.client.retry_config.statuses.contains(&s))
}
_ => false,
}
}
async fn add_single_partition(&self, output: PreprocessingOutput) -> Result<AddResult> {
use crate::remote::retry::RetryCounter;
let mut insert: Arc<dyn ExecutionPlan> = Arc::new(RemoteInsertExec::new(
self.name.clone(),
self.identifier.clone(),
self.client.clone(),
output.plan,
output.overwrite,
));
let mut retry_counter =
RetryCounter::new(&self.client.retry_config, uuid::Uuid::new_v4().to_string());
loop {
let stream = execute_plan(insert.clone(), Default::default())?;
let result: Result<Vec<_>> = stream.try_collect().await.map_err(Error::from);
match result {
Ok(_) => {
let add_result = insert
.as_any()
.downcast_ref::<RemoteInsertExec<S>>()
.and_then(|i| i.add_result())
.unwrap_or(AddResult { version: 0 });
if output.overwrite {
self.invalidate_schema_cache();
}
return Ok(add_result);
}
Err(err) if output.rescannable && self.is_retryable_write_error(&err) => {
retry_counter.increment_from_error(err)?;
tokio::time::sleep(retry_counter.next_sleep_time()).await;
insert = insert.reset_state()?;
continue;
}
Err(err) => return Err(err),
}
}
}
async fn add_multipart(
&self,
output: PreprocessingOutput,
num_partitions: usize,
) -> Result<AddResult> {
use crate::remote::retry::RetryCounter;
let mut retry_counter =
RetryCounter::new(&self.client.retry_config, uuid::Uuid::new_v4().to_string());
loop {
let upload_id = self.create_multipart_write().await?;
let result = self
.execute_multipart_inserts(&upload_id, &output, num_partitions)
.await;
match result {
Ok(()) => match self.complete_multipart_write(&upload_id).await {
Ok(result) => {
if output.overwrite {
self.invalidate_schema_cache();
}
return Ok(result);
}
Err(e) => {
if let Err(abort_err) = self.abort_multipart_write(&upload_id).await {
log::warn!(
"Failed to abort multipart write {}: {}",
upload_id,
abort_err
);
}
if output.rescannable && self.is_retryable_write_error(&e) {
retry_counter.increment_from_error(e)?;
tokio::time::sleep(retry_counter.next_sleep_time()).await;
continue;
}
return Err(e);
}
},
Err(e) => {
if let Err(abort_err) = self.abort_multipart_write(&upload_id).await {
log::warn!(
"Failed to abort multipart write {}: {}",
upload_id,
abort_err
);
}
if output.rescannable && self.is_retryable_write_error(&e) {
retry_counter.increment_from_error(e)?;
tokio::time::sleep(retry_counter.next_sleep_time()).await;
continue;
}
return Err(e);
}
}
}
}
async fn execute_multipart_inserts(
&self,
upload_id: &str,
output: &PreprocessingOutput,
num_partitions: usize,
) -> Result<()> {
let plan = Arc::new(
datafusion_physical_plan::repartition::RepartitionExec::try_new(
output.plan.clone(),
datafusion_physical_plan::Partitioning::RoundRobinBatch(num_partitions),
)?,
) as Arc<dyn ExecutionPlan>;
let insert = Arc::new(RemoteInsertExec::new_multipart(
self.name.clone(),
self.identifier.clone(),
self.client.clone(),
plan,
output.overwrite,
upload_id.to_string(),
));
let task_ctx = Arc::new(datafusion_execution::TaskContext::default());
let mut join_set = tokio::task::JoinSet::new();
for partition in 0..num_partitions {
let exec = insert.clone();
let ctx = task_ctx.clone();
join_set.spawn(async move {
let mut stream = exec
.execute(partition, ctx)
.map_err(|e| -> Error { e.into() })?;
while let Some(batch) = stream.next().await {
batch.map_err(|e| -> Error { e.into() })?;
}
Ok::<_, Error>(())
});
}
// JoinSet aborts all remaining tasks when dropped, so if we return
// early on error the orphaned tasks are automatically cancelled.
while let Some(result) = join_set.join_next().await {
result.map_err(|e| Error::Runtime {
message: format!("Insert task panicked: {}", e),
})??;
}
Ok(())
}
}
#[async_trait]
impl<S: HttpSend> BaseTable for RemoteTable<S> {
fn as_any(&self) -> &dyn std::any::Any {
@@ -986,74 +1239,44 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
status_code: None,
})
}
async fn add(&self, add: AddDataBuilder) -> Result<AddResult> {
use crate::remote::retry::RetryCounter;
async fn add(&self, mut add: AddDataBuilder) -> Result<AddResult> {
self.check_mutable().await?;
let table_schema = self.schema().await?;
let table_def = TableDefinition::try_from_rich_schema(table_schema.clone())?;
let num_partitions = if let Some(parallelism) = add.write_parallelism {
if parallelism > 1 && self.server_version.support_multipart_write() {
parallelism
} else {
1
}
} else if self.server_version.support_multipart_write() {
// Peek at the first batch to estimate write partitions, same as NativeTable.
let mut peeked = PeekedScannable::new(add.data);
let n = if let Some(first_batch) = peeked.peek().await {
let max_partitions = lance_core::utils::tokio::get_num_compute_intensive_cpus();
estimate_write_partitions(
first_batch.get_array_memory_size(),
first_batch.num_rows(),
peeked.num_rows(),
max_partitions,
)
} else {
1
};
add.data = Box::new(peeked);
n
} else {
1
};
let output = add.into_plan(&table_schema, &table_def)?;
let mut insert: Arc<dyn ExecutionPlan> = Arc::new(RemoteInsertExec::new(
self.name.clone(),
self.identifier.clone(),
self.client.clone(),
output.plan,
output.overwrite,
));
let mut retry_counter =
RetryCounter::new(&self.client.retry_config, uuid::Uuid::new_v4().to_string());
loop {
let stream = execute_plan(insert.clone(), Default::default())?;
let result: Result<Vec<_>> = stream.try_collect().await.map_err(Error::from);
match result {
Ok(_) => {
let add_result = insert
.as_any()
.downcast_ref::<RemoteInsertExec<S>>()
.and_then(|i| i.add_result())
.unwrap_or(AddResult { version: 0 });
if output.overwrite {
self.invalidate_schema_cache();
}
return Ok(add_result);
}
Err(err) if output.rescannable => {
let retryable = match &err {
Error::Http {
source,
status_code,
..
} => {
// Don't retry read errors (is_body/is_decode): the
// server may have committed the write already, and
// without an idempotency key we'd duplicate data.
source
.downcast_ref::<reqwest::Error>()
.is_some_and(|e| e.is_connect())
|| status_code
.is_some_and(|s| self.client.retry_config.statuses.contains(&s))
}
_ => false,
};
if retryable {
retry_counter.increment_from_error(err)?;
tokio::time::sleep(retry_counter.next_sleep_time()).await;
insert = insert.reset_state()?;
continue;
}
return Err(err);
}
Err(err) => return Err(err),
}
if num_partitions > 1 {
self.add_multipart(output, num_partitions).await
} else {
self.add_single_partition(output).await
}
}
@@ -1811,6 +2034,7 @@ mod tests {
use super::*;
use crate::remote::client::{ClientConfig, RetryConfig};
use crate::table::AddDataMode;
use arrow::{array::AsArray, compute::concat_batches, datatypes::Int32Type};
@@ -4831,4 +5055,516 @@ mod tests {
assert_eq!(data.len(), 1);
assert_eq!(data[0].as_ref().unwrap(), &expected_data);
}
fn schema_json() -> &'static str {
r#"{"fields": [{"name": "id", "type": {"type": "int32"}, "nullable": true}]}"#
}
fn simple_describe_response() -> http::Response<String> {
http::Response::builder()
.status(200)
.body(format!(r#"{{"version": 1, "schema": {}}}"#, schema_json()))
.unwrap()
}
/// Happy path: with `write_parallelism(2)` against a 0.4.0 server, the add
/// uses the multipart flow (create -> parallel staged inserts -> complete)
/// and threads a single upload_id through every staged request.
#[tokio::test]
async fn test_multipart_write_happy_path() {
    use std::sync::Mutex;
    // Per-endpoint call counters, shared with the mock handler closure.
    let create_count = Arc::new(AtomicUsize::new(0));
    let insert_count = Arc::new(AtomicUsize::new(0));
    let complete_count = Arc::new(AtomicUsize::new(0));
    let abort_count = Arc::new(AtomicUsize::new(0));
    // Every upload_id observed on insert/complete requests.
    let upload_ids = Arc::new(Mutex::new(Vec::<String>::new()));
    let create_count_c = create_count.clone();
    let insert_count_c = insert_count.clone();
    let complete_count_c = complete_count.clone();
    let abort_count_c = abort_count.clone();
    let upload_ids_c = upload_ids.clone();
    // Server advertises 0.4.0, which supports the multipart endpoints.
    let table = Table::new_with_handler_version(
        "my_table",
        semver::Version::new(0, 4, 0),
        move |request| {
            let path = request.url().path();
            let query = request.url().query().unwrap_or("");
            if path == "/v1/table/my_table/describe/" {
                return simple_describe_response();
            }
            if path == "/v1/table/my_table/multipart_write/create" {
                create_count_c.fetch_add(1, Ordering::SeqCst);
                return http::Response::builder()
                    .status(200)
                    .body(r#"{"upload_id": "test-upload-123"}"#.to_string())
                    .unwrap();
            }
            if path == "/v1/table/my_table/insert/" {
                insert_count_c.fetch_add(1, Ordering::SeqCst);
                // Record the upload_id attached to this staged insert.
                let uid = url::form_urlencoded::parse(query.as_bytes())
                    .find(|(k, _)| k == "upload_id")
                    .map(|(_, v)| v.to_string());
                upload_ids_c
                    .lock()
                    .unwrap()
                    .push(uid.expect("missing upload_id on insert"));
                return http::Response::builder()
                    .status(200)
                    .body(r#"{"version": 1}"#.to_string())
                    .unwrap();
            }
            if path == "/v1/table/my_table/multipart_write/complete" {
                complete_count_c.fetch_add(1, Ordering::SeqCst);
                // complete must also carry the session's upload_id.
                let uid = url::form_urlencoded::parse(query.as_bytes())
                    .find(|(k, _)| k == "upload_id")
                    .map(|(_, v)| v.to_string());
                upload_ids_c
                    .lock()
                    .unwrap()
                    .push(uid.expect("missing upload_id on complete"));
                return http::Response::builder()
                    .status(200)
                    .body(r#"{"version": 5}"#.to_string())
                    .unwrap();
            }
            if path == "/v1/table/my_table/multipart_write/abort" {
                abort_count_c.fetch_add(1, Ordering::SeqCst);
                return http::Response::builder()
                    .status(200)
                    .body(String::new())
                    .unwrap();
            }
            panic!("Unexpected request path: {}", path);
        },
    );
    let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
    let result = table
        .add(vec![batch])
        .write_parallelism(2)
        .execute()
        .await
        .unwrap();
    // The final version comes from the complete response (5), not the
    // per-insert staging responses (1).
    assert_eq!(result.version, 5);
    assert_eq!(create_count.load(Ordering::SeqCst), 1);
    assert!(
        insert_count.load(Ordering::SeqCst) > 1,
        "Expected multiple insert calls, got {}",
        insert_count.load(Ordering::SeqCst)
    );
    assert_eq!(complete_count.load(Ordering::SeqCst), 1);
    // Nothing failed, so the session must not be aborted.
    assert_eq!(abort_count.load(Ordering::SeqCst), 0);
    let ids = upload_ids.lock().unwrap();
    assert!(
        ids.iter().all(|id| id == "test-upload-123"),
        "All requests should use the same upload_id, got: {:?}",
        *ids
    );
}
/// A server older than 0.4.0 must never see the multipart endpoints, even
/// when `write_parallelism` is requested: the write falls back to a single
/// plain insert without an upload_id.
#[tokio::test]
async fn test_multipart_write_fallback_old_server() {
    let insert_count = Arc::new(AtomicUsize::new(0));
    let create_count = Arc::new(AtomicUsize::new(0));
    let insert_count_c = insert_count.clone();
    let create_count_c = create_count.clone();
    // Server version 0.3.0 does not support multipart writes
    let table = Table::new_with_handler_version(
        "my_table",
        semver::Version::new(0, 3, 0),
        move |request| {
            let path = request.url().path();
            if path == "/v1/table/my_table/describe/" {
                return simple_describe_response();
            }
            if path.contains("multipart_write") {
                // Count before panicking so the assertion below reports 0
                // only when no multipart endpoint was ever hit.
                create_count_c.fetch_add(1, Ordering::SeqCst);
                panic!("Should not call multipart write endpoints on old server");
            }
            if path == "/v1/table/my_table/insert/" {
                let query = request.url().query().unwrap_or("");
                // Fallback is a plain insert: no staging session.
                assert!(
                    !query.contains("upload_id"),
                    "Should not have upload_id for old server"
                );
                insert_count_c.fetch_add(1, Ordering::SeqCst);
                return http::Response::builder()
                    .status(200)
                    .body(r#"{"version": 2}"#.to_string())
                    .unwrap();
            }
            panic!("Unexpected request path: {}", path);
        },
    );
    let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
    let result = table
        .add(vec![batch])
        .write_parallelism(2)
        .execute()
        .await
        .unwrap();
    // The plain insert response carries the committed version directly.
    assert_eq!(result.version, 2);
    assert_eq!(create_count.load(Ordering::SeqCst), 0);
    assert_eq!(insert_count.load(Ordering::SeqCst), 1);
}
/// Without an explicit `write_parallelism`, a tiny input (3 rows) should be
/// written as a single plain insert: the multipart endpoints must not be
/// called even though the server (0.4.0) supports them.
#[tokio::test]
async fn test_multipart_write_small_data_single_partition() {
    let insert_count = Arc::new(AtomicUsize::new(0));
    let create_count = Arc::new(AtomicUsize::new(0));
    let insert_count_c = insert_count.clone();
    let create_count_c = create_count.clone();
    let table = Table::new_with_handler_version(
        "my_table",
        semver::Version::new(0, 4, 0),
        move |request| {
            let path = request.url().path();
            if path == "/v1/table/my_table/describe/" {
                return simple_describe_response();
            }
            if path.contains("multipart_write") {
                // Count before panicking so a failure is also visible in
                // the final create_count assertion.
                create_count_c.fetch_add(1, Ordering::SeqCst);
                panic!("Should not call multipart write endpoints for small data");
            }
            if path == "/v1/table/my_table/insert/" {
                let query = request.url().query().unwrap_or("");
                // Plain insert path: no staging session query parameter.
                assert!(
                    !query.contains("upload_id"),
                    "Should not have upload_id for small data"
                );
                insert_count_c.fetch_add(1, Ordering::SeqCst);
                return http::Response::builder()
                    .status(200)
                    .body(r#"{"version": 2}"#.to_string())
                    .unwrap();
            }
            panic!("Unexpected request path: {}", path);
        },
    );
    // Small data: only 3 rows
    let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
    let result = table.add(vec![batch]).execute().await.unwrap();
    assert_eq!(result.version, 2);
    assert_eq!(create_count.load(Ordering::SeqCst), 0);
    assert_eq!(insert_count.load(Ordering::SeqCst), 1);
}
/// When a staged insert fails with a non-retryable status (400), the client
/// must abort the multipart session: the add errors out, complete is never
/// called, and abort is called exactly once.
#[tokio::test]
async fn test_multipart_write_abort_on_insert_failure() {
    let create_count = Arc::new(AtomicUsize::new(0));
    let insert_count = Arc::new(AtomicUsize::new(0));
    let complete_count = Arc::new(AtomicUsize::new(0));
    let abort_count = Arc::new(AtomicUsize::new(0));
    let create_count_c = create_count.clone();
    let insert_count_c = insert_count.clone();
    let complete_count_c = complete_count.clone();
    let abort_count_c = abort_count.clone();
    let table = Table::new_with_handler_version(
        "my_table",
        semver::Version::new(0, 4, 0),
        move |request| {
            let path = request.url().path();
            if path == "/v1/table/my_table/describe/" {
                return simple_describe_response();
            }
            if path == "/v1/table/my_table/multipart_write/create" {
                create_count_c.fetch_add(1, Ordering::SeqCst);
                return http::Response::builder()
                    .status(200)
                    .body(r#"{"upload_id": "test-upload-456"}"#.to_string())
                    .unwrap();
            }
            if path == "/v1/table/my_table/insert/" {
                let count = insert_count_c.fetch_add(1, Ordering::SeqCst);
                // Fail on the first insert with non-retryable status
                if count == 0 {
                    return http::Response::builder()
                        .status(400)
                        .body("Bad Request".to_string())
                        .unwrap();
                }
                // Any concurrent sibling inserts succeed normally.
                return http::Response::builder()
                    .status(200)
                    .body(r#"{"version": 1}"#.to_string())
                    .unwrap();
            }
            if path == "/v1/table/my_table/multipart_write/complete" {
                complete_count_c.fetch_add(1, Ordering::SeqCst);
                return http::Response::builder()
                    .status(200)
                    .body(r#"{"version": 5}"#.to_string())
                    .unwrap();
            }
            if path == "/v1/table/my_table/multipart_write/abort" {
                abort_count_c.fetch_add(1, Ordering::SeqCst);
                return http::Response::builder()
                    .status(200)
                    .body(String::new())
                    .unwrap();
            }
            panic!("Unexpected request path: {}", path);
        },
    );
    let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
    let result = table.add(vec![batch]).write_parallelism(2).execute().await;
    // The 400 is not retried: one session, no commit, one abort.
    assert!(result.is_err());
    assert_eq!(create_count.load(Ordering::SeqCst), 1);
    assert_eq!(complete_count.load(Ordering::SeqCst), 0);
    assert_eq!(abort_count.load(Ordering::SeqCst), 1);
}
/// When the complete call itself fails with a non-retryable status (400),
/// the client must still abort the session once and surface the error.
#[tokio::test]
async fn test_multipart_write_abort_on_complete_failure() {
    let abort_count = Arc::new(AtomicUsize::new(0));
    let abort_count_c = abort_count.clone();
    let table = Table::new_with_handler_version(
        "my_table",
        semver::Version::new(0, 4, 0),
        move |request| {
            let path = request.url().path();
            if path == "/v1/table/my_table/describe/" {
                return simple_describe_response();
            }
            if path == "/v1/table/my_table/multipart_write/create" {
                return http::Response::builder()
                    .status(200)
                    .body(r#"{"upload_id": "test-upload-789"}"#.to_string())
                    .unwrap();
            }
            if path == "/v1/table/my_table/insert/" {
                // All staged inserts succeed; only complete fails.
                return http::Response::builder()
                    .status(200)
                    .body(r#"{"version": 1}"#.to_string())
                    .unwrap();
            }
            if path == "/v1/table/my_table/multipart_write/complete" {
                return http::Response::builder()
                    .status(400)
                    .body("Bad Request".to_string())
                    .unwrap();
            }
            if path == "/v1/table/my_table/multipart_write/abort" {
                abort_count_c.fetch_add(1, Ordering::SeqCst);
                return http::Response::builder()
                    .status(200)
                    .body(String::new())
                    .unwrap();
            }
            panic!("Unexpected request path: {}", path);
        },
    );
    let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
    let result = table.add(vec![batch]).write_parallelism(2).execute().await;
    assert!(result.is_err());
    assert_eq!(abort_count.load(Ordering::SeqCst), 1);
}
/// Client config for retry tests: up to 3 retries of each kind, with zero
/// backoff and jitter so tests run instantly. Only HTTP 502/503 are
/// treated as retryable statuses.
fn retry_config_no_backoff() -> ClientConfig {
    let retry_config = RetryConfig {
        retries: Some(3),
        connect_retries: Some(3),
        read_retries: Some(3),
        backoff_factor: Some(0.0),
        backoff_jitter: Some(0.0),
        statuses: Some(vec![502, 503]),
    };
    ClientConfig {
        retry_config,
        ..Default::default()
    }
}
/// Retryable insert failures exhaust the per-request retries for the first
/// upload session; the client then aborts that session and retries the
/// whole multipart write under a fresh upload_id, which succeeds.
#[tokio::test]
async fn test_multipart_write_retry_on_partition_failure() {
    // All inserts for the first upload session return 503 (retryable).
    // After exhausting internal retries, the outer loop retries with a
    // new session and succeeds.
    let create_count = Arc::new(AtomicUsize::new(0));
    let complete_count = Arc::new(AtomicUsize::new(0));
    let abort_count = Arc::new(AtomicUsize::new(0));
    let create_count_c = create_count.clone();
    let complete_count_c = complete_count.clone();
    let abort_count_c = abort_count.clone();
    let table = Table::new_with_handler_version_and_config(
        "my_table",
        semver::Version::new(0, 4, 0),
        move |request| {
            let path = request.url().path();
            let query = request.url().query().unwrap_or("");
            if path == "/v1/table/my_table/describe/" {
                return simple_describe_response();
            }
            if path == "/v1/table/my_table/multipart_write/create" {
                // Hand out "upload-1", "upload-2", ... per session.
                let n = create_count_c.fetch_add(1, Ordering::SeqCst);
                let body = format!(r#"{{"upload_id": "upload-{}"}}"#, n + 1);
                return http::Response::builder().status(200).body(body).unwrap();
            }
            if path == "/v1/table/my_table/insert/" {
                // Fail all inserts for the first session
                if query.contains("upload_id=upload-1") {
                    return http::Response::builder()
                        .status(503)
                        .body("Service Unavailable".to_string())
                        .unwrap();
                }
                return http::Response::builder()
                    .status(200)
                    .body(r#"{"version": 1}"#.to_string())
                    .unwrap();
            }
            if path == "/v1/table/my_table/multipart_write/complete" {
                complete_count_c.fetch_add(1, Ordering::SeqCst);
                return http::Response::builder()
                    .status(200)
                    .body(r#"{"version": 7}"#.to_string())
                    .unwrap();
            }
            if path == "/v1/table/my_table/multipart_write/abort" {
                abort_count_c.fetch_add(1, Ordering::SeqCst);
                return http::Response::builder()
                    .status(200)
                    .body(String::new())
                    .unwrap();
            }
            panic!("Unexpected request path: {}", path);
        },
        // Zero backoff keeps the retry loop fast in tests.
        retry_config_no_backoff(),
    );
    let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
    let result = table
        .add(vec![batch])
        .write_parallelism(2)
        .execute()
        .await
        .unwrap();
    // Second session committed: version from its complete response.
    assert_eq!(result.version, 7);
    // One failed session (aborted) plus one successful session.
    assert_eq!(create_count.load(Ordering::SeqCst), 2);
    assert_eq!(abort_count.load(Ordering::SeqCst), 1);
    assert_eq!(complete_count.load(Ordering::SeqCst), 1);
}
/// A retryable (503) failure of the complete call aborts the first session
/// and retries the whole write under a new upload_id, whose complete then
/// succeeds.
#[tokio::test]
async fn test_multipart_write_retry_on_complete_failure() {
    // Complete returns 503 for the first session, succeeds for the second.
    let create_count = Arc::new(AtomicUsize::new(0));
    let abort_count = Arc::new(AtomicUsize::new(0));
    let create_count_c = create_count.clone();
    let abort_count_c = abort_count.clone();
    let table = Table::new_with_handler_version_and_config(
        "my_table",
        semver::Version::new(0, 4, 0),
        move |request| {
            let path = request.url().path();
            let query = request.url().query().unwrap_or("");
            if path == "/v1/table/my_table/describe/" {
                return simple_describe_response();
            }
            if path == "/v1/table/my_table/multipart_write/create" {
                // Hand out "upload-1", "upload-2", ... per session.
                let n = create_count_c.fetch_add(1, Ordering::SeqCst);
                let body = format!(r#"{{"upload_id": "upload-{}"}}"#, n + 1);
                return http::Response::builder().status(200).body(body).unwrap();
            }
            if path == "/v1/table/my_table/insert/" {
                // Staged inserts always succeed in this scenario.
                return http::Response::builder()
                    .status(200)
                    .body(r#"{"version": 1}"#.to_string())
                    .unwrap();
            }
            if path == "/v1/table/my_table/multipart_write/complete" {
                // Fail complete for first session
                if query.contains("upload_id=upload-1") {
                    return http::Response::builder()
                        .status(503)
                        .body("Service Unavailable".to_string())
                        .unwrap();
                }
                return http::Response::builder()
                    .status(200)
                    .body(r#"{"version": 9}"#.to_string())
                    .unwrap();
            }
            if path == "/v1/table/my_table/multipart_write/abort" {
                abort_count_c.fetch_add(1, Ordering::SeqCst);
                return http::Response::builder()
                    .status(200)
                    .body(String::new())
                    .unwrap();
            }
            panic!("Unexpected request path: {}", path);
        },
        // Zero backoff keeps the retry loop fast in tests.
        retry_config_no_backoff(),
    );
    let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap();
    let result = table
        .add(vec![batch])
        .write_parallelism(2)
        .execute()
        .await
        .unwrap();
    // Version comes from the second session's successful complete.
    assert_eq!(result.version, 9);
    assert_eq!(create_count.load(Ordering::SeqCst), 2);
    assert_eq!(abort_count.load(Ordering::SeqCst), 1);
}
}

View File

@@ -12,7 +12,9 @@ use datafusion_common::{DataFusionError, Result as DataFusionResult};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
};
use futures::StreamExt;
use http::header::CONTENT_TYPE;
@@ -25,10 +27,12 @@ use crate::table::datafusion::insert::COUNT_SCHEMA;
/// ExecutionPlan for inserting data into a remote LanceDB table.
///
/// This plan:
/// 1. Requires single partition (no parallel remote inserts yet)
/// 2. Streams data as Arrow IPC to `/v1/table/{id}/insert/` endpoint
/// 3. Stores AddResult for retrieval after execution
/// Streams data as Arrow IPC to `/v1/table/{id}/insert/` endpoint.
///
/// When `upload_id` is set, inserts are staged as part of a multipart write
/// session and the plan supports multiple partitions for parallel uploads.
/// Without `upload_id`, the plan requires a single partition and commits
/// immediately.
#[derive(Debug)]
pub struct RemoteInsertExec<S: HttpSend = Sender> {
table_name: String,
@@ -38,10 +42,11 @@ pub struct RemoteInsertExec<S: HttpSend = Sender> {
overwrite: bool,
properties: PlanProperties,
add_result: Arc<Mutex<Option<AddResult>>>,
upload_id: Option<String>,
}
impl<S: HttpSend + 'static> RemoteInsertExec<S> {
/// Create a new RemoteInsertExec.
/// Create a new single-partition RemoteInsertExec.
pub fn new(
table_name: String,
identifier: String,
@@ -49,10 +54,49 @@ impl<S: HttpSend + 'static> RemoteInsertExec<S> {
input: Arc<dyn ExecutionPlan>,
overwrite: bool,
) -> Self {
Self::new_inner(table_name, identifier, client, input, overwrite, None)
}
/// Create a multi-partition RemoteInsertExec for use with multipart writes.
///
/// Each partition's insert is staged under the given `upload_id` without
/// committing. The caller is responsible for calling the complete (or abort)
/// endpoint after all partitions finish.
///
/// Unlike the single-partition constructor, the resulting plan exposes one
/// output partition per input partition, so staged inserts run in parallel.
pub fn new_multipart(
    table_name: String,
    identifier: String,
    client: RestfulLanceDbClient<S>,
    input: Arc<dyn ExecutionPlan>,
    overwrite: bool,
    upload_id: String,
) -> Self {
    // Some(upload_id) switches new_inner into multipart (staging) mode.
    Self::new_inner(
        table_name,
        identifier,
        client,
        input,
        overwrite,
        Some(upload_id),
    )
}
fn new_inner(
table_name: String,
identifier: String,
client: RestfulLanceDbClient<S>,
input: Arc<dyn ExecutionPlan>,
overwrite: bool,
upload_id: Option<String>,
) -> Self {
let num_partitions = if upload_id.is_some() {
input.output_partitioning().partition_count()
} else {
1
};
let schema = COUNT_SCHEMA.clone();
let properties = PlanProperties::new(
EquivalenceProperties::new(schema),
datafusion_physical_plan::Partitioning::UnknownPartitioning(1),
datafusion_physical_plan::Partitioning::UnknownPartitioning(num_partitions),
datafusion_physical_plan::execution_plan::EmissionType::Final,
datafusion_physical_plan::execution_plan::Boundedness::Bounded,
);
@@ -65,6 +109,7 @@ impl<S: HttpSend + 'static> RemoteInsertExec<S> {
overwrite,
properties,
add_result: Arc::new(Mutex::new(None)),
upload_id,
}
}
@@ -174,8 +219,11 @@ impl<S: HttpSend + 'static> ExecutionPlan for RemoteInsertExec<S> {
}
fn required_input_distribution(&self) -> Vec<datafusion_physical_plan::Distribution> {
// Multipart writes (upload_id set) stage data and commit separately, so any
// input distribution is fine; plain inserts must still run in a single
// partition because the lone request commits immediately.
vec![datafusion_physical_plan::Distribution::SinglePartition]
if self.upload_id.is_some() {
vec![datafusion_physical_plan::Distribution::UnspecifiedDistribution]
} else {
vec![datafusion_physical_plan::Distribution::SinglePartition]
}
}
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
@@ -191,12 +239,13 @@ impl<S: HttpSend + 'static> ExecutionPlan for RemoteInsertExec<S> {
"RemoteInsertExec requires exactly one child".to_string(),
));
}
Ok(Arc::new(Self::new(
Ok(Arc::new(Self::new_inner(
self.table_name.clone(),
self.identifier.clone(),
self.client.clone(),
children[0].clone(),
self.overwrite,
self.upload_id.clone(),
)))
}
@@ -205,18 +254,20 @@ impl<S: HttpSend + 'static> ExecutionPlan for RemoteInsertExec<S> {
partition: usize,
context: Arc<TaskContext>,
) -> DataFusionResult<SendableRecordBatchStream> {
if partition != 0 {
if self.upload_id.is_none() && partition != 0 {
return Err(DataFusionError::Internal(
"RemoteInsertExec only supports single partition execution".to_string(),
"RemoteInsertExec only supports single partition execution without upload_id"
.to_string(),
));
}
let input_stream = self.input.execute(0, context)?;
let input_stream = self.input.execute(partition, context)?;
let client = self.client.clone();
let identifier = self.identifier.clone();
let overwrite = self.overwrite;
let add_result = self.add_result.clone();
let table_name = self.table_name.clone();
let upload_id = self.upload_id.clone();
let stream = futures::stream::once(async move {
let mut request = client
@@ -226,6 +277,9 @@ impl<S: HttpSend + 'static> ExecutionPlan for RemoteInsertExec<S> {
if overwrite {
request = request.query(&[("mode", "overwrite")]);
}
if let Some(ref uid) = upload_id {
request = request.query(&[("upload_id", uid.as_str())]);
}
let (error_tx, mut error_rx) = tokio::sync::oneshot::channel();
let body = Self::stream_as_http_body(input_stream, error_tx)?;
@@ -262,28 +316,30 @@ impl<S: HttpSend + 'static> ExecutionPlan for RemoteInsertExec<S> {
let (request_id, response) = result?;
let body_text = response.text().await.map_err(|e| {
DataFusionError::External(Box::new(Error::Http {
source: Box::new(e),
request_id: request_id.clone(),
status_code: None,
}))
})?;
let parsed_result = if body_text.trim().is_empty() {
// Backward compatible with old servers
AddResult { version: 0 }
} else {
serde_json::from_str(&body_text).map_err(|e| {
// For multipart writes, the staging response is not the final
// version. Only parse AddResult for non-multipart inserts.
if upload_id.is_none() {
let body_text = response.text().await.map_err(|e| {
DataFusionError::External(Box::new(Error::Http {
source: format!("Failed to parse add response: {}", e).into(),
source: Box::new(e),
request_id: request_id.clone(),
status_code: None,
}))
})?
};
})?;
let parsed_result = if body_text.trim().is_empty() {
// Backward compatible with old servers
AddResult { version: 0 }
} else {
serde_json::from_str(&body_text).map_err(|e| {
DataFusionError::External(Box::new(Error::Http {
source: format!("Failed to parse add response: {}", e).into(),
request_id: request_id.clone(),
status_code: None,
}))
})?
};
{
let mut res_lock = add_result.lock().map_err(|_| {
DataFusionError::Execution("Failed to acquire lock for add_result".to_string())
})?;

View File

@@ -75,6 +75,7 @@ pub mod query;
pub mod schema_evolution;
pub mod update;
use crate::index::waiter::wait_for_index;
pub(crate) use add_data::PreprocessingOutput;
pub use add_data::{AddDataBuilder, AddDataMode, AddResult, NaNVectorBehavior};
pub use chrono::Duration;
pub use delete::DeleteResult;
@@ -440,6 +441,34 @@ mod test_utils {
embedding_registry: Arc::new(MemoryRegistry::new()),
}
}
/// Build a mock `Table` whose HTTP traffic is served by `handler`, with the
/// mock server reporting `version` as its server version and the client
/// using `config` (e.g. custom retry settings).
///
/// Used by tests that need both a pinned server version and a non-default
/// `ClientConfig`, such as the multipart-write retry tests.
pub fn new_with_handler_version_and_config<T>(
    name: impl Into<String>,
    version: semver::Version,
    handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
    config: crate::remote::ClientConfig,
) -> Self
where
    T: Into<reqwest::Body>,
{
    // The same handler closure backs both the table mock and the
    // database mock, so all requests flow through one place.
    let inner = Arc::new(
        crate::remote::table::RemoteTable::new_mock_with_version_and_config(
            name.into(),
            handler.clone(),
            Some(version),
            config.clone(),
        ),
    );
    let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock_with_config(
        handler, config,
    ));
    Self {
        inner,
        database: Some(database),
        // Registry is unused.
        embedding_registry: Arc::new(MemoryRegistry::new()),
    }
}
}
}
@@ -2198,21 +2227,26 @@ impl BaseTable for NativeTable {
let table_schema = Schema::from(&ds.schema().clone());
// Peek at the first batch to estimate a good partition count for
// write parallelism.
let mut peeked = PeekedScannable::new(add.data);
let num_partitions = if let Some(first_batch) = peeked.peek().await {
let max_partitions = lance_core::utils::tokio::get_num_compute_intensive_cpus();
estimate_write_partitions(
first_batch.get_array_memory_size(),
first_batch.num_rows(),
peeked.num_rows(),
max_partitions,
)
let num_partitions = if let Some(parallelism) = add.write_parallelism {
parallelism
} else {
1
// Peek at the first batch to estimate a good partition count for
// write parallelism.
let mut peeked = PeekedScannable::new(add.data);
let n = if let Some(first_batch) = peeked.peek().await {
let max_partitions = lance_core::utils::tokio::get_num_compute_intensive_cpus();
estimate_write_partitions(
first_batch.get_array_memory_size(),
first_batch.num_rows(),
peeked.num_rows(),
max_partitions,
)
} else {
1
};
add.data = Box::new(peeked);
n
};
add.data = Box::new(peeked);
let output = add.into_plan(&table_schema, &table_def)?;

View File

@@ -52,6 +52,7 @@ pub struct AddDataBuilder {
pub(crate) write_options: WriteOptions,
pub(crate) on_nan_vectors: NaNVectorBehavior,
pub(crate) embedding_registry: Option<Arc<dyn EmbeddingRegistry>>,
pub(crate) write_parallelism: Option<usize>,
}
impl std::fmt::Debug for AddDataBuilder {
@@ -77,6 +78,7 @@ impl AddDataBuilder {
write_options: WriteOptions::default(),
on_nan_vectors: NaNVectorBehavior::default(),
embedding_registry,
write_parallelism: None,
}
}
@@ -101,7 +103,22 @@ impl AddDataBuilder {
self
}
/// Set the number of parallel write streams.
///
/// By default, the number of streams is estimated from the data size.
/// Setting this to `1` disables parallel writes.
pub fn write_parallelism(mut self, parallelism: usize) -> Self {
self.write_parallelism = Some(parallelism);
self
}
/// Execute the add operation, writing the buffered data to the table.
///
/// Returns the [`AddResult`] (including the new table version) on success.
///
/// # Errors
///
/// Returns [`Error::InvalidInput`] if `write_parallelism` was explicitly
/// set to `0`; propagates any error from the underlying table write.
pub async fn execute(self) -> Result<AddResult> {
    // A parallelism of zero is meaningless (there would be no write
    // streams at all), so reject it before starting the write.
    // `== Some(0)` replaces the noisier `map(|p| p == 0).unwrap_or(false)`.
    if self.write_parallelism == Some(0) {
        return Err(Error::InvalidInput {
            message: "write_parallelism must be greater than 0".to_string(),
        });
    }
    // `add` consumes the builder (including its parent handle), so clone
    // the parent reference first.
    self.parent.clone().add(self).await
}