Compare commits

...

6 Commits

Author SHA1 Message Date
Lance Release
39614fdb7d Bump version: 0.22.0-beta.9 → 0.22.0-beta.10 2025-04-22 18:23:17 +00:00
Ryan Green
96d534d4bc feat: add retries to remote client for requests with stream bodies (#2349)
Closes https://github.com/lancedb/lancedb/issues/2307
* Adds retries to remote operations with stream bodies (add,
merge_insert)
* Change default retryable status codes to 409, 429, 500, 502, 503, 504
* Don't retry add or merge_insert operations on 5xx responses

Notes:
* Supporting retries on stream bodies means we have to buffer the body
into memory so it can be cloned on retry. This will impact memory use
patterns for the remote client. This buffering can be disabled by
disabling retries (i.e. setting retries to 0 in RetryConfig)
* It does not seem that retry config can be specified by env vars as the
documentation suggests. I added a follow-up issue
[here](https://github.com/lancedb/lancedb/issues/2350)



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

## Summary by CodeRabbit

- **New Features**
- Enhanced retry support for remote requests with configurable limits
and exponential backoff with jitter.
- Added robust retry logic for streaming data uploads, enabling retries
with buffered data to ensure reliability.

- **Bug Fixes**
- Improved error handling and retry behavior for HTTP status codes 409
and 504.

- **Refactor**
- Centralized and modularized HTTP request sending and retry logic
across remote database and table operations.
  - Streamlined request ID management for improved traceability.
- Simplified error message construction in index waiting functionality.

- **Tests**
  - Added a test verifying merge-insert retries on HTTP 409 responses.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-22 15:40:44 -02:30
Lance Release
5051d30d09 Updating package-lock.json 2025-04-21 23:55:43 +00:00
Lance Release
db853c4041 Updating package-lock.json 2025-04-21 22:50:56 +00:00
Lance Release
76d1d22bdc Updating package-lock.json 2025-04-21 22:50:40 +00:00
Lance Release
d8746c61c6 Bump version: 0.19.0-beta.8 → 0.19.0-beta.9 2025-04-21 22:50:20 +00:00
27 changed files with 473 additions and 305 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.19.0-beta.8"
current_version = "0.19.0-beta.9"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

119
Cargo.lock generated
View File

@@ -128,9 +128,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.97"
version = "1.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
[[package]]
name = "arbitrary"
@@ -390,9 +390,9 @@ dependencies = [
[[package]]
name = "async-compression"
version = "0.4.22"
version = "0.4.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59a194f9d963d8099596278594b3107448656ba73831c9d8c783e613ce86da64"
checksum = "b37fc50485c4f3f736a4fb14199f6d5f5ba008d7f28fe710306c92780f004c07"
dependencies = [
"flate2",
"futures-core",
@@ -564,9 +564,9 @@ dependencies = [
[[package]]
name = "aws-lc-sys"
version = "0.28.0"
version = "0.28.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9f7720b74ed28ca77f90769a71fd8c637a0137f6fae4ae947e1050229cff57f"
checksum = "bfa9b6986f250236c27e5a204062434a773a13243d2ffc2955f37bdba4c5c6a1"
dependencies = [
"bindgen",
"cc",
@@ -882,7 +882,7 @@ dependencies = [
"aws-smithy-async",
"aws-smithy-runtime-api",
"aws-smithy-types",
"h2 0.4.8",
"h2 0.4.9",
"http 0.2.12",
"http 1.3.1",
"http-body 0.4.6",
@@ -1185,9 +1185,9 @@ dependencies = [
[[package]]
name = "blake3"
version = "1.8.1"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389a099b34312839e16420d499a9cad9650541715937ffbdd40d36f49e77eeb3"
checksum = "3888aaa89e4b2a40fca9848e400f6a658a5a3978de7be858e209cafa8be9a4a0"
dependencies = [
"arrayref",
"arrayvec",
@@ -2377,7 +2377,16 @@ version = "5.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225"
dependencies = [
"dirs-sys",
"dirs-sys 0.4.1",
]
[[package]]
name = "dirs"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3e8aa94d75141228480295a7d0e7feb620b1a5ad9f12bc40be62411e38cce4e"
dependencies = [
"dirs-sys 0.5.0",
]
[[package]]
@@ -2388,10 +2397,22 @@ checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c"
dependencies = [
"libc",
"option-ext",
"redox_users",
"redox_users 0.4.6",
"windows-sys 0.48.0",
]
[[package]]
name = "dirs-sys"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e01a3366d27ee9890022452ee61b2b63a67e6f13f58900b651ff5665f0bb1fab"
dependencies = [
"libc",
"option-ext",
"redox_users 0.5.0",
"windows-sys 0.59.0",
]
[[package]]
name = "displaydoc"
version = "0.2.5"
@@ -2558,9 +2579,9 @@ dependencies = [
[[package]]
name = "ethnum"
version = "1.5.0"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b90ca2580b73ab6a1f724b76ca11ab632df820fd6040c336200d2c1df7b3c82c"
checksum = "0939f82868b77ef93ce3c3c3daf2b3c526b456741da5a1a4559e590965b6026b"
[[package]]
name = "event-listener"
@@ -3049,9 +3070,9 @@ dependencies = [
[[package]]
name = "h2"
version = "0.4.8"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5017294ff4bb30944501348f6f8e42e6ad28f42c8bbef7a74029aff064a4e3c2"
checksum = "75249d144030531f8dee69fe9cea04d3edf809a017ae445e2abdff6629e86633"
dependencies = [
"atomic-waker",
"bytes",
@@ -3138,7 +3159,7 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc03dcb0b0a83ae3f3363ec811014ae669f083e4e499c66602f447c4828737a1"
dependencies = [
"dirs",
"dirs 5.0.1",
"futures",
"http 1.3.1",
"indicatif",
@@ -3286,7 +3307,7 @@ dependencies = [
"bytes",
"futures-channel",
"futures-util",
"h2 0.4.8",
"h2 0.4.9",
"http 1.3.1",
"http-body 1.0.1",
"httparse",
@@ -3645,9 +3666,9 @@ checksum = "9028f49264629065d057f340a86acb84867925865f73bbf8d47b4d149a7e88b8"
[[package]]
name = "jiff"
version = "0.2.6"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f33145a5cbea837164362c7bd596106eb7c5198f97d1ba6f6ebb3223952e488"
checksum = "5a064218214dc6a10fbae5ec5fa888d80c45d611aba169222fc272072bf7aef6"
dependencies = [
"jiff-static",
"log",
@@ -3658,9 +3679,9 @@ dependencies = [
[[package]]
name = "jiff-static"
version = "0.2.6"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43ce13c40ec6956157a3635d97a1ee2df323b263f09ea14165131289cb0f5c19"
checksum = "199b7932d97e325aff3a7030e141eafe7f2c6268e1d1b24859b753a627f45254"
dependencies = [
"proc-macro2",
"quote",
@@ -3965,7 +3986,7 @@ dependencies = [
"datafusion-physical-expr",
"datafusion-sql",
"deepsize",
"dirs",
"dirs 5.0.1",
"fst",
"futures",
"half",
@@ -4115,7 +4136,7 @@ dependencies = [
[[package]]
name = "lancedb"
version = "0.19.0-beta.8"
version = "0.19.0-beta.9"
dependencies = [
"arrow",
"arrow-array",
@@ -4202,7 +4223,7 @@ dependencies = [
[[package]]
name = "lancedb-node"
version = "0.19.0-beta.8"
version = "0.19.0-beta.9"
dependencies = [
"arrow-array",
"arrow-ipc",
@@ -4227,7 +4248,7 @@ dependencies = [
[[package]]
name = "lancedb-nodejs"
version = "0.19.0-beta.8"
version = "0.19.0-beta.9"
dependencies = [
"arrow-array",
"arrow-ipc",
@@ -4245,7 +4266,7 @@ dependencies = [
[[package]]
name = "lancedb-python"
version = "0.22.0-beta.8"
version = "0.22.0-beta.9"
dependencies = [
"arrow",
"env_logger",
@@ -4342,9 +4363,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.171"
version = "0.2.172"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
[[package]]
name = "libloading"
@@ -4368,9 +4389,9 @@ dependencies = [
[[package]]
name = "libm"
version = "0.2.11"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8355be11b20d696c8f18f6cc018c4e372165b1fa8126cef092399c9951984ffa"
checksum = "c9627da5196e5d8ed0b0495e61e518847578da83483c37288316d9b2e03a7f72"
[[package]]
name = "libredox"
@@ -5637,9 +5658,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.94"
version = "1.0.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84"
checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778"
dependencies = [
"unicode-ident",
]
@@ -5837,13 +5858,13 @@ dependencies = [
[[package]]
name = "quinn-proto"
version = "0.11.10"
version = "0.11.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b820744eb4dc9b57a3398183639c511b5a26d2ed702cedd3febaa1393caa22cc"
checksum = "bcbafbbdbb0f638fe3f35f3c56739f77a8a1d070cb25603226c83339b391472b"
dependencies = [
"bytes",
"getrandom 0.3.2",
"rand 0.9.0",
"rand 0.9.1",
"ring",
"rustc-hash 2.1.1",
"rustls 0.23.26",
@@ -5903,13 +5924,12 @@ dependencies = [
[[package]]
name = "rand"
version = "0.9.0"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.3",
"zerocopy 0.8.24",
]
[[package]]
@@ -6084,6 +6104,17 @@ dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "redox_users"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd6f9d3d47bdd2ad6945c5015a226ec6155d0bcdfd8f7cd29f86b71f8de99d2b"
dependencies = [
"getrandom 0.2.15",
"libredox",
"thiserror 2.0.12",
]
[[package]]
name = "regex"
version = "1.11.1"
@@ -6152,7 +6183,7 @@ dependencies = [
"encoding_rs",
"futures-core",
"futures-util",
"h2 0.4.8",
"h2 0.4.9",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
@@ -6701,11 +6732,11 @@ dependencies = [
[[package]]
name = "shellexpand"
version = "3.1.0"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da03fa3b94cc19e3ebfc88c4229c49d8f08cdbd1228870a45f0ffdf84988e14b"
checksum = "8b1fdf65dd6331831494dd616b30351c38e96e45921a27745cf98490458b90bb"
dependencies = [
"dirs",
"dirs 6.0.0",
]
[[package]]
@@ -6716,9 +6747,9 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.2"
version = "1.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410"
dependencies = [
"libc",
]

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.19.0-beta.8</version>
<version>0.19.0-beta.9</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.19.0-beta.8</version>
<version>0.19.0-beta.9</version>
<packaging>pom</packaging>
<name>LanceDB Parent</name>

44
node/package-lock.json generated
View File

@@ -1,12 +1,12 @@
{
"name": "vectordb",
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.9",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "vectordb",
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.9",
"cpu": [
"x64",
"arm64"
@@ -52,11 +52,11 @@
"uuid": "^9.0.0"
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-arm64": "0.19.0-beta.8",
"@lancedb/vectordb-darwin-x64": "0.19.0-beta.8",
"@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.8",
"@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.8",
"@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.8"
"@lancedb/vectordb-darwin-arm64": "0.19.0-beta.9",
"@lancedb/vectordb-darwin-x64": "0.19.0-beta.9",
"@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.9",
"@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.9",
"@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.9"
},
"peerDependencies": {
"@apache-arrow/ts": "^14.0.2",
@@ -327,9 +327,9 @@
}
},
"node_modules/@lancedb/vectordb-darwin-arm64": {
"version": "0.19.0-beta.8",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.19.0-beta.8.tgz",
"integrity": "sha512-zNKTlHemHUyU3+WtIQ029tZSl5C5hXWvwI073kfKuYOWGSRZeOcrU8WAuS9b17nfFD40X28YUD5qPB10GbMrNQ==",
"version": "0.19.0-beta.9",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.19.0-beta.9.tgz",
"integrity": "sha512-Sc4FFWT7ck7U2BKXnFl5EdPsb2ay8yOX3AqpIYbhenhpAkzLBS8NPUztuhybFoPeLSWKVUtwp9NGAxNyL9lgrQ==",
"cpu": [
"arm64"
],
@@ -340,9 +340,9 @@
]
},
"node_modules/@lancedb/vectordb-darwin-x64": {
"version": "0.19.0-beta.8",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.19.0-beta.8.tgz",
"integrity": "sha512-OdnduXdX5ZTZd2s+5wW5gssDYQKwEfUKxjOWOjjLS8SQeTlPM6pI0z9QP9K1sipbTYpYoCgokr5+PKKhvMPezw==",
"version": "0.19.0-beta.9",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.19.0-beta.9.tgz",
"integrity": "sha512-KiYzFFX7Hvq/OvsA7VZt0xfGRW4eoDcLwgWdXxgsNzB6X6d3eupUqkrTKe3KlHrnm170s5fwSFrJHIU/NPoA/A==",
"cpu": [
"x64"
],
@@ -353,9 +353,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-arm64-gnu": {
"version": "0.19.0-beta.8",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.19.0-beta.8.tgz",
"integrity": "sha512-9Y52zhZYFbgCJA3Vxj8EFnZ8lVuvqAJNapQPo7bH56ZgnEcAnWikk8yWwT63PtI22T6XOcj1hWWYfWKrUXMggg==",
"version": "0.19.0-beta.9",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.19.0-beta.9.tgz",
"integrity": "sha512-TCK/PEqvoVa/oxYHWsjpB+BaQ/KDfwOI76grbezAEDGYS76USAnNb8KB8bDnnadOu9/i4Zsha0Bk0fsHfDSSpg==",
"cpu": [
"arm64"
],
@@ -366,9 +366,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-x64-gnu": {
"version": "0.19.0-beta.8",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.19.0-beta.8.tgz",
"integrity": "sha512-e0H+gSkvMGYx2DPcriXwwkALvZtmbWNtdpMAZceS8qHYv7xMtUPXG86od5vTbhKTrnC2hJLVj5E3JcAs8sJn6w==",
"version": "0.19.0-beta.9",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.19.0-beta.9.tgz",
"integrity": "sha512-Ti7Z2Am5ziYZmQBpLfVwGWe+zl11xed084Gv6WFcYfyLz6FJ/0+CEMVx1wIsoZ1gq6tvH0vgZLYxsMF5C5xVnw==",
"cpu": [
"x64"
],
@@ -379,9 +379,9 @@
]
},
"node_modules/@lancedb/vectordb-win32-x64-msvc": {
"version": "0.19.0-beta.8",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.19.0-beta.8.tgz",
"integrity": "sha512-olQKVpoWKJWOuVsFM92hmtHYFpCtITiKhUQ8gZu7ngrgLe7ofAASyqvWp5THV2zSXpwYITqrYjHOrtLy1/I9Jw==",
"version": "0.19.0-beta.9",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.19.0-beta.9.tgz",
"integrity": "sha512-MUSECQGbpRi3qbG6CsLJAInT0ZIMpr/RBhEx5CUlwZiEIWaUc/CQMoNJh7W+1s5Cd3UMBgkMpgmfj21Qd9fsHw==",
"cpu": [
"x64"
],

View File

@@ -1,6 +1,6 @@
{
"name": "vectordb",
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.9",
"description": " Serverless, low-latency vector database for AI applications",
"private": false,
"main": "dist/index.js",
@@ -89,10 +89,10 @@
}
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-x64": "0.19.0-beta.8",
"@lancedb/vectordb-darwin-arm64": "0.19.0-beta.8",
"@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.8",
"@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.8",
"@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.8"
"@lancedb/vectordb-darwin-x64": "0.19.0-beta.9",
"@lancedb/vectordb-darwin-arm64": "0.19.0-beta.9",
"@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.9",
"@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.9",
"@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.9"
}
}

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.19.0-beta.8"
version = "0.19.0-beta.9"
license.workspace = true
description.workspace = true
repository.workspace = true

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.9",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-x64",
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.9",
"os": ["darwin"],
"cpu": ["x64"],
"main": "lancedb.darwin-x64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.9",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.9",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.9",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.9",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.9",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.9",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.9",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.9",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.9",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.22.0-beta.9"
current_version = "0.22.0-beta.10"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.22.0-beta.9"
version = "0.22.0-beta.10"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-node"
version = "0.19.0-beta.8"
version = "0.19.0-beta.9"
description = "Serverless, low-latency vector database for AI applications"
license.workspace = true
edition.workspace = true

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.19.0-beta.8"
version = "0.19.0-beta.9"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true

View File

@@ -20,7 +20,7 @@ pub async fn wait_for_index(
) -> Result<()> {
if timeout > MAX_WAIT {
return Err(Error::InvalidInput {
message: format!("timeout must be less than {:?}", MAX_WAIT).to_string(),
message: format!("timeout must be less than {:?}", MAX_WAIT),
});
}
let start = Instant::now();
@@ -84,7 +84,6 @@ pub async fn wait_for_index(
message: format!(
"timed out waiting for indices: {:?} after {:?}",
remaining, timeout
)
.to_string(),
),
})
}

View File

@@ -8,6 +8,7 @@
pub(crate) mod client;
pub(crate) mod db;
mod retry;
pub(crate) mod table;
pub(crate) mod util;

View File

@@ -1,17 +1,17 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::{collections::HashMap, future::Future, str::FromStr, time::Duration};
use http::HeaderName;
use log::debug;
use reqwest::{
header::{HeaderMap, HeaderValue},
Request, RequestBuilder, Response,
Body, Request, RequestBuilder, Response,
};
use std::{collections::HashMap, future::Future, str::FromStr, time::Duration};
use crate::error::{Error, Result};
use crate::remote::db::RemoteOptions;
use crate::remote::retry::{ResolvedRetryConfig, RetryCounter};
const REQUEST_ID_HEADER: HeaderName = HeaderName::from_static("x-request-id");
@@ -118,41 +118,14 @@ pub struct RetryConfig {
/// You can also set the `LANCE_CLIENT_RETRY_STATUSES` environment variable
/// to set this value. Use a comma-separated list of integer values.
///
/// The default is 429, 500, 502, 503.
/// Note that write operations will never be retried on 5xx errors as this may
/// result in duplicated writes.
///
/// The default is 409, 429, 500, 502, 503, 504.
pub statuses: Option<Vec<u16>>,
// TODO: should we allow customizing methods?
}
#[derive(Debug, Clone)]
struct ResolvedRetryConfig {
retries: u8,
connect_retries: u8,
read_retries: u8,
backoff_factor: f32,
backoff_jitter: f32,
statuses: Vec<reqwest::StatusCode>,
}
impl TryFrom<RetryConfig> for ResolvedRetryConfig {
type Error = Error;
fn try_from(retry_config: RetryConfig) -> Result<Self> {
Ok(Self {
retries: retry_config.retries.unwrap_or(3),
connect_retries: retry_config.connect_retries.unwrap_or(3),
read_retries: retry_config.read_retries.unwrap_or(3),
backoff_factor: retry_config.backoff_factor.unwrap_or(0.25),
backoff_jitter: retry_config.backoff_jitter.unwrap_or(0.25),
statuses: retry_config
.statuses
.unwrap_or_else(|| vec![429, 500, 502, 503])
.into_iter()
.map(|status| reqwest::StatusCode::from_u16(status).unwrap())
.collect(),
})
}
}
// We use the `HttpSend` trait to abstract over the `reqwest::Client` so that
// we can mock responses in tests. Based on the patterns from this blog post:
// https://write.as/balrogboogie/testing-reqwest-based-clients
@@ -160,8 +133,8 @@ impl TryFrom<RetryConfig> for ResolvedRetryConfig {
pub struct RestfulLanceDbClient<S: HttpSend = Sender> {
client: reqwest::Client,
host: String,
retry_config: ResolvedRetryConfig,
sender: S,
pub(crate) retry_config: ResolvedRetryConfig,
pub(crate) sender: S,
}
pub trait HttpSend: Clone + Send + Sync + std::fmt::Debug + 'static {
@@ -375,74 +348,69 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
self.client.post(full_uri)
}
pub async fn send(&self, req: RequestBuilder, with_retry: bool) -> Result<(String, Response)> {
pub async fn send(&self, req: RequestBuilder) -> Result<(String, Response)> {
let (client, request) = req.build_split();
let mut request = request.unwrap();
let request_id = self.extract_request_id(&mut request);
self.log_request(&request, &request_id);
// Set a request id.
// TODO: allow the user to supply this, through middleware?
let request_id = if let Some(request_id) = request.headers().get(REQUEST_ID_HEADER) {
request_id.to_str().unwrap().to_string()
} else {
let request_id = uuid::Uuid::new_v4().to_string();
let header = HeaderValue::from_str(&request_id).unwrap();
request.headers_mut().insert(REQUEST_ID_HEADER, header);
request_id
};
if log::log_enabled!(log::Level::Debug) {
let content_type = request
.headers()
.get("content-type")
.map(|v| v.to_str().unwrap());
if content_type == Some("application/json") {
let body = request.body().as_ref().unwrap().as_bytes().unwrap();
let body = String::from_utf8_lossy(body);
debug!(
"Sending request_id={}: {:?} with body {}",
request_id, request, body
);
} else {
debug!("Sending request_id={}: {:?}", request_id, request);
}
}
if with_retry {
self.send_with_retry_impl(client, request, request_id).await
} else {
let response = self
.sender
.send(&client, request)
.await
.err_to_http(request_id.clone())?;
debug!(
"Received response for request_id={}: {:?}",
request_id, &response
);
Ok((request_id, response))
}
let response = self
.sender
.send(&client, request)
.await
.err_to_http(request_id.clone())?;
debug!(
"Received response for request_id={}: {:?}",
request_id, &response
);
Ok((request_id, response))
}
async fn send_with_retry_impl(
/// Send the request using retries configured in the RetryConfig.
/// If retry_5xx is false, 5xx requests will not be retried regardless of the statuses configured
/// in the RetryConfig.
/// Since this requires arrow serialization, this is implemented here instead of in RestfulLanceDbClient
pub async fn send_with_retry(
&self,
client: reqwest::Client,
req: Request,
request_id: String,
req_builder: RequestBuilder,
mut make_body: Option<Box<dyn FnMut() -> Result<Body> + Send + 'static>>,
retry_5xx: bool,
) -> Result<(String, Response)> {
let mut retry_counter = RetryCounter::new(&self.retry_config, request_id);
let retry_config = &self.retry_config;
let non_5xx_statuses = retry_config
.statuses
.iter()
.filter(|s| !s.is_server_error())
.cloned()
.collect::<Vec<_>>();
// clone and build the request to extract the request id
let tmp_req = req_builder.try_clone().ok_or_else(|| Error::Runtime {
message: "Attempted to retry a request that cannot be cloned".to_string(),
})?;
let (_, r) = tmp_req.build_split();
let mut r = r.unwrap();
let request_id = self.extract_request_id(&mut r);
let mut retry_counter = RetryCounter::new(retry_config, request_id.clone());
loop {
// This only works if the request body is not a stream. If it is
// a stream, we can't use the retry path. We would need to implement
// an outer retry.
let request = req.try_clone().ok_or_else(|| Error::Runtime {
let mut req_builder = req_builder.try_clone().ok_or_else(|| Error::Runtime {
message: "Attempted to retry a request that cannot be cloned".to_string(),
})?;
let response = self
.sender
.send(&client, request)
.await
.map(|r| (r.status(), r));
// set the streaming body on the request builder after clone
if let Some(body_gen) = make_body.as_mut() {
let body = body_gen()?;
req_builder = req_builder.body(body);
}
let (c, request) = req_builder.build_split();
let mut request = request.unwrap();
self.set_request_id(&mut request, &request_id.clone());
self.log_request(&request, &request_id);
let response = self.sender.send(&c, request).await.map(|r| (r.status(), r));
match response {
Ok((status, response)) if status.is_success() => {
debug!(
@@ -451,7 +419,10 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
);
return Ok((retry_counter.request_id, response));
}
Ok((status, response)) if self.retry_config.statuses.contains(&status) => {
Ok((status, response))
if (retry_5xx && retry_config.statuses.contains(&status))
|| non_5xx_statuses.contains(&status) =>
{
let source = self
.check_response(&retry_counter.request_id, response)
.await
@@ -480,6 +451,47 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
}
}
fn log_request(&self, request: &Request, request_id: &String) {
if log::log_enabled!(log::Level::Debug) {
let content_type = request
.headers()
.get("content-type")
.map(|v| v.to_str().unwrap());
if content_type == Some("application/json") {
let body = request.body().as_ref().unwrap().as_bytes().unwrap();
let body = String::from_utf8_lossy(body);
debug!(
"Sending request_id={}: {:?} with body {}",
request_id, request, body
);
} else {
debug!("Sending request_id={}: {:?}", request_id, request);
}
}
}
/// Extract the request ID from the request headers.
/// If the request ID header is not set, this will generate a new one and set
/// it on the request headers
pub fn extract_request_id(&self, request: &mut Request) -> String {
// Set a request id.
// TODO: allow the user to supply this, through middleware?
let request_id = if let Some(request_id) = request.headers().get(REQUEST_ID_HEADER) {
request_id.to_str().unwrap().to_string()
} else {
let request_id = uuid::Uuid::new_v4().to_string();
self.set_request_id(request, &request_id);
request_id
};
request_id
}
/// Set the request ID header
pub fn set_request_id(&self, request: &mut Request, request_id: &str) {
let header = HeaderValue::from_str(request_id).unwrap();
request.headers_mut().insert(REQUEST_ID_HEADER, header);
}
pub async fn check_response(&self, request_id: &str, response: Response) -> Result<Response> {
// Try to get the response text, but if that fails, just return the status code
let status = response.status();
@@ -501,91 +513,6 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
}
}
struct RetryCounter<'a> {
request_failures: u8,
connect_failures: u8,
read_failures: u8,
config: &'a ResolvedRetryConfig,
request_id: String,
}
impl<'a> RetryCounter<'a> {
fn new(config: &'a ResolvedRetryConfig, request_id: String) -> Self {
Self {
request_failures: 0,
connect_failures: 0,
read_failures: 0,
config,
request_id,
}
}
fn check_out_of_retries(
&self,
source: Box<dyn std::error::Error + Send + Sync>,
status_code: Option<reqwest::StatusCode>,
) -> Result<()> {
if self.request_failures >= self.config.retries
|| self.connect_failures >= self.config.connect_retries
|| self.read_failures >= self.config.read_retries
{
Err(Error::Retry {
request_id: self.request_id.clone(),
request_failures: self.request_failures,
max_request_failures: self.config.retries,
connect_failures: self.connect_failures,
max_connect_failures: self.config.connect_retries,
read_failures: self.read_failures,
max_read_failures: self.config.read_retries,
source,
status_code,
})
} else {
Ok(())
}
}
fn increment_request_failures(&mut self, source: crate::Error) -> Result<()> {
self.request_failures += 1;
let status_code = if let crate::Error::Http { status_code, .. } = &source {
*status_code
} else {
None
};
self.check_out_of_retries(Box::new(source), status_code)
}
fn increment_connect_failures(&mut self, source: reqwest::Error) -> Result<()> {
self.connect_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
fn increment_read_failures(&mut self, source: reqwest::Error) -> Result<()> {
self.read_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
fn next_sleep_time(&self) -> Duration {
let backoff = self.config.backoff_factor * (2.0f32.powi(self.request_failures as i32));
let jitter = rand::random::<f32>() * self.config.backoff_jitter;
let sleep_time = Duration::from_secs_f32(backoff + jitter);
debug!(
"Retrying request {:?} ({}/{} connect, {}/{} read, {}/{} read) in {:?}",
self.request_id,
self.connect_failures,
self.config.connect_retries,
self.request_failures,
self.config.retries,
self.read_failures,
self.config.read_retries,
sleep_time
);
sleep_time
}
}
pub trait RequestResultExt {
type Output;
fn err_to_http(self, request_id: String) -> Result<Self::Output>;

View File

@@ -255,7 +255,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
if let Some(start_after) = request.start_after {
req = req.query(&[("page_token", start_after)]);
}
let (request_id, rsp) = self.client.send(req, true).await?;
let (request_id, rsp) = self.client.send_with_retry(req, None, true).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let version = parse_server_version(&request_id, &rsp)?;
let tables = rsp
@@ -302,7 +302,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
.body(data_buffer)
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
let (request_id, rsp) = self.client.send(req, false).await?;
let (request_id, rsp) = self.client.send(req).await?;
if rsp.status() == StatusCode::BAD_REQUEST {
let body = rsp.text().await.err_to_http(request_id.clone())?;
@@ -362,7 +362,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
let req = self
.client
.post(&format!("/v1/table/{}/describe/", request.name));
let (request_id, rsp) = self.client.send(req, true).await?;
let (request_id, rsp) = self.client.send_with_retry(req, None, true).await?;
if rsp.status() == StatusCode::NOT_FOUND {
return Err(crate::Error::TableNotFound { name: request.name });
}
@@ -383,7 +383,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
.client
.post(&format!("/v1/table/{}/rename/", current_name));
let req = req.json(&serde_json::json!({ "new_table_name": new_name }));
let (request_id, resp) = self.client.send(req, false).await?;
let (request_id, resp) = self.client.send(req).await?;
self.client.check_response(&request_id, resp).await?;
let table = self.table_cache.remove(current_name).await;
if let Some(table) = table {
@@ -394,7 +394,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
async fn drop_table(&self, name: &str) -> Result<()> {
let req = self.client.post(&format!("/v1/table/{}/drop/", name));
let (request_id, resp) = self.client.send(req, true).await?;
let (request_id, resp) = self.client.send(req).await?;
self.client.check_response(&request_id, resp).await?;
self.table_cache.remove(name).await;
Ok(())

View File

@@ -0,0 +1,122 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use crate::remote::RetryConfig;
use crate::Error;
use log::debug;
use std::time::Duration;
pub struct RetryCounter<'a> {
pub request_failures: u8,
pub connect_failures: u8,
pub read_failures: u8,
pub config: &'a ResolvedRetryConfig,
pub request_id: String,
}
impl<'a> RetryCounter<'a> {
pub(crate) fn new(config: &'a ResolvedRetryConfig, request_id: String) -> Self {
Self {
request_failures: 0,
connect_failures: 0,
read_failures: 0,
config,
request_id,
}
}
fn check_out_of_retries(
&self,
source: Box<dyn std::error::Error + Send + Sync>,
status_code: Option<reqwest::StatusCode>,
) -> crate::Result<()> {
if self.request_failures >= self.config.retries
|| self.connect_failures >= self.config.connect_retries
|| self.read_failures >= self.config.read_retries
{
Err(Error::Retry {
request_id: self.request_id.clone(),
request_failures: self.request_failures,
max_request_failures: self.config.retries,
connect_failures: self.connect_failures,
max_connect_failures: self.config.connect_retries,
read_failures: self.read_failures,
max_read_failures: self.config.read_retries,
source,
status_code,
})
} else {
Ok(())
}
}
pub fn increment_request_failures(&mut self, source: crate::Error) -> crate::Result<()> {
self.request_failures += 1;
let status_code = if let crate::Error::Http { status_code, .. } = &source {
*status_code
} else {
None
};
self.check_out_of_retries(Box::new(source), status_code)
}
pub fn increment_connect_failures(&mut self, source: reqwest::Error) -> crate::Result<()> {
self.connect_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
pub fn increment_read_failures(&mut self, source: reqwest::Error) -> crate::Result<()> {
self.read_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
pub fn next_sleep_time(&self) -> Duration {
let backoff = self.config.backoff_factor * (2.0f32.powi(self.request_failures as i32));
let jitter = rand::random::<f32>() * self.config.backoff_jitter;
let sleep_time = Duration::from_secs_f32(backoff + jitter);
debug!(
"Retrying request {:?} ({}/{} connect, {}/{} read, {}/{} read) in {:?}",
self.request_id,
self.connect_failures,
self.config.connect_retries,
self.request_failures,
self.config.retries,
self.read_failures,
self.config.read_retries,
sleep_time
);
sleep_time
}
}
#[derive(Debug, Clone)]
pub struct ResolvedRetryConfig {
pub retries: u8,
pub connect_retries: u8,
pub read_retries: u8,
pub backoff_factor: f32,
pub backoff_jitter: f32,
pub statuses: Vec<reqwest::StatusCode>,
}
impl TryFrom<RetryConfig> for ResolvedRetryConfig {
type Error = Error;
fn try_from(retry_config: RetryConfig) -> crate::Result<Self> {
Ok(Self {
retries: retry_config.retries.unwrap_or(3),
connect_retries: retry_config.connect_retries.unwrap_or(3),
read_retries: retry_config.read_retries.unwrap_or(3),
backoff_factor: retry_config.backoff_factor.unwrap_or(0.25),
backoff_jitter: retry_config.backoff_jitter.unwrap_or(0.25),
statuses: retry_config
.statuses
.unwrap_or_else(|| vec![409, 429, 500, 502, 503, 504])
.into_iter()
.map(|status| reqwest::StatusCode::from_u16(status).unwrap())
.collect(),
})
}
}

View File

@@ -7,7 +7,7 @@ use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest};
use crate::table::{AddDataMode, AnyQuery, Filter};
use crate::utils::{supported_btree_data_type, supported_vector_data_type};
use crate::{DistanceType, Error, Table};
use arrow_array::RecordBatchReader;
use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_ipc::reader::FileReader;
use arrow_schema::{DataType, SchemaRef};
use async_trait::async_trait;
@@ -21,6 +21,7 @@ use lance::arrow::json::{JsonDataType, JsonSchema};
use lance::dataset::scanner::DatasetRecordBatchStream;
use lance::dataset::{ColumnAlteration, NewColumnTransform, Version};
use lance_datafusion::exec::{execute_plan, OneShotExec};
use reqwest::{RequestBuilder, Response};
use serde::{Deserialize, Serialize};
use std::io::Cursor;
use std::pin::Pin;
@@ -83,7 +84,7 @@ impl<S: HttpSend> RemoteTable<S> {
let body = serde_json::json!({ "version": version });
request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
@@ -127,6 +128,61 @@ impl<S: HttpSend> RemoteTable<S> {
Ok(reqwest::Body::wrap_stream(body_stream))
}
/// Buffer the reader into memory
async fn buffer_reader<R: RecordBatchReader + ?Sized>(
reader: &mut R,
) -> Result<(SchemaRef, Vec<RecordBatch>)> {
let schema = reader.schema();
let mut batches = Vec::new();
for batch in reader {
batches.push(batch?);
}
Ok((schema, batches))
}
/// Create a new RecordBatchReader from buffered data
fn make_reader(schema: SchemaRef, batches: Vec<RecordBatch>) -> impl RecordBatchReader {
let iter = batches.into_iter().map(Ok);
RecordBatchIterator::new(iter, schema)
}
async fn send(&self, req: RequestBuilder, with_retry: bool) -> Result<(String, Response)> {
let res = if with_retry {
self.client.send_with_retry(req, None, true).await?
} else {
self.client.send(req).await?
};
Ok(res)
}
/// Send the request with streaming body.
/// This will use retries if with_retry is set and the number of configured retries is > 0.
/// If retries are enabled, the stream will be buffered into memory.
async fn send_streaming(
&self,
req: RequestBuilder,
mut data: Box<dyn RecordBatchReader + Send>,
with_retry: bool,
) -> Result<(String, Response)> {
if !with_retry || self.client.retry_config.retries == 0 {
let body = Self::reader_as_body(data)?;
return self.client.send(req.body(body)).await;
}
// to support retries, buffer into memory and clone the batches on each retry
let (schema, batches) = Self::buffer_reader(&mut *data).await?;
let make_body = Box::new(move || {
let reader = Self::make_reader(schema.clone(), batches.clone());
Self::reader_as_body(Box::new(reader))
});
let res = self
.client
.send_with_retry(req, Some(make_body), false)
.await?;
Ok(res)
}
async fn check_table_response(
&self,
request_id: &str,
@@ -353,7 +409,7 @@ impl<S: HttpSend> RemoteTable<S> {
.collect();
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?;
let (request_id, response) = self.send(req, true).await?;
self.read_arrow_stream(&request_id, response).await
});
let streams = futures::future::try_join_all(futures);
@@ -471,7 +527,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = serde_json::json!({ "version": version });
request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
self.checkout_latest().await?;
Ok(())
@@ -481,7 +537,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let request = self
.client
.post(&format!("/v1/table/{}/version/list/", self.name));
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
#[derive(Deserialize)]
@@ -527,7 +583,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
request = request.json(&body);
}
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
@@ -545,12 +601,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
data: Box<dyn RecordBatchReader + Send>,
) -> Result<()> {
self.check_mutable().await?;
let body = Self::reader_as_body(data)?;
let mut request = self
.client
.post(&format!("/v1/table/{}/insert/", self.name))
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE)
.body(body);
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
match add.mode {
AddDataMode::Append => {}
@@ -559,8 +613,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
}
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send_streaming(request, data, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
@@ -628,7 +681,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.collect::<Vec<_>>();
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?;
let (request_id, response) = self.send(req, true).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?;
@@ -670,7 +723,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.collect();
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?;
let (request_id, response) = self.send(req, true).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?;
@@ -712,7 +765,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
"predicate": update.filter,
}));
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
@@ -726,7 +779,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.client
.post(&format!("/v1/table/{}/delete/", self.name))
.json(&body);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -812,7 +865,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let request = request.json(&body);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
@@ -836,21 +889,21 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<()> {
self.check_mutable().await?;
let query = MergeInsertRequest::try_from(params)?;
let body = Self::reader_as_body(new_data)?;
let request = self
.client
.post(&format!("/v1/table/{}/merge_insert/", self.name))
.query(&query)
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE)
.body(body);
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send_streaming(request, new_data, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
async fn optimize(&self, _action: OptimizeAction) -> Result<OptimizeStats> {
self.check_mutable().await?;
Err(Error::NotSupported {
@@ -879,7 +932,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.client
.post(&format!("/v1/table/{}/add_columns/", self.name))
.json(&body);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?; // todo:
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -918,7 +971,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.client
.post(&format!("/v1/table/{}/alter_columns/", self.name))
.json(&body);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -930,7 +983,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.client
.post(&format!("/v1/table/{}/drop_columns/", self.name))
.json(&body);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -944,7 +997,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = serde_json::json!({ "version": version });
request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
#[derive(Deserialize)]
@@ -1001,7 +1054,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = serde_json::json!({ "version": version });
request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
if response.status() == StatusCode::NOT_FOUND {
return Ok(None);
@@ -1011,7 +1064,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = response.text().await.err_to_http(request_id.clone())?;
println!("body: {:?}", body);
let stats = serde_json::from_str(&body).map_err(|e| Error::Http {
source: format!("Failed to parse index statistics: {}", e).into(),
request_id,
@@ -1026,7 +1078,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
"/v1/table/{}/index/{}/drop/",
self.name, index_name
));
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -1487,6 +1539,42 @@ mod tests {
assert_eq!(&body, &expected_body);
}
#[tokio::test]
async fn test_merge_insert_retries_on_409() {
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let data = Box::new(RecordBatchIterator::new(
[Ok(batch.clone())],
batch.schema(),
));
// Default parameters
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/merge_insert/");
let params = request.url().query_pairs().collect::<HashMap<_, _>>();
assert_eq!(params["on"], "some_col");
assert_eq!(params["when_matched_update_all"], "false");
assert_eq!(params["when_not_matched_insert_all"], "false");
assert_eq!(params["when_not_matched_by_source_delete"], "false");
assert!(!params.contains_key("when_matched_update_all_filt"));
assert!(!params.contains_key("when_not_matched_by_source_delete_filt"));
http::Response::builder().status(409).body("").unwrap()
});
let e = table
.merge_insert(&["some_col"])
.execute(data)
.await
.unwrap_err();
assert!(e.to_string().contains("Hit retry limit"));
}
#[tokio::test]
async fn test_delete() {
let table = Table::new_with_handler("my_table", |request| {