Compare commits

...

6 Commits

Author SHA1 Message Date
Lance Release
39614fdb7d Bump version: 0.22.0-beta.9 → 0.22.0-beta.10 2025-04-22 18:23:17 +00:00
Ryan Green
96d534d4bc feat: add retries to remote client for requests with stream bodies (#2349)
Closes https://github.com/lancedb/lancedb/issues/2307
* Adds retries to remote operations with stream bodies (add, merge_insert)
* Changes the default retryable status codes to 409, 429, 500, 502, 503, 504
* Does not retry add or merge_insert operations on 5xx responses, since the server may have already applied the write

Notes:
* Supporting retries on stream bodies means the body has to be buffered
into memory so that it can be cloned on each retry. This will change memory
usage patterns for the remote client. The buffering can be avoided by
disabling retries (i.e. setting retries to 0 in RetryConfig); see the sketch
below these notes.
* The retry config does not currently appear to be configurable via
environment variables, contrary to what the documentation suggests. A
follow-up issue is filed
[here](https://github.com/lancedb/lancedb/issues/2350)



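For illustration, here is a rough sketch of how the retry behaviour could be tuned through `RetryConfig`. The field names and default values mirror the `RetryConfig` / `ResolvedRetryConfig` structs visible in the diff below; the import path, and the assumption that the struct has exactly these six optional fields and is constructed directly, are not confirmed by this PR.

```rust
// Sketch only: field names and defaults are taken from the diff below;
// the `lancedb::remote::RetryConfig` path is an assumption.
use lancedb::remote::RetryConfig;

fn retry_examples() -> (RetryConfig, RetryConfig) {
    // Tuned config: keep the default retry counts and backoff, but only
    // retry on 429 and 503 instead of the new default set
    // (409, 429, 500, 502, 503, 504).
    let tuned = RetryConfig {
        retries: Some(3),
        connect_retries: Some(3),
        read_retries: Some(3),
        backoff_factor: Some(0.25),
        backoff_jitter: Some(0.25),
        statuses: Some(vec![429, 503]),
    };

    // Disabling retries entirely (retries = 0) means streaming bodies for
    // add / merge_insert are no longer buffered into memory before sending.
    let no_retries = RetryConfig {
        retries: Some(0),
        connect_retries: None,
        read_retries: None,
        backoff_factor: None,
        backoff_jitter: None,
        statuses: None,
    };

    (tuned, no_retries)
}
```

Even with retries enabled, write operations (add, merge_insert) are not retried on 5xx statuses, since the server may already have applied the write.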
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
  - Enhanced retry support for remote requests with configurable limits
    and exponential backoff with jitter (see the sketch after this summary).
  - Added robust retry logic for streaming data uploads, enabling retries
    with buffered data to ensure reliability.

- **Bug Fixes**
  - Improved error handling and retry behavior for HTTP status codes 409
    and 504.

- **Refactor**
  - Centralized and modularized HTTP request sending and retry logic
    across remote database and table operations.
  - Streamlined request ID management for improved traceability.
  - Simplified error message construction in index waiting functionality.

- **Tests**
  - Added a test verifying merge-insert retries on HTTP 409 responses.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
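For reference, the backoff-with-jitter schedule mentioned above follows the formula in the new retry module shown further down in this diff: a base delay of `backoff_factor * 2^request_failures` seconds plus up to `backoff_jitter` seconds of random jitter. A minimal standalone sketch of that calculation, using the default factor/jitter of 0.25 from the diff (requires the `rand` crate):

```rust
use std::time::Duration;

// Mirrors the next_sleep_time calculation in this PR's retry module;
// extracted here purely as an illustration.
fn next_sleep_time(request_failures: u8, backoff_factor: f32, backoff_jitter: f32) -> Duration {
    let backoff = backoff_factor * 2.0f32.powi(request_failures as i32);
    let jitter = rand::random::<f32>() * backoff_jitter;
    Duration::from_secs_f32(backoff + jitter)
}

fn main() {
    // With the defaults (0.25 / 0.25) the base delays grow as
    // 0.25s, 0.5s, 1s, 2s, ... plus up to 0.25s of jitter per attempt.
    for failures in 0..4u8 {
        println!("attempt {}: {:?}", failures, next_sleep_time(failures, 0.25, 0.25));
    }
}
```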
2025-04-22 15:40:44 -02:30
Lance Release
5051d30d09 Updating package-lock.json 2025-04-21 23:55:43 +00:00
Lance Release
db853c4041 Updating package-lock.json 2025-04-21 22:50:56 +00:00
Lance Release
76d1d22bdc Updating package-lock.json 2025-04-21 22:50:40 +00:00
Lance Release
d8746c61c6 Bump version: 0.19.0-beta.8 → 0.19.0-beta.9 2025-04-21 22:50:20 +00:00
27 changed files with 473 additions and 305 deletions

View File

@@ -1,5 +1,5 @@
 [tool.bumpversion]
-current_version = "0.19.0-beta.8"
+current_version = "0.19.0-beta.9"
 parse = """(?x)
 (?P<major>0|[1-9]\\d*)\\.
 (?P<minor>0|[1-9]\\d*)\\.

Cargo.lock generated
View File

@@ -128,9 +128,9 @@ dependencies = [
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.97" version = "1.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
[[package]] [[package]]
name = "arbitrary" name = "arbitrary"
@@ -390,9 +390,9 @@ dependencies = [
[[package]] [[package]]
name = "async-compression" name = "async-compression"
version = "0.4.22" version = "0.4.23"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59a194f9d963d8099596278594b3107448656ba73831c9d8c783e613ce86da64" checksum = "b37fc50485c4f3f736a4fb14199f6d5f5ba008d7f28fe710306c92780f004c07"
dependencies = [ dependencies = [
"flate2", "flate2",
"futures-core", "futures-core",
@@ -564,9 +564,9 @@ dependencies = [
[[package]] [[package]]
name = "aws-lc-sys" name = "aws-lc-sys"
version = "0.28.0" version = "0.28.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9f7720b74ed28ca77f90769a71fd8c637a0137f6fae4ae947e1050229cff57f" checksum = "bfa9b6986f250236c27e5a204062434a773a13243d2ffc2955f37bdba4c5c6a1"
dependencies = [ dependencies = [
"bindgen", "bindgen",
"cc", "cc",
@@ -882,7 +882,7 @@ dependencies = [
"aws-smithy-async", "aws-smithy-async",
"aws-smithy-runtime-api", "aws-smithy-runtime-api",
"aws-smithy-types", "aws-smithy-types",
"h2 0.4.8", "h2 0.4.9",
"http 0.2.12", "http 0.2.12",
"http 1.3.1", "http 1.3.1",
"http-body 0.4.6", "http-body 0.4.6",
@@ -1185,9 +1185,9 @@ dependencies = [
[[package]] [[package]]
name = "blake3" name = "blake3"
version = "1.8.1" version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389a099b34312839e16420d499a9cad9650541715937ffbdd40d36f49e77eeb3" checksum = "3888aaa89e4b2a40fca9848e400f6a658a5a3978de7be858e209cafa8be9a4a0"
dependencies = [ dependencies = [
"arrayref", "arrayref",
"arrayvec", "arrayvec",
@@ -2377,7 +2377,16 @@ version = "5.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225"
dependencies = [ dependencies = [
"dirs-sys", "dirs-sys 0.4.1",
]
[[package]]
name = "dirs"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3e8aa94d75141228480295a7d0e7feb620b1a5ad9f12bc40be62411e38cce4e"
dependencies = [
"dirs-sys 0.5.0",
] ]
[[package]] [[package]]
@@ -2388,10 +2397,22 @@ checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c"
dependencies = [ dependencies = [
"libc", "libc",
"option-ext", "option-ext",
"redox_users", "redox_users 0.4.6",
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
[[package]]
name = "dirs-sys"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e01a3366d27ee9890022452ee61b2b63a67e6f13f58900b651ff5665f0bb1fab"
dependencies = [
"libc",
"option-ext",
"redox_users 0.5.0",
"windows-sys 0.59.0",
]
[[package]] [[package]]
name = "displaydoc" name = "displaydoc"
version = "0.2.5" version = "0.2.5"
@@ -2558,9 +2579,9 @@ dependencies = [
[[package]] [[package]]
name = "ethnum" name = "ethnum"
version = "1.5.0" version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b90ca2580b73ab6a1f724b76ca11ab632df820fd6040c336200d2c1df7b3c82c" checksum = "0939f82868b77ef93ce3c3c3daf2b3c526b456741da5a1a4559e590965b6026b"
[[package]] [[package]]
name = "event-listener" name = "event-listener"
@@ -3049,9 +3070,9 @@ dependencies = [
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.4.8" version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5017294ff4bb30944501348f6f8e42e6ad28f42c8bbef7a74029aff064a4e3c2" checksum = "75249d144030531f8dee69fe9cea04d3edf809a017ae445e2abdff6629e86633"
dependencies = [ dependencies = [
"atomic-waker", "atomic-waker",
"bytes", "bytes",
@@ -3138,7 +3159,7 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc03dcb0b0a83ae3f3363ec811014ae669f083e4e499c66602f447c4828737a1" checksum = "cc03dcb0b0a83ae3f3363ec811014ae669f083e4e499c66602f447c4828737a1"
dependencies = [ dependencies = [
"dirs", "dirs 5.0.1",
"futures", "futures",
"http 1.3.1", "http 1.3.1",
"indicatif", "indicatif",
@@ -3286,7 +3307,7 @@ dependencies = [
"bytes", "bytes",
"futures-channel", "futures-channel",
"futures-util", "futures-util",
"h2 0.4.8", "h2 0.4.9",
"http 1.3.1", "http 1.3.1",
"http-body 1.0.1", "http-body 1.0.1",
"httparse", "httparse",
@@ -3645,9 +3666,9 @@ checksum = "9028f49264629065d057f340a86acb84867925865f73bbf8d47b4d149a7e88b8"
[[package]] [[package]]
name = "jiff" name = "jiff"
version = "0.2.6" version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f33145a5cbea837164362c7bd596106eb7c5198f97d1ba6f6ebb3223952e488" checksum = "5a064218214dc6a10fbae5ec5fa888d80c45d611aba169222fc272072bf7aef6"
dependencies = [ dependencies = [
"jiff-static", "jiff-static",
"log", "log",
@@ -3658,9 +3679,9 @@ dependencies = [
[[package]] [[package]]
name = "jiff-static" name = "jiff-static"
version = "0.2.6" version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43ce13c40ec6956157a3635d97a1ee2df323b263f09ea14165131289cb0f5c19" checksum = "199b7932d97e325aff3a7030e141eafe7f2c6268e1d1b24859b753a627f45254"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@@ -3965,7 +3986,7 @@ dependencies = [
"datafusion-physical-expr", "datafusion-physical-expr",
"datafusion-sql", "datafusion-sql",
"deepsize", "deepsize",
"dirs", "dirs 5.0.1",
"fst", "fst",
"futures", "futures",
"half", "half",
@@ -4115,7 +4136,7 @@ dependencies = [
[[package]] [[package]]
name = "lancedb" name = "lancedb"
version = "0.19.0-beta.8" version = "0.19.0-beta.9"
dependencies = [ dependencies = [
"arrow", "arrow",
"arrow-array", "arrow-array",
@@ -4202,7 +4223,7 @@ dependencies = [
[[package]] [[package]]
name = "lancedb-node" name = "lancedb-node"
version = "0.19.0-beta.8" version = "0.19.0-beta.9"
dependencies = [ dependencies = [
"arrow-array", "arrow-array",
"arrow-ipc", "arrow-ipc",
@@ -4227,7 +4248,7 @@ dependencies = [
[[package]] [[package]]
name = "lancedb-nodejs" name = "lancedb-nodejs"
version = "0.19.0-beta.8" version = "0.19.0-beta.9"
dependencies = [ dependencies = [
"arrow-array", "arrow-array",
"arrow-ipc", "arrow-ipc",
@@ -4245,7 +4266,7 @@ dependencies = [
[[package]] [[package]]
name = "lancedb-python" name = "lancedb-python"
version = "0.22.0-beta.8" version = "0.22.0-beta.9"
dependencies = [ dependencies = [
"arrow", "arrow",
"env_logger", "env_logger",
@@ -4342,9 +4363,9 @@ dependencies = [
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.171" version = "0.2.172"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
[[package]] [[package]]
name = "libloading" name = "libloading"
@@ -4368,9 +4389,9 @@ dependencies = [
[[package]] [[package]]
name = "libm" name = "libm"
version = "0.2.11" version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8355be11b20d696c8f18f6cc018c4e372165b1fa8126cef092399c9951984ffa" checksum = "c9627da5196e5d8ed0b0495e61e518847578da83483c37288316d9b2e03a7f72"
[[package]] [[package]]
name = "libredox" name = "libredox"
@@ -5637,9 +5658,9 @@ dependencies = [
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.94" version = "1.0.95"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84" checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778"
dependencies = [ dependencies = [
"unicode-ident", "unicode-ident",
] ]
@@ -5837,13 +5858,13 @@ dependencies = [
[[package]] [[package]]
name = "quinn-proto" name = "quinn-proto"
version = "0.11.10" version = "0.11.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b820744eb4dc9b57a3398183639c511b5a26d2ed702cedd3febaa1393caa22cc" checksum = "bcbafbbdbb0f638fe3f35f3c56739f77a8a1d070cb25603226c83339b391472b"
dependencies = [ dependencies = [
"bytes", "bytes",
"getrandom 0.3.2", "getrandom 0.3.2",
"rand 0.9.0", "rand 0.9.1",
"ring", "ring",
"rustc-hash 2.1.1", "rustc-hash 2.1.1",
"rustls 0.23.26", "rustls 0.23.26",
@@ -5903,13 +5924,12 @@ dependencies = [
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.9.0" version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97"
dependencies = [ dependencies = [
"rand_chacha 0.9.0", "rand_chacha 0.9.0",
"rand_core 0.9.3", "rand_core 0.9.3",
"zerocopy 0.8.24",
] ]
[[package]] [[package]]
@@ -6084,6 +6104,17 @@ dependencies = [
"thiserror 1.0.69", "thiserror 1.0.69",
] ]
[[package]]
name = "redox_users"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd6f9d3d47bdd2ad6945c5015a226ec6155d0bcdfd8f7cd29f86b71f8de99d2b"
dependencies = [
"getrandom 0.2.15",
"libredox",
"thiserror 2.0.12",
]
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.11.1" version = "1.11.1"
@@ -6152,7 +6183,7 @@ dependencies = [
"encoding_rs", "encoding_rs",
"futures-core", "futures-core",
"futures-util", "futures-util",
"h2 0.4.8", "h2 0.4.9",
"http 1.3.1", "http 1.3.1",
"http-body 1.0.1", "http-body 1.0.1",
"http-body-util", "http-body-util",
@@ -6701,11 +6732,11 @@ dependencies = [
[[package]] [[package]]
name = "shellexpand" name = "shellexpand"
version = "3.1.0" version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da03fa3b94cc19e3ebfc88c4229c49d8f08cdbd1228870a45f0ffdf84988e14b" checksum = "8b1fdf65dd6331831494dd616b30351c38e96e45921a27745cf98490458b90bb"
dependencies = [ dependencies = [
"dirs", "dirs 6.0.0",
] ]
[[package]] [[package]]
@@ -6716,9 +6747,9 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]] [[package]]
name = "signal-hook-registry" name = "signal-hook-registry"
version = "1.4.2" version = "1.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410"
dependencies = [ dependencies = [
"libc", "libc",
] ]

View File

@@ -8,7 +8,7 @@
   <parent>
     <groupId>com.lancedb</groupId>
     <artifactId>lancedb-parent</artifactId>
-    <version>0.19.0-beta.8</version>
+    <version>0.19.0-beta.9</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

View File

@@ -6,7 +6,7 @@
   <groupId>com.lancedb</groupId>
   <artifactId>lancedb-parent</artifactId>
-  <version>0.19.0-beta.8</version>
+  <version>0.19.0-beta.9</version>
   <packaging>pom</packaging>
   <name>LanceDB Parent</name>

node/package-lock.json generated
View File

@@ -1,12 +1,12 @@
 {
   "name": "vectordb",
-  "version": "0.19.0-beta.8",
+  "version": "0.19.0-beta.9",
   "lockfileVersion": 3,
   "requires": true,
   "packages": {
     "": {
       "name": "vectordb",
-      "version": "0.19.0-beta.8",
+      "version": "0.19.0-beta.9",
       "cpu": [
         "x64",
         "arm64"
@@ -52,11 +52,11 @@
"uuid": "^9.0.0" "uuid": "^9.0.0"
}, },
"optionalDependencies": { "optionalDependencies": {
"@lancedb/vectordb-darwin-arm64": "0.19.0-beta.8", "@lancedb/vectordb-darwin-arm64": "0.19.0-beta.9",
"@lancedb/vectordb-darwin-x64": "0.19.0-beta.8", "@lancedb/vectordb-darwin-x64": "0.19.0-beta.9",
"@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.8", "@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.9",
"@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.8", "@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.9",
"@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.8" "@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.9"
}, },
"peerDependencies": { "peerDependencies": {
"@apache-arrow/ts": "^14.0.2", "@apache-arrow/ts": "^14.0.2",
@@ -327,9 +327,9 @@
       }
     },
     "node_modules/@lancedb/vectordb-darwin-arm64": {
-      "version": "0.19.0-beta.8",
-      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.19.0-beta.8.tgz",
-      "integrity": "sha512-zNKTlHemHUyU3+WtIQ029tZSl5C5hXWvwI073kfKuYOWGSRZeOcrU8WAuS9b17nfFD40X28YUD5qPB10GbMrNQ==",
+      "version": "0.19.0-beta.9",
+      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.19.0-beta.9.tgz",
+      "integrity": "sha512-Sc4FFWT7ck7U2BKXnFl5EdPsb2ay8yOX3AqpIYbhenhpAkzLBS8NPUztuhybFoPeLSWKVUtwp9NGAxNyL9lgrQ==",
      "cpu": [
        "arm64"
      ],
@@ -340,9 +340,9 @@
       ]
     },
     "node_modules/@lancedb/vectordb-darwin-x64": {
-      "version": "0.19.0-beta.8",
-      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.19.0-beta.8.tgz",
-      "integrity": "sha512-OdnduXdX5ZTZd2s+5wW5gssDYQKwEfUKxjOWOjjLS8SQeTlPM6pI0z9QP9K1sipbTYpYoCgokr5+PKKhvMPezw==",
+      "version": "0.19.0-beta.9",
+      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.19.0-beta.9.tgz",
+      "integrity": "sha512-KiYzFFX7Hvq/OvsA7VZt0xfGRW4eoDcLwgWdXxgsNzB6X6d3eupUqkrTKe3KlHrnm170s5fwSFrJHIU/NPoA/A==",
      "cpu": [
        "x64"
      ],
@@ -353,9 +353,9 @@
       ]
     },
     "node_modules/@lancedb/vectordb-linux-arm64-gnu": {
-      "version": "0.19.0-beta.8",
-      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.19.0-beta.8.tgz",
-      "integrity": "sha512-9Y52zhZYFbgCJA3Vxj8EFnZ8lVuvqAJNapQPo7bH56ZgnEcAnWikk8yWwT63PtI22T6XOcj1hWWYfWKrUXMggg==",
+      "version": "0.19.0-beta.9",
+      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.19.0-beta.9.tgz",
+      "integrity": "sha512-TCK/PEqvoVa/oxYHWsjpB+BaQ/KDfwOI76grbezAEDGYS76USAnNb8KB8bDnnadOu9/i4Zsha0Bk0fsHfDSSpg==",
      "cpu": [
        "arm64"
      ],
@@ -366,9 +366,9 @@
       ]
     },
     "node_modules/@lancedb/vectordb-linux-x64-gnu": {
-      "version": "0.19.0-beta.8",
-      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.19.0-beta.8.tgz",
-      "integrity": "sha512-e0H+gSkvMGYx2DPcriXwwkALvZtmbWNtdpMAZceS8qHYv7xMtUPXG86od5vTbhKTrnC2hJLVj5E3JcAs8sJn6w==",
+      "version": "0.19.0-beta.9",
+      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.19.0-beta.9.tgz",
+      "integrity": "sha512-Ti7Z2Am5ziYZmQBpLfVwGWe+zl11xed084Gv6WFcYfyLz6FJ/0+CEMVx1wIsoZ1gq6tvH0vgZLYxsMF5C5xVnw==",
      "cpu": [
        "x64"
      ],
@@ -379,9 +379,9 @@
       ]
     },
     "node_modules/@lancedb/vectordb-win32-x64-msvc": {
-      "version": "0.19.0-beta.8",
-      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.19.0-beta.8.tgz",
-      "integrity": "sha512-olQKVpoWKJWOuVsFM92hmtHYFpCtITiKhUQ8gZu7ngrgLe7ofAASyqvWp5THV2zSXpwYITqrYjHOrtLy1/I9Jw==",
+      "version": "0.19.0-beta.9",
+      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.19.0-beta.9.tgz",
+      "integrity": "sha512-MUSECQGbpRi3qbG6CsLJAInT0ZIMpr/RBhEx5CUlwZiEIWaUc/CQMoNJh7W+1s5Cd3UMBgkMpgmfj21Qd9fsHw==",
      "cpu": [
        "x64"
      ],

View File

@@ -1,6 +1,6 @@
 {
   "name": "vectordb",
-  "version": "0.19.0-beta.8",
+  "version": "0.19.0-beta.9",
   "description": " Serverless, low-latency vector database for AI applications",
   "private": false,
   "main": "dist/index.js",
@@ -89,10 +89,10 @@
     }
   },
   "optionalDependencies": {
-    "@lancedb/vectordb-darwin-x64": "0.19.0-beta.8",
-    "@lancedb/vectordb-darwin-arm64": "0.19.0-beta.8",
-    "@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.8",
-    "@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.8",
-    "@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.8"
+    "@lancedb/vectordb-darwin-x64": "0.19.0-beta.9",
+    "@lancedb/vectordb-darwin-arm64": "0.19.0-beta.9",
+    "@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.9",
+    "@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.9",
+    "@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.9"
   }
 }

View File

@@ -1,7 +1,7 @@
 [package]
 name = "lancedb-nodejs"
 edition.workspace = true
-version = "0.19.0-beta.8"
+version = "0.19.0-beta.9"
 license.workspace = true
 description.workspace = true
 repository.workspace = true

View File

@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-darwin-arm64",
-  "version": "0.19.0-beta.8",
+  "version": "0.19.0-beta.9",
   "os": ["darwin"],
   "cpu": ["arm64"],
   "main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-darwin-x64",
-  "version": "0.19.0-beta.8",
+  "version": "0.19.0-beta.9",
   "os": ["darwin"],
   "cpu": ["x64"],
   "main": "lancedb.darwin-x64.node",

View File

@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-linux-arm64-gnu",
-  "version": "0.19.0-beta.8",
+  "version": "0.19.0-beta.9",
   "os": ["linux"],
   "cpu": ["arm64"],
   "main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-linux-arm64-musl",
-  "version": "0.19.0-beta.8",
+  "version": "0.19.0-beta.9",
   "os": ["linux"],
   "cpu": ["arm64"],
   "main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-linux-x64-gnu",
-  "version": "0.19.0-beta.8",
+  "version": "0.19.0-beta.9",
   "os": ["linux"],
   "cpu": ["x64"],
   "main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-linux-x64-musl",
-  "version": "0.19.0-beta.8",
+  "version": "0.19.0-beta.9",
   "os": ["linux"],
   "cpu": ["x64"],
   "main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-win32-arm64-msvc",
-  "version": "0.19.0-beta.8",
+  "version": "0.19.0-beta.9",
   "os": [
     "win32"
   ],

View File

@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-win32-x64-msvc",
-  "version": "0.19.0-beta.8",
+  "version": "0.19.0-beta.9",
   "os": ["win32"],
   "cpu": ["x64"],
   "main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
 {
   "name": "@lancedb/lancedb",
-  "version": "0.19.0-beta.8",
+  "version": "0.19.0-beta.9",
   "lockfileVersion": 3,
   "requires": true,
   "packages": {
     "": {
       "name": "@lancedb/lancedb",
-      "version": "0.19.0-beta.8",
+      "version": "0.19.0-beta.9",
       "cpu": [
         "x64",
         "arm64"

View File

@@ -11,7 +11,7 @@
"ann" "ann"
], ],
"private": false, "private": false,
"version": "0.19.0-beta.8", "version": "0.19.0-beta.9",
"main": "dist/index.js", "main": "dist/index.js",
"exports": { "exports": {
".": "./dist/index.js", ".": "./dist/index.js",

View File

@@ -1,5 +1,5 @@
 [tool.bumpversion]
-current_version = "0.22.0-beta.9"
+current_version = "0.22.0-beta.10"
 parse = """(?x)
 (?P<major>0|[1-9]\\d*)\\.
 (?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
 [package]
 name = "lancedb-python"
-version = "0.22.0-beta.9"
+version = "0.22.0-beta.10"
 edition.workspace = true
 description = "Python bindings for LanceDB"
 license.workspace = true

View File

@@ -1,6 +1,6 @@
 [package]
 name = "lancedb-node"
-version = "0.19.0-beta.8"
+version = "0.19.0-beta.9"
 description = "Serverless, low-latency vector database for AI applications"
 license.workspace = true
 edition.workspace = true

View File

@@ -1,6 +1,6 @@
 [package]
 name = "lancedb"
-version = "0.19.0-beta.8"
+version = "0.19.0-beta.9"
 edition.workspace = true
 description = "LanceDB: A serverless, low-latency vector database for AI applications"
 license.workspace = true

View File

@@ -20,7 +20,7 @@ pub async fn wait_for_index(
 ) -> Result<()> {
     if timeout > MAX_WAIT {
         return Err(Error::InvalidInput {
-            message: format!("timeout must be less than {:?}", MAX_WAIT).to_string(),
+            message: format!("timeout must be less than {:?}", MAX_WAIT),
         });
     }
     let start = Instant::now();
@@ -84,7 +84,6 @@ pub async fn wait_for_index(
             message: format!(
                 "timed out waiting for indices: {:?} after {:?}",
                 remaining, timeout
-            )
-            .to_string(),
+            ),
         })
     }

View File

@@ -8,6 +8,7 @@
 pub(crate) mod client;
 pub(crate) mod db;
+mod retry;
 pub(crate) mod table;
 pub(crate) mod util;

View File

@@ -1,17 +1,17 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors // SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::{collections::HashMap, future::Future, str::FromStr, time::Duration};
use http::HeaderName; use http::HeaderName;
use log::debug; use log::debug;
use reqwest::{ use reqwest::{
header::{HeaderMap, HeaderValue}, header::{HeaderMap, HeaderValue},
Request, RequestBuilder, Response, Body, Request, RequestBuilder, Response,
}; };
use std::{collections::HashMap, future::Future, str::FromStr, time::Duration};
use crate::error::{Error, Result}; use crate::error::{Error, Result};
use crate::remote::db::RemoteOptions; use crate::remote::db::RemoteOptions;
use crate::remote::retry::{ResolvedRetryConfig, RetryCounter};
const REQUEST_ID_HEADER: HeaderName = HeaderName::from_static("x-request-id"); const REQUEST_ID_HEADER: HeaderName = HeaderName::from_static("x-request-id");
@@ -118,41 +118,14 @@ pub struct RetryConfig {
/// You can also set the `LANCE_CLIENT_RETRY_STATUSES` environment variable /// You can also set the `LANCE_CLIENT_RETRY_STATUSES` environment variable
/// to set this value. Use a comma-separated list of integer values. /// to set this value. Use a comma-separated list of integer values.
/// ///
/// The default is 429, 500, 502, 503. /// Note that write operations will never be retried on 5xx errors as this may
/// result in duplicated writes.
///
/// The default is 409, 429, 500, 502, 503, 504.
pub statuses: Option<Vec<u16>>, pub statuses: Option<Vec<u16>>,
// TODO: should we allow customizing methods? // TODO: should we allow customizing methods?
} }
#[derive(Debug, Clone)]
struct ResolvedRetryConfig {
retries: u8,
connect_retries: u8,
read_retries: u8,
backoff_factor: f32,
backoff_jitter: f32,
statuses: Vec<reqwest::StatusCode>,
}
impl TryFrom<RetryConfig> for ResolvedRetryConfig {
type Error = Error;
fn try_from(retry_config: RetryConfig) -> Result<Self> {
Ok(Self {
retries: retry_config.retries.unwrap_or(3),
connect_retries: retry_config.connect_retries.unwrap_or(3),
read_retries: retry_config.read_retries.unwrap_or(3),
backoff_factor: retry_config.backoff_factor.unwrap_or(0.25),
backoff_jitter: retry_config.backoff_jitter.unwrap_or(0.25),
statuses: retry_config
.statuses
.unwrap_or_else(|| vec![429, 500, 502, 503])
.into_iter()
.map(|status| reqwest::StatusCode::from_u16(status).unwrap())
.collect(),
})
}
}
// We use the `HttpSend` trait to abstract over the `reqwest::Client` so that // We use the `HttpSend` trait to abstract over the `reqwest::Client` so that
// we can mock responses in tests. Based on the patterns from this blog post: // we can mock responses in tests. Based on the patterns from this blog post:
// https://write.as/balrogboogie/testing-reqwest-based-clients // https://write.as/balrogboogie/testing-reqwest-based-clients
@@ -160,8 +133,8 @@ impl TryFrom<RetryConfig> for ResolvedRetryConfig {
pub struct RestfulLanceDbClient<S: HttpSend = Sender> { pub struct RestfulLanceDbClient<S: HttpSend = Sender> {
client: reqwest::Client, client: reqwest::Client,
host: String, host: String,
retry_config: ResolvedRetryConfig, pub(crate) retry_config: ResolvedRetryConfig,
sender: S, pub(crate) sender: S,
} }
pub trait HttpSend: Clone + Send + Sync + std::fmt::Debug + 'static { pub trait HttpSend: Clone + Send + Sync + std::fmt::Debug + 'static {
@@ -375,74 +348,69 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
self.client.post(full_uri) self.client.post(full_uri)
} }
pub async fn send(&self, req: RequestBuilder, with_retry: bool) -> Result<(String, Response)> { pub async fn send(&self, req: RequestBuilder) -> Result<(String, Response)> {
let (client, request) = req.build_split(); let (client, request) = req.build_split();
let mut request = request.unwrap(); let mut request = request.unwrap();
let request_id = self.extract_request_id(&mut request);
self.log_request(&request, &request_id);
// Set a request id. let response = self
// TODO: allow the user to supply this, through middleware? .sender
let request_id = if let Some(request_id) = request.headers().get(REQUEST_ID_HEADER) { .send(&client, request)
request_id.to_str().unwrap().to_string() .await
} else { .err_to_http(request_id.clone())?;
let request_id = uuid::Uuid::new_v4().to_string(); debug!(
let header = HeaderValue::from_str(&request_id).unwrap(); "Received response for request_id={}: {:?}",
request.headers_mut().insert(REQUEST_ID_HEADER, header); request_id, &response
request_id );
}; Ok((request_id, response))
if log::log_enabled!(log::Level::Debug) {
let content_type = request
.headers()
.get("content-type")
.map(|v| v.to_str().unwrap());
if content_type == Some("application/json") {
let body = request.body().as_ref().unwrap().as_bytes().unwrap();
let body = String::from_utf8_lossy(body);
debug!(
"Sending request_id={}: {:?} with body {}",
request_id, request, body
);
} else {
debug!("Sending request_id={}: {:?}", request_id, request);
}
}
if with_retry {
self.send_with_retry_impl(client, request, request_id).await
} else {
let response = self
.sender
.send(&client, request)
.await
.err_to_http(request_id.clone())?;
debug!(
"Received response for request_id={}: {:?}",
request_id, &response
);
Ok((request_id, response))
}
} }
async fn send_with_retry_impl( /// Send the request using retries configured in the RetryConfig.
/// If retry_5xx is false, 5xx requests will not be retried regardless of the statuses configured
/// in the RetryConfig.
/// Since this requires arrow serialization, this is implemented here instead of in RestfulLanceDbClient
pub async fn send_with_retry(
&self, &self,
client: reqwest::Client, req_builder: RequestBuilder,
req: Request, mut make_body: Option<Box<dyn FnMut() -> Result<Body> + Send + 'static>>,
request_id: String, retry_5xx: bool,
) -> Result<(String, Response)> { ) -> Result<(String, Response)> {
let mut retry_counter = RetryCounter::new(&self.retry_config, request_id); let retry_config = &self.retry_config;
let non_5xx_statuses = retry_config
.statuses
.iter()
.filter(|s| !s.is_server_error())
.cloned()
.collect::<Vec<_>>();
// clone and build the request to extract the request id
let tmp_req = req_builder.try_clone().ok_or_else(|| Error::Runtime {
message: "Attempted to retry a request that cannot be cloned".to_string(),
})?;
let (_, r) = tmp_req.build_split();
let mut r = r.unwrap();
let request_id = self.extract_request_id(&mut r);
let mut retry_counter = RetryCounter::new(retry_config, request_id.clone());
loop { loop {
// This only works if the request body is not a stream. If it is let mut req_builder = req_builder.try_clone().ok_or_else(|| Error::Runtime {
// a stream, we can't use the retry path. We would need to implement
// an outer retry.
let request = req.try_clone().ok_or_else(|| Error::Runtime {
message: "Attempted to retry a request that cannot be cloned".to_string(), message: "Attempted to retry a request that cannot be cloned".to_string(),
})?; })?;
let response = self
.sender // set the streaming body on the request builder after clone
.send(&client, request) if let Some(body_gen) = make_body.as_mut() {
.await let body = body_gen()?;
.map(|r| (r.status(), r)); req_builder = req_builder.body(body);
}
let (c, request) = req_builder.build_split();
let mut request = request.unwrap();
self.set_request_id(&mut request, &request_id.clone());
self.log_request(&request, &request_id);
let response = self.sender.send(&c, request).await.map(|r| (r.status(), r));
match response { match response {
Ok((status, response)) if status.is_success() => { Ok((status, response)) if status.is_success() => {
debug!( debug!(
@@ -451,7 +419,10 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
); );
return Ok((retry_counter.request_id, response)); return Ok((retry_counter.request_id, response));
} }
Ok((status, response)) if self.retry_config.statuses.contains(&status) => { Ok((status, response))
if (retry_5xx && retry_config.statuses.contains(&status))
|| non_5xx_statuses.contains(&status) =>
{
let source = self let source = self
.check_response(&retry_counter.request_id, response) .check_response(&retry_counter.request_id, response)
.await .await
@@ -480,6 +451,47 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
} }
} }
fn log_request(&self, request: &Request, request_id: &String) {
if log::log_enabled!(log::Level::Debug) {
let content_type = request
.headers()
.get("content-type")
.map(|v| v.to_str().unwrap());
if content_type == Some("application/json") {
let body = request.body().as_ref().unwrap().as_bytes().unwrap();
let body = String::from_utf8_lossy(body);
debug!(
"Sending request_id={}: {:?} with body {}",
request_id, request, body
);
} else {
debug!("Sending request_id={}: {:?}", request_id, request);
}
}
}
/// Extract the request ID from the request headers.
/// If the request ID header is not set, this will generate a new one and set
/// it on the request headers
pub fn extract_request_id(&self, request: &mut Request) -> String {
// Set a request id.
// TODO: allow the user to supply this, through middleware?
let request_id = if let Some(request_id) = request.headers().get(REQUEST_ID_HEADER) {
request_id.to_str().unwrap().to_string()
} else {
let request_id = uuid::Uuid::new_v4().to_string();
self.set_request_id(request, &request_id);
request_id
};
request_id
}
/// Set the request ID header
pub fn set_request_id(&self, request: &mut Request, request_id: &str) {
let header = HeaderValue::from_str(request_id).unwrap();
request.headers_mut().insert(REQUEST_ID_HEADER, header);
}
pub async fn check_response(&self, request_id: &str, response: Response) -> Result<Response> { pub async fn check_response(&self, request_id: &str, response: Response) -> Result<Response> {
// Try to get the response text, but if that fails, just return the status code // Try to get the response text, but if that fails, just return the status code
let status = response.status(); let status = response.status();
@@ -501,91 +513,6 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
} }
} }
struct RetryCounter<'a> {
request_failures: u8,
connect_failures: u8,
read_failures: u8,
config: &'a ResolvedRetryConfig,
request_id: String,
}
impl<'a> RetryCounter<'a> {
fn new(config: &'a ResolvedRetryConfig, request_id: String) -> Self {
Self {
request_failures: 0,
connect_failures: 0,
read_failures: 0,
config,
request_id,
}
}
fn check_out_of_retries(
&self,
source: Box<dyn std::error::Error + Send + Sync>,
status_code: Option<reqwest::StatusCode>,
) -> Result<()> {
if self.request_failures >= self.config.retries
|| self.connect_failures >= self.config.connect_retries
|| self.read_failures >= self.config.read_retries
{
Err(Error::Retry {
request_id: self.request_id.clone(),
request_failures: self.request_failures,
max_request_failures: self.config.retries,
connect_failures: self.connect_failures,
max_connect_failures: self.config.connect_retries,
read_failures: self.read_failures,
max_read_failures: self.config.read_retries,
source,
status_code,
})
} else {
Ok(())
}
}
fn increment_request_failures(&mut self, source: crate::Error) -> Result<()> {
self.request_failures += 1;
let status_code = if let crate::Error::Http { status_code, .. } = &source {
*status_code
} else {
None
};
self.check_out_of_retries(Box::new(source), status_code)
}
fn increment_connect_failures(&mut self, source: reqwest::Error) -> Result<()> {
self.connect_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
fn increment_read_failures(&mut self, source: reqwest::Error) -> Result<()> {
self.read_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
fn next_sleep_time(&self) -> Duration {
let backoff = self.config.backoff_factor * (2.0f32.powi(self.request_failures as i32));
let jitter = rand::random::<f32>() * self.config.backoff_jitter;
let sleep_time = Duration::from_secs_f32(backoff + jitter);
debug!(
"Retrying request {:?} ({}/{} connect, {}/{} read, {}/{} read) in {:?}",
self.request_id,
self.connect_failures,
self.config.connect_retries,
self.request_failures,
self.config.retries,
self.read_failures,
self.config.read_retries,
sleep_time
);
sleep_time
}
}
pub trait RequestResultExt { pub trait RequestResultExt {
type Output; type Output;
fn err_to_http(self, request_id: String) -> Result<Self::Output>; fn err_to_http(self, request_id: String) -> Result<Self::Output>;

View File

@@ -255,7 +255,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
         if let Some(start_after) = request.start_after {
             req = req.query(&[("page_token", start_after)]);
         }
-        let (request_id, rsp) = self.client.send(req, true).await?;
+        let (request_id, rsp) = self.client.send_with_retry(req, None, true).await?;
         let rsp = self.client.check_response(&request_id, rsp).await?;
         let version = parse_server_version(&request_id, &rsp)?;
         let tables = rsp
@@ -302,7 +302,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
             .body(data_buffer)
             .header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
-        let (request_id, rsp) = self.client.send(req, false).await?;
+        let (request_id, rsp) = self.client.send(req).await?;
         if rsp.status() == StatusCode::BAD_REQUEST {
             let body = rsp.text().await.err_to_http(request_id.clone())?;
@@ -362,7 +362,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
         let req = self
             .client
             .post(&format!("/v1/table/{}/describe/", request.name));
-        let (request_id, rsp) = self.client.send(req, true).await?;
+        let (request_id, rsp) = self.client.send_with_retry(req, None, true).await?;
         if rsp.status() == StatusCode::NOT_FOUND {
             return Err(crate::Error::TableNotFound { name: request.name });
         }
@@ -383,7 +383,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
             .client
             .post(&format!("/v1/table/{}/rename/", current_name));
         let req = req.json(&serde_json::json!({ "new_table_name": new_name }));
-        let (request_id, resp) = self.client.send(req, false).await?;
+        let (request_id, resp) = self.client.send(req).await?;
         self.client.check_response(&request_id, resp).await?;
         let table = self.table_cache.remove(current_name).await;
         if let Some(table) = table {
@@ -394,7 +394,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
     async fn drop_table(&self, name: &str) -> Result<()> {
         let req = self.client.post(&format!("/v1/table/{}/drop/", name));
-        let (request_id, resp) = self.client.send(req, true).await?;
+        let (request_id, resp) = self.client.send(req).await?;
         self.client.check_response(&request_id, resp).await?;
         self.table_cache.remove(name).await;
         Ok(())

View File

@@ -0,0 +1,122 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use crate::remote::RetryConfig;
use crate::Error;
use log::debug;
use std::time::Duration;
pub struct RetryCounter<'a> {
pub request_failures: u8,
pub connect_failures: u8,
pub read_failures: u8,
pub config: &'a ResolvedRetryConfig,
pub request_id: String,
}
impl<'a> RetryCounter<'a> {
pub(crate) fn new(config: &'a ResolvedRetryConfig, request_id: String) -> Self {
Self {
request_failures: 0,
connect_failures: 0,
read_failures: 0,
config,
request_id,
}
}
fn check_out_of_retries(
&self,
source: Box<dyn std::error::Error + Send + Sync>,
status_code: Option<reqwest::StatusCode>,
) -> crate::Result<()> {
if self.request_failures >= self.config.retries
|| self.connect_failures >= self.config.connect_retries
|| self.read_failures >= self.config.read_retries
{
Err(Error::Retry {
request_id: self.request_id.clone(),
request_failures: self.request_failures,
max_request_failures: self.config.retries,
connect_failures: self.connect_failures,
max_connect_failures: self.config.connect_retries,
read_failures: self.read_failures,
max_read_failures: self.config.read_retries,
source,
status_code,
})
} else {
Ok(())
}
}
pub fn increment_request_failures(&mut self, source: crate::Error) -> crate::Result<()> {
self.request_failures += 1;
let status_code = if let crate::Error::Http { status_code, .. } = &source {
*status_code
} else {
None
};
self.check_out_of_retries(Box::new(source), status_code)
}
pub fn increment_connect_failures(&mut self, source: reqwest::Error) -> crate::Result<()> {
self.connect_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
pub fn increment_read_failures(&mut self, source: reqwest::Error) -> crate::Result<()> {
self.read_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
pub fn next_sleep_time(&self) -> Duration {
let backoff = self.config.backoff_factor * (2.0f32.powi(self.request_failures as i32));
let jitter = rand::random::<f32>() * self.config.backoff_jitter;
let sleep_time = Duration::from_secs_f32(backoff + jitter);
debug!(
"Retrying request {:?} ({}/{} connect, {}/{} read, {}/{} read) in {:?}",
self.request_id,
self.connect_failures,
self.config.connect_retries,
self.request_failures,
self.config.retries,
self.read_failures,
self.config.read_retries,
sleep_time
);
sleep_time
}
}
#[derive(Debug, Clone)]
pub struct ResolvedRetryConfig {
pub retries: u8,
pub connect_retries: u8,
pub read_retries: u8,
pub backoff_factor: f32,
pub backoff_jitter: f32,
pub statuses: Vec<reqwest::StatusCode>,
}
impl TryFrom<RetryConfig> for ResolvedRetryConfig {
type Error = Error;
fn try_from(retry_config: RetryConfig) -> crate::Result<Self> {
Ok(Self {
retries: retry_config.retries.unwrap_or(3),
connect_retries: retry_config.connect_retries.unwrap_or(3),
read_retries: retry_config.read_retries.unwrap_or(3),
backoff_factor: retry_config.backoff_factor.unwrap_or(0.25),
backoff_jitter: retry_config.backoff_jitter.unwrap_or(0.25),
statuses: retry_config
.statuses
.unwrap_or_else(|| vec![409, 429, 500, 502, 503, 504])
.into_iter()
.map(|status| reqwest::StatusCode::from_u16(status).unwrap())
.collect(),
})
}
}

View File

@@ -7,7 +7,7 @@ use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest};
use crate::table::{AddDataMode, AnyQuery, Filter}; use crate::table::{AddDataMode, AnyQuery, Filter};
use crate::utils::{supported_btree_data_type, supported_vector_data_type}; use crate::utils::{supported_btree_data_type, supported_vector_data_type};
use crate::{DistanceType, Error, Table}; use crate::{DistanceType, Error, Table};
use arrow_array::RecordBatchReader; use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_ipc::reader::FileReader; use arrow_ipc::reader::FileReader;
use arrow_schema::{DataType, SchemaRef}; use arrow_schema::{DataType, SchemaRef};
use async_trait::async_trait; use async_trait::async_trait;
@@ -21,6 +21,7 @@ use lance::arrow::json::{JsonDataType, JsonSchema};
use lance::dataset::scanner::DatasetRecordBatchStream; use lance::dataset::scanner::DatasetRecordBatchStream;
use lance::dataset::{ColumnAlteration, NewColumnTransform, Version}; use lance::dataset::{ColumnAlteration, NewColumnTransform, Version};
use lance_datafusion::exec::{execute_plan, OneShotExec}; use lance_datafusion::exec::{execute_plan, OneShotExec};
use reqwest::{RequestBuilder, Response};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::io::Cursor; use std::io::Cursor;
use std::pin::Pin; use std::pin::Pin;
@@ -83,7 +84,7 @@ impl<S: HttpSend> RemoteTable<S> {
let body = serde_json::json!({ "version": version }); let body = serde_json::json!({ "version": version });
request = request.json(&body); request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?; let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?; let response = self.check_table_response(&request_id, response).await?;
@@ -127,6 +128,61 @@ impl<S: HttpSend> RemoteTable<S> {
Ok(reqwest::Body::wrap_stream(body_stream)) Ok(reqwest::Body::wrap_stream(body_stream))
} }
/// Buffer the reader into memory
async fn buffer_reader<R: RecordBatchReader + ?Sized>(
reader: &mut R,
) -> Result<(SchemaRef, Vec<RecordBatch>)> {
let schema = reader.schema();
let mut batches = Vec::new();
for batch in reader {
batches.push(batch?);
}
Ok((schema, batches))
}
/// Create a new RecordBatchReader from buffered data
fn make_reader(schema: SchemaRef, batches: Vec<RecordBatch>) -> impl RecordBatchReader {
let iter = batches.into_iter().map(Ok);
RecordBatchIterator::new(iter, schema)
}
async fn send(&self, req: RequestBuilder, with_retry: bool) -> Result<(String, Response)> {
let res = if with_retry {
self.client.send_with_retry(req, None, true).await?
} else {
self.client.send(req).await?
};
Ok(res)
}
/// Send the request with streaming body.
/// This will use retries if with_retry is set and the number of configured retries is > 0.
/// If retries are enabled, the stream will be buffered into memory.
async fn send_streaming(
&self,
req: RequestBuilder,
mut data: Box<dyn RecordBatchReader + Send>,
with_retry: bool,
) -> Result<(String, Response)> {
if !with_retry || self.client.retry_config.retries == 0 {
let body = Self::reader_as_body(data)?;
return self.client.send(req.body(body)).await;
}
// to support retries, buffer into memory and clone the batches on each retry
let (schema, batches) = Self::buffer_reader(&mut *data).await?;
let make_body = Box::new(move || {
let reader = Self::make_reader(schema.clone(), batches.clone());
Self::reader_as_body(Box::new(reader))
});
let res = self
.client
.send_with_retry(req, Some(make_body), false)
.await?;
Ok(res)
}
async fn check_table_response( async fn check_table_response(
&self, &self,
request_id: &str, request_id: &str,
@@ -353,7 +409,7 @@ impl<S: HttpSend> RemoteTable<S> {
.collect(); .collect();
let futures = requests.into_iter().map(|req| async move { let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?; let (request_id, response) = self.send(req, true).await?;
self.read_arrow_stream(&request_id, response).await self.read_arrow_stream(&request_id, response).await
}); });
let streams = futures::future::try_join_all(futures); let streams = futures::future::try_join_all(futures);
@@ -471,7 +527,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = serde_json::json!({ "version": version }); let body = serde_json::json!({ "version": version });
request = request.json(&body); request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?; let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?; self.check_table_response(&request_id, response).await?;
self.checkout_latest().await?; self.checkout_latest().await?;
Ok(()) Ok(())
@@ -481,7 +537,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let request = self let request = self
.client .client
.post(&format!("/v1/table/{}/version/list/", self.name)); .post(&format!("/v1/table/{}/version/list/", self.name));
let (request_id, response) = self.client.send(request, true).await?; let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?; let response = self.check_table_response(&request_id, response).await?;
#[derive(Deserialize)] #[derive(Deserialize)]
@@ -527,7 +583,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
request = request.json(&body); request = request.json(&body);
} }
let (request_id, response) = self.client.send(request, true).await?; let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?; let response = self.check_table_response(&request_id, response).await?;
@@ -545,12 +601,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
data: Box<dyn RecordBatchReader + Send>, data: Box<dyn RecordBatchReader + Send>,
) -> Result<()> { ) -> Result<()> {
self.check_mutable().await?; self.check_mutable().await?;
let body = Self::reader_as_body(data)?;
let mut request = self let mut request = self
.client .client
.post(&format!("/v1/table/{}/insert/", self.name)) .post(&format!("/v1/table/{}/insert/", self.name))
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE) .header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
.body(body);
match add.mode { match add.mode {
AddDataMode::Append => {} AddDataMode::Append => {}
@@ -559,8 +613,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
} }
} }
let (request_id, response) = self.client.send(request, false).await?; let (request_id, response) = self.send_streaming(request, data, true).await?;
self.check_table_response(&request_id, response).await?; self.check_table_response(&request_id, response).await?;
Ok(()) Ok(())
@@ -628,7 +681,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let futures = requests.into_iter().map(|req| async move { let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?; let (request_id, response) = self.send(req, true).await?;
let response = self.check_table_response(&request_id, response).await?; let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?; let body = response.text().await.err_to_http(request_id.clone())?;
@@ -670,7 +723,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.collect(); .collect();
let futures = requests.into_iter().map(|req| async move { let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?; let (request_id, response) = self.send(req, true).await?;
let response = self.check_table_response(&request_id, response).await?; let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?; let body = response.text().await.err_to_http(request_id.clone())?;
@@ -712,7 +765,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
"predicate": update.filter, "predicate": update.filter,
})); }));
let (request_id, response) = self.client.send(request, false).await?; let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?; self.check_table_response(&request_id, response).await?;
@@ -726,7 +779,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.client .client
.post(&format!("/v1/table/{}/delete/", self.name)) .post(&format!("/v1/table/{}/delete/", self.name))
.json(&body); .json(&body);
let (request_id, response) = self.client.send(request, false).await?; let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?; self.check_table_response(&request_id, response).await?;
Ok(()) Ok(())
} }
@@ -812,7 +865,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let request = request.json(&body); let request = request.json(&body);
let (request_id, response) = self.client.send(request, false).await?; let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?; self.check_table_response(&request_id, response).await?;
@@ -836,21 +889,21 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
new_data: Box<dyn RecordBatchReader + Send>, new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<()> { ) -> Result<()> {
self.check_mutable().await?; self.check_mutable().await?;
let query = MergeInsertRequest::try_from(params)?; let query = MergeInsertRequest::try_from(params)?;
let body = Self::reader_as_body(new_data)?;
let request = self let request = self
.client .client
.post(&format!("/v1/table/{}/merge_insert/", self.name)) .post(&format!("/v1/table/{}/merge_insert/", self.name))
.query(&query) .query(&query)
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE) .header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
.body(body);
let (request_id, response) = self.client.send(request, false).await?; let (request_id, response) = self.send_streaming(request, new_data, true).await?;
self.check_table_response(&request_id, response).await?; self.check_table_response(&request_id, response).await?;
Ok(()) Ok(())
} }
async fn optimize(&self, _action: OptimizeAction) -> Result<OptimizeStats> { async fn optimize(&self, _action: OptimizeAction) -> Result<OptimizeStats> {
self.check_mutable().await?; self.check_mutable().await?;
Err(Error::NotSupported { Err(Error::NotSupported {
@@ -879,7 +932,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.client .client
.post(&format!("/v1/table/{}/add_columns/", self.name)) .post(&format!("/v1/table/{}/add_columns/", self.name))
.json(&body); .json(&body);
let (request_id, response) = self.client.send(request, false).await?; let (request_id, response) = self.send(request, true).await?; // todo:
self.check_table_response(&request_id, response).await?; self.check_table_response(&request_id, response).await?;
Ok(()) Ok(())
} }
@@ -918,7 +971,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.client .client
.post(&format!("/v1/table/{}/alter_columns/", self.name)) .post(&format!("/v1/table/{}/alter_columns/", self.name))
.json(&body); .json(&body);
let (request_id, response) = self.client.send(request, false).await?; let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?; self.check_table_response(&request_id, response).await?;
Ok(()) Ok(())
} }
@@ -930,7 +983,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.client .client
.post(&format!("/v1/table/{}/drop_columns/", self.name)) .post(&format!("/v1/table/{}/drop_columns/", self.name))
.json(&body); .json(&body);
let (request_id, response) = self.client.send(request, false).await?; let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?; self.check_table_response(&request_id, response).await?;
Ok(()) Ok(())
} }
@@ -944,7 +997,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = serde_json::json!({ "version": version }); let body = serde_json::json!({ "version": version });
request = request.json(&body); request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?; let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?; let response = self.check_table_response(&request_id, response).await?;
#[derive(Deserialize)] #[derive(Deserialize)]
@@ -1001,7 +1054,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = serde_json::json!({ "version": version }); let body = serde_json::json!({ "version": version });
request = request.json(&body); request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?; let (request_id, response) = self.send(request, true).await?;
if response.status() == StatusCode::NOT_FOUND { if response.status() == StatusCode::NOT_FOUND {
return Ok(None); return Ok(None);
@@ -1011,7 +1064,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = response.text().await.err_to_http(request_id.clone())?; let body = response.text().await.err_to_http(request_id.clone())?;
println!("body: {:?}", body);
let stats = serde_json::from_str(&body).map_err(|e| Error::Http { let stats = serde_json::from_str(&body).map_err(|e| Error::Http {
source: format!("Failed to parse index statistics: {}", e).into(), source: format!("Failed to parse index statistics: {}", e).into(),
request_id, request_id,
@@ -1026,7 +1078,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
"/v1/table/{}/index/{}/drop/", "/v1/table/{}/index/{}/drop/",
self.name, index_name self.name, index_name
)); ));
let (request_id, response) = self.client.send(request, true).await?; let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?; self.check_table_response(&request_id, response).await?;
Ok(()) Ok(())
} }
@@ -1487,6 +1539,42 @@ mod tests {
assert_eq!(&body, &expected_body); assert_eq!(&body, &expected_body);
} }
#[tokio::test]
async fn test_merge_insert_retries_on_409() {
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let data = Box::new(RecordBatchIterator::new(
[Ok(batch.clone())],
batch.schema(),
));
// Default parameters
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/merge_insert/");
let params = request.url().query_pairs().collect::<HashMap<_, _>>();
assert_eq!(params["on"], "some_col");
assert_eq!(params["when_matched_update_all"], "false");
assert_eq!(params["when_not_matched_insert_all"], "false");
assert_eq!(params["when_not_matched_by_source_delete"], "false");
assert!(!params.contains_key("when_matched_update_all_filt"));
assert!(!params.contains_key("when_not_matched_by_source_delete_filt"));
http::Response::builder().status(409).body("").unwrap()
});
let e = table
.merge_insert(&["some_col"])
.execute(data)
.await
.unwrap_err();
assert!(e.to_string().contains("Hit retry limit"));
}
#[tokio::test] #[tokio::test]
async fn test_delete() { async fn test_delete() {
let table = Table::new_with_handler("my_table", |request| { let table = Table::new_with_handler("my_table", |request| {