Compare commits


21 Commits

Author SHA1 Message Date
Lance Release
a8b5ad7e74 Bump version: 0.22.0-beta.12 → 0.22.0 2025-04-25 21:16:07 +00:00
Lance Release
f8f6264883 Bump version: 0.22.0-beta.11 → 0.22.0-beta.12 2025-04-25 21:16:07 +00:00
Will Jones
d8517117f1 feat: upgrade Lance to v0.26.0 (#2359)
Upstream changelog:
https://github.com/lancedb/lance/releases/tag/v0.26.0

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Chores**
- Updated dependency management to use published crate versions for
improved reliability and maintainability.
- Added a temporary workaround for build issues by pinning a specific
version of a dependency.
- **Refactor**
- Improved resource management and concurrency by updating internal
ownership models for object storage components.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-25 13:59:12 -07:00
Lance Release
ab66dd5ed2 Updating package-lock.json 2025-04-25 06:04:06 +00:00
Lance Release
cbb9a7877c Updating package-lock.json 2025-04-25 05:02:47 +00:00
Lance Release
b7fc223535 Updating package-lock.json 2025-04-25 05:02:32 +00:00
Lance Release
1fdaf7a1a4 Bump version: 0.19.0-beta.10 → 0.19.0-beta.11 2025-04-25 05:02:16 +00:00
Lance Release
d11819c90c Bump version: 0.22.0-beta.10 → 0.22.0-beta.11 2025-04-25 05:01:57 +00:00
BubbleCal
9b902272f1 fix: sync hybrid search ignores the distance range params (#2356)
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Added support for distance range filtering in hybrid vector queries,
allowing users to specify lower and upper bounds for search results (see
the example after this summary).

- **Tests**
- Introduced new tests to validate distance range filtering and
reranking in both synchronous and asynchronous hybrid query scenarios.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
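For reference, a minimal synchronous sketch of the new parameter, adapted from the test added in this change; the local path and table name are illustrative:

```python
import lancedb
import pyarrow as pa
from lancedb.rerankers.rrf import RRFReranker

# Build a tiny table with text + 2-d vectors (taken from the test fixture in this diff).
db = lancedb.connect("/tmp/lancedb-distance-range-demo")  # illustrative path
data = pa.table(
    {
        "text": pa.array(["a", "b", "cat", "dog"]),
        "vector": pa.array(
            [[0.1, 0.1], [2, 2], [-0.1, -0.1], [0.5, -0.5]],
            type=pa.list_(pa.float32(), list_size=2),
        ),
    }
)
table = db.create_table("test", data)
table.create_fts_index("text", with_position=False, use_tantivy=False)

# Hybrid query that only keeps hits whose vector distance falls within [0.2, 0.5].
result = (
    table.search(query_type="hybrid")
    .vector([0.0, 0.4])
    .text("cat and dog")
    .distance_range(lower_bound=0.2, upper_bound=0.5)
    .rerank(RRFReranker(return_score="all"))
    .limit(2)
    .to_arrow()
)
print(result)
```

The async builder exposes the same option through `.nearest_to(...)` / `.nearest_to_text(...)` / `.distance_range(...)`, as the second new test shows.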

---------

Signed-off-by: BubbleCal <bubble-cal@outlook.com>
2025-04-25 13:01:22 +08:00
Will Jones
8c0622fa2c fix: remote limit to avoid "Limit must be non-negative" (#2354)
To work around this issue: https://github.com/lancedb/lancedb/issues/2211

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Bug Fixes**
- Improved handling of large query parameters to prevent potential
overflow issues when using the "k" parameter in queries.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-24 15:04:06 -07:00
Philip Meier
2191f948c3 fix: add missing pydantic model config compat (#2316)
Fixes #2315.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Refactor**
- Enhanced query processing to maintain smooth functionality across
different dependency versions, ensuring improved stability and
performance.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-22 14:46:10 -07:00
Will Jones
acc3b03004 ci: fix docs deploy (#2351)
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Chores**
- Improved CI workflow for documentation builds by optimizing Rust build
settings and updating the runner environment.
  - Fixed a typo in a workflow step name.
- Streamlined caching steps to reduce redundancy and improve efficiency.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-22 13:55:34 -07:00
Lance Release
7f091b8c8e Updating package-lock.json 2025-04-22 19:16:43 +00:00
Lance Release
c19bdd9a24 Updating package-lock.json 2025-04-22 18:24:16 +00:00
Lance Release
dad0ff5cd2 Updating package-lock.json 2025-04-22 18:23:59 +00:00
Lance Release
a705621067 Bump version: 0.19.0-beta.9 → 0.19.0-beta.10 2025-04-22 18:23:39 +00:00
Lance Release
39614fdb7d Bump version: 0.22.0-beta.9 → 0.22.0-beta.10 2025-04-22 18:23:17 +00:00
Ryan Green
96d534d4bc feat: add retries to remote client for requests with stream bodies (#2349)
Closes https://github.com/lancedb/lancedb/issues/2307
* Adds retries to remote operations with stream bodies (add,
merge_insert)
* Change default retryable status codes to 409, 429, 500, 502, 503, 504
* Don't retry add or merge_insert operations on 5xx responses

Notes:
* Supporting retries on stream bodies means the body has to be buffered
into memory so it can be cloned on retry, which will change memory use
patterns for the remote client. This buffering can be disabled by
disabling retries (i.e. setting retries to 0 in RetryConfig); see the
sketch after these notes.
* It does not seem that the retry config can be specified via env vars as
the documentation suggests. I added a follow-up issue
[here](https://github.com/lancedb/lancedb/issues/2350)
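A hedged sketch of how a caller could opt out of that buffering by disabling retries. The `retries` field and its defaults come from the Rust `RetryConfig` changes in this diff; the Python import path, `ClientConfig` wiring, URI, and credentials are illustrative assumptions, not part of this PR:

```python
import lancedb
from lancedb.remote import ClientConfig, RetryConfig  # import path assumed, not from this PR

# retries=0 disables retries entirely, so add/merge_insert bodies are streamed
# once instead of being buffered into memory for possible replay.
db = lancedb.connect(
    "db://my-database",   # illustrative remote database URI
    api_key="sk-...",     # illustrative credential
    region="us-east-1",
    client_config=ClientConfig(retry_config=RetryConfig(retries=0)),
)
```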



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Enhanced retry support for remote requests with configurable limits
and exponential backoff with jitter.
- Added robust retry logic for streaming data uploads, enabling retries
with buffered data to ensure reliability.

- **Bug Fixes**
- Improved error handling and retry behavior for HTTP status codes 409
and 504.

- **Refactor**
- Centralized and modularized HTTP request sending and retry logic
across remote database and table operations.
  - Streamlined request ID management for improved traceability.
- Simplified error message construction in index waiting functionality.

- **Tests**
  - Added a test verifying merge-insert retries on HTTP 409 responses.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-22 15:40:44 -02:30
Lance Release
5051d30d09 Updating package-lock.json 2025-04-21 23:55:43 +00:00
Lance Release
db853c4041 Updating package-lock.json 2025-04-21 22:50:56 +00:00
Lance Release
76d1d22bdc Updating package-lock.json 2025-04-21 22:50:40 +00:00
34 changed files with 606 additions and 339 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.19.0-beta.9"
current_version = "0.19.0-beta.11"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -18,17 +18,24 @@ concurrency:
group: "pages"
cancel-in-progress: true
env:
# This reduces the disk space needed for the build
RUSTFLAGS: "-C debuginfo=0"
# according to: https://matklad.github.io/2021/09/04/fast-rust-builds.html
# CI builds are faster with incremental disabled.
CARGO_INCREMENTAL: "0"
jobs:
# Single deploy job since we're just deploying
build:
environment:
name: github-pages
url: ${{ steps.deployment.outputs.page_url }}
runs-on: buildjet-8vcpu-ubuntu-2204
runs-on: ubuntu-24.04
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Install dependecies needed for ubuntu
- name: Install dependencies needed for ubuntu
run: |
sudo apt install -y protobuf-compiler libssl-dev
rustup update && rustup default
@@ -38,6 +45,7 @@ jobs:
python-version: "3.10"
cache: "pip"
cache-dependency-path: "docs/requirements.txt"
- uses: Swatinem/rust-cache@v2
- name: Build Python
working-directory: python
run: |
@@ -49,7 +57,6 @@ jobs:
node-version: 20
cache: 'npm'
cache-dependency-path: node/package-lock.json
- uses: Swatinem/rust-cache@v2
- name: Install node dependencies
working-directory: node
run: |

Cargo.lock (generated, 159 lines changed)
View File

@@ -128,9 +128,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.97"
version = "1.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
[[package]]
name = "arbitrary"
@@ -390,9 +390,9 @@ dependencies = [
[[package]]
name = "async-compression"
version = "0.4.22"
version = "0.4.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59a194f9d963d8099596278594b3107448656ba73831c9d8c783e613ce86da64"
checksum = "b37fc50485c4f3f736a4fb14199f6d5f5ba008d7f28fe710306c92780f004c07"
dependencies = [
"flate2",
"futures-core",
@@ -882,7 +882,7 @@ dependencies = [
"aws-smithy-async",
"aws-smithy-runtime-api",
"aws-smithy-types",
"h2 0.4.8",
"h2 0.4.9",
"http 0.2.12",
"http 1.3.1",
"http-body 0.4.6",
@@ -1185,9 +1185,9 @@ dependencies = [
[[package]]
name = "blake3"
version = "1.8.1"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389a099b34312839e16420d499a9cad9650541715937ffbdd40d36f49e77eeb3"
checksum = "3888aaa89e4b2a40fca9848e400f6a658a5a3978de7be858e209cafa8be9a4a0"
dependencies = [
"arrayref",
"arrayvec",
@@ -2377,7 +2377,16 @@ version = "5.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225"
dependencies = [
"dirs-sys",
"dirs-sys 0.4.1",
]
[[package]]
name = "dirs"
version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3e8aa94d75141228480295a7d0e7feb620b1a5ad9f12bc40be62411e38cce4e"
dependencies = [
"dirs-sys 0.5.0",
]
[[package]]
@@ -2388,10 +2397,22 @@ checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c"
dependencies = [
"libc",
"option-ext",
"redox_users",
"redox_users 0.4.6",
"windows-sys 0.48.0",
]
[[package]]
name = "dirs-sys"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e01a3366d27ee9890022452ee61b2b63a67e6f13f58900b651ff5665f0bb1fab"
dependencies = [
"libc",
"option-ext",
"redox_users 0.5.0",
"windows-sys 0.59.0",
]
[[package]]
name = "displaydoc"
version = "0.2.5"
@@ -2558,9 +2579,9 @@ dependencies = [
[[package]]
name = "ethnum"
version = "1.5.0"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b90ca2580b73ab6a1f724b76ca11ab632df820fd6040c336200d2c1df7b3c82c"
checksum = "0939f82868b77ef93ce3c3c3daf2b3c526b456741da5a1a4559e590965b6026b"
[[package]]
name = "event-listener"
@@ -2722,7 +2743,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "0.26.0"
source = "git+https://github.com/lancedb/lance?tag=v0.26.0-beta.1#8e46047e2dcb171bec28e28b507a9b7858348773"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbbb5d86fdf9f56c54cf7ec48f4471c0e901af458ee9821677dc8ba0c38bc0be"
dependencies = [
"rand 0.8.5",
]
@@ -3049,9 +3071,9 @@ dependencies = [
[[package]]
name = "h2"
version = "0.4.8"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5017294ff4bb30944501348f6f8e42e6ad28f42c8bbef7a74029aff064a4e3c2"
checksum = "75249d144030531f8dee69fe9cea04d3edf809a017ae445e2abdff6629e86633"
dependencies = [
"atomic-waker",
"bytes",
@@ -3138,7 +3160,7 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc03dcb0b0a83ae3f3363ec811014ae669f083e4e499c66602f447c4828737a1"
dependencies = [
"dirs",
"dirs 5.0.1",
"futures",
"http 1.3.1",
"indicatif",
@@ -3286,7 +3308,7 @@ dependencies = [
"bytes",
"futures-channel",
"futures-util",
"h2 0.4.8",
"h2 0.4.9",
"http 1.3.1",
"http-body 1.0.1",
"httparse",
@@ -3645,9 +3667,9 @@ checksum = "9028f49264629065d057f340a86acb84867925865f73bbf8d47b4d149a7e88b8"
[[package]]
name = "jiff"
version = "0.2.6"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f33145a5cbea837164362c7bd596106eb7c5198f97d1ba6f6ebb3223952e488"
checksum = "5a064218214dc6a10fbae5ec5fa888d80c45d611aba169222fc272072bf7aef6"
dependencies = [
"jiff-static",
"log",
@@ -3658,9 +3680,9 @@ dependencies = [
[[package]]
name = "jiff-static"
version = "0.2.6"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43ce13c40ec6956157a3635d97a1ee2df323b263f09ea14165131289cb0f5c19"
checksum = "199b7932d97e325aff3a7030e141eafe7f2c6268e1d1b24859b753a627f45254"
dependencies = [
"proc-macro2",
"quote",
@@ -3712,12 +3734,14 @@ dependencies = [
[[package]]
name = "lance"
version = "0.26.0"
source = "git+https://github.com/lancedb/lance?tag=v0.26.0-beta.1#8e46047e2dcb171bec28e28b507a9b7858348773"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "285c3f98a7182e2f35eabc9be67927bb9167b236c6d9c45d894928cbe330067c"
dependencies = [
"arrow",
"arrow-arith",
"arrow-array",
"arrow-buffer",
"arrow-ipc",
"arrow-ord",
"arrow-row",
"arrow-schema",
@@ -3736,6 +3760,7 @@ dependencies = [
"datafusion-functions",
"datafusion-physical-expr",
"deepsize",
"either",
"futures",
"half",
"humantime",
@@ -3773,7 +3798,8 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "0.26.0"
source = "git+https://github.com/lancedb/lance?tag=v0.26.0-beta.1#8e46047e2dcb171bec28e28b507a9b7858348773"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f877f217d6f93b24b54a2390a988a32f99d6608fe8af7766d93bd515e77dd2a"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -3791,7 +3817,8 @@ dependencies = [
[[package]]
name = "lance-core"
version = "0.26.0"
source = "git+https://github.com/lancedb/lance?tag=v0.26.0-beta.1#8e46047e2dcb171bec28e28b507a9b7858348773"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52fa4661c532db5b53102e2b394c9735bf6e707c337dfa5b9d98baba5c0cba13"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -3828,7 +3855,8 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "0.26.0"
source = "git+https://github.com/lancedb/lance?tag=v0.26.0-beta.1#8e46047e2dcb171bec28e28b507a9b7858348773"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3efa610cc1168aaf96734f2f7911fb874609304716aab3318a86193da883f700"
dependencies = [
"arrow",
"arrow-array",
@@ -3847,8 +3875,10 @@ dependencies = [
"lance-datagen",
"lazy_static",
"log",
"pin-project",
"prost",
"snafu",
"tempfile",
"tokio",
"tracing",
]
@@ -3856,7 +3886,8 @@ dependencies = [
[[package]]
name = "lance-datagen"
version = "0.26.0"
source = "git+https://github.com/lancedb/lance?tag=v0.26.0-beta.1#8e46047e2dcb171bec28e28b507a9b7858348773"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c61affd39495caa923f6a49a7cb0a9f36fea2d7231a039e557f908e0b3b59cf"
dependencies = [
"arrow",
"arrow-array",
@@ -3872,7 +3903,8 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "0.26.0"
source = "git+https://github.com/lancedb/lance?tag=v0.26.0-beta.1#8e46047e2dcb171bec28e28b507a9b7858348773"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "051d65ab02790552c8790fe22915fbdd1629f3e1fa2a6ef69946e77c9d2b6f8e"
dependencies = [
"arrayref",
"arrow",
@@ -3912,7 +3944,8 @@ dependencies = [
[[package]]
name = "lance-file"
version = "0.26.0"
source = "git+https://github.com/lancedb/lance?tag=v0.26.0-beta.1#8e46047e2dcb171bec28e28b507a9b7858348773"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e4aff9ff0801c82d8fcb88cacec4880b6aaf53c6480291d50a4fcc12e6853c4"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -3947,7 +3980,8 @@ dependencies = [
[[package]]
name = "lance-index"
version = "0.26.0"
source = "git+https://github.com/lancedb/lance?tag=v0.26.0-beta.1#8e46047e2dcb171bec28e28b507a9b7858348773"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3823b5147002a3115456c4dd1e2b16c645c08f4653e6e9dc624b9381ba29c87f"
dependencies = [
"arrow",
"arrow-array",
@@ -3965,7 +3999,7 @@ dependencies = [
"datafusion-physical-expr",
"datafusion-sql",
"deepsize",
"dirs",
"dirs 5.0.1",
"fst",
"futures",
"half",
@@ -4001,7 +4035,8 @@ dependencies = [
[[package]]
name = "lance-io"
version = "0.26.0"
source = "git+https://github.com/lancedb/lance?tag=v0.26.0-beta.1#8e46047e2dcb171bec28e28b507a9b7858348773"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a401202f6a1997db4ea5e9eb1a73a352736b320808a2e8497686c44fe6badf01"
dependencies = [
"arrow",
"arrow-arith",
@@ -4040,7 +4075,8 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "0.26.0"
source = "git+https://github.com/lancedb/lance?tag=v0.26.0-beta.1#8e46047e2dcb171bec28e28b507a9b7858348773"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82cc5333ed9d12f1745e849ad161746da0b12ae3d4c9897d1937411e6533f504"
dependencies = [
"arrow-array",
"arrow-ord",
@@ -4064,7 +4100,8 @@ dependencies = [
[[package]]
name = "lance-table"
version = "0.26.0"
source = "git+https://github.com/lancedb/lance?tag=v0.26.0-beta.1#8e46047e2dcb171bec28e28b507a9b7858348773"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41a43c76277808c452135f33a6b46ca8ec6ba38167534ff5240b46098ed81e73"
dependencies = [
"arrow",
"arrow-array",
@@ -4104,7 +4141,8 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "0.26.0"
source = "git+https://github.com/lancedb/lance?tag=v0.26.0-beta.1#8e46047e2dcb171bec28e28b507a9b7858348773"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f697b2c273e4629c782205e563282c08a74fe237ca8dd36cf10f862951887a70"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -4115,7 +4153,7 @@ dependencies = [
[[package]]
name = "lancedb"
version = "0.19.0-beta.8"
version = "0.19.0-beta.11"
dependencies = [
"arrow",
"arrow-array",
@@ -4202,7 +4240,7 @@ dependencies = [
[[package]]
name = "lancedb-node"
version = "0.19.0-beta.8"
version = "0.19.0-beta.11"
dependencies = [
"arrow-array",
"arrow-ipc",
@@ -4227,12 +4265,13 @@ dependencies = [
[[package]]
name = "lancedb-nodejs"
version = "0.19.0-beta.8"
version = "0.19.0-beta.11"
dependencies = [
"arrow-array",
"arrow-ipc",
"arrow-schema",
"async-trait",
"aws-lc-sys",
"env_logger",
"futures",
"lancedb",
@@ -4245,7 +4284,7 @@ dependencies = [
[[package]]
name = "lancedb-python"
version = "0.22.0-beta.8"
version = "0.22.0-beta.11"
dependencies = [
"arrow",
"env_logger",
@@ -4342,9 +4381,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.171"
version = "0.2.172"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
[[package]]
name = "libloading"
@@ -4368,9 +4407,9 @@ dependencies = [
[[package]]
name = "libm"
version = "0.2.11"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8355be11b20d696c8f18f6cc018c4e372165b1fa8126cef092399c9951984ffa"
checksum = "c9627da5196e5d8ed0b0495e61e518847578da83483c37288316d9b2e03a7f72"
[[package]]
name = "libredox"
@@ -5637,9 +5676,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.94"
version = "1.0.95"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84"
checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778"
dependencies = [
"unicode-ident",
]
@@ -5837,13 +5876,13 @@ dependencies = [
[[package]]
name = "quinn-proto"
version = "0.11.10"
version = "0.11.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b820744eb4dc9b57a3398183639c511b5a26d2ed702cedd3febaa1393caa22cc"
checksum = "bcbafbbdbb0f638fe3f35f3c56739f77a8a1d070cb25603226c83339b391472b"
dependencies = [
"bytes",
"getrandom 0.3.2",
"rand 0.9.0",
"rand 0.9.1",
"ring",
"rustc-hash 2.1.1",
"rustls 0.23.26",
@@ -5903,13 +5942,12 @@ dependencies = [
[[package]]
name = "rand"
version = "0.9.0"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.3",
"zerocopy 0.8.24",
]
[[package]]
@@ -6084,6 +6122,17 @@ dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "redox_users"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd6f9d3d47bdd2ad6945c5015a226ec6155d0bcdfd8f7cd29f86b71f8de99d2b"
dependencies = [
"getrandom 0.2.15",
"libredox",
"thiserror 2.0.12",
]
[[package]]
name = "regex"
version = "1.11.1"
@@ -6152,7 +6201,7 @@ dependencies = [
"encoding_rs",
"futures-core",
"futures-util",
"h2 0.4.8",
"h2 0.4.9",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
@@ -6701,11 +6750,11 @@ dependencies = [
[[package]]
name = "shellexpand"
version = "3.1.0"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da03fa3b94cc19e3ebfc88c4229c49d8f08cdbd1228870a45f0ffdf84988e14b"
checksum = "8b1fdf65dd6331831494dd616b30351c38e96e45921a27745cf98490458b90bb"
dependencies = [
"dirs",
"dirs 6.0.0",
]
[[package]]
@@ -6716,9 +6765,9 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.2"
version = "1.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410"
dependencies = [
"libc",
]

View File

@@ -21,16 +21,14 @@ categories = ["database-implementations"]
rust-version = "1.78.0"
[workspace.dependencies]
lance = { "version" = "=0.26.0", "features" = [
"dynamodb",
], tag = "v0.26.0-beta.1", git = "https://github.com/lancedb/lance" }
lance-io = { version = "=0.26.0", tag = "v0.26.0-beta.1", git = "https://github.com/lancedb/lance" }
lance-index = { version = "=0.26.0", tag = "v0.26.0-beta.1", git = "https://github.com/lancedb/lance" }
lance-linalg = { version = "=0.26.0", tag = "v0.26.0-beta.1", git = "https://github.com/lancedb/lance" }
lance-table = { version = "=0.26.0", tag = "v0.26.0-beta.1", git = "https://github.com/lancedb/lance" }
lance-testing = { version = "=0.26.0", tag = "v0.26.0-beta.1", git = "https://github.com/lancedb/lance" }
lance-datafusion = { version = "=0.26.0", tag = "v0.26.0-beta.1", git = "https://github.com/lancedb/lance" }
lance-encoding = { version = "=0.26.0", tag = "v0.26.0-beta.1", git = "https://github.com/lancedb/lance" }
lance = { "version" = "=0.26.0", "features" = ["dynamodb"] }
lance-io = "=0.26.0"
lance-index = "=0.26.0"
lance-linalg = "=0.26.0"
lance-table = "=0.26.0"
lance-testing = "=0.26.0"
lance-datafusion = "=0.26.0"
lance-encoding = "=0.26.0"
# Note that this one does not include pyarrow
arrow = { version = "54.1", optional = false }
arrow-array = "54.1"

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.19.0-beta.9</version>
<version>0.19.0-beta.11</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.19.0-beta.9</version>
<version>0.19.0-beta.11</version>
<packaging>pom</packaging>
<name>LanceDB Parent</name>

node/package-lock.json (generated, 44 lines changed)
View File

@@ -1,12 +1,12 @@
{
"name": "vectordb",
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.11",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "vectordb",
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.11",
"cpu": [
"x64",
"arm64"
@@ -52,11 +52,11 @@
"uuid": "^9.0.0"
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-arm64": "0.19.0-beta.8",
"@lancedb/vectordb-darwin-x64": "0.19.0-beta.8",
"@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.8",
"@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.8",
"@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.8"
"@lancedb/vectordb-darwin-arm64": "0.19.0-beta.11",
"@lancedb/vectordb-darwin-x64": "0.19.0-beta.11",
"@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.11",
"@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.11",
"@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.11"
},
"peerDependencies": {
"@apache-arrow/ts": "^14.0.2",
@@ -327,9 +327,9 @@
}
},
"node_modules/@lancedb/vectordb-darwin-arm64": {
"version": "0.19.0-beta.8",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.19.0-beta.8.tgz",
"integrity": "sha512-zNKTlHemHUyU3+WtIQ029tZSl5C5hXWvwI073kfKuYOWGSRZeOcrU8WAuS9b17nfFD40X28YUD5qPB10GbMrNQ==",
"version": "0.19.0-beta.11",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.19.0-beta.11.tgz",
"integrity": "sha512-fLefGJYdlIRIjrJj8MU1r5Zix5LpKktpCYilA7tZrfvBWkubGceJ+U6RPsWz7VGBfWcETo3g5CBooUPhbtSMlQ==",
"cpu": [
"arm64"
],
@@ -340,9 +340,9 @@
]
},
"node_modules/@lancedb/vectordb-darwin-x64": {
"version": "0.19.0-beta.8",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.19.0-beta.8.tgz",
"integrity": "sha512-OdnduXdX5ZTZd2s+5wW5gssDYQKwEfUKxjOWOjjLS8SQeTlPM6pI0z9QP9K1sipbTYpYoCgokr5+PKKhvMPezw==",
"version": "0.19.0-beta.11",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.19.0-beta.11.tgz",
"integrity": "sha512-FkCa1TbPLDXAGhlRI4tafcltzApCsyvgi+I+kX07u5DKPNQVALpQ3R6X6GLlbiFsAFBdyv9t2fqQ9DlgjJIZpA==",
"cpu": [
"x64"
],
@@ -353,9 +353,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-arm64-gnu": {
"version": "0.19.0-beta.8",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.19.0-beta.8.tgz",
"integrity": "sha512-9Y52zhZYFbgCJA3Vxj8EFnZ8lVuvqAJNapQPo7bH56ZgnEcAnWikk8yWwT63PtI22T6XOcj1hWWYfWKrUXMggg==",
"version": "0.19.0-beta.11",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.19.0-beta.11.tgz",
"integrity": "sha512-iZkL/01HNUNQ8pGK0+hoNyrM7P1YtShsyIQVzJMfo41SAofCBf9qvi9YaYyd49sDb+dQXeRn1+cfaJ9siz1OHw==",
"cpu": [
"arm64"
],
@@ -366,9 +366,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-x64-gnu": {
"version": "0.19.0-beta.8",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.19.0-beta.8.tgz",
"integrity": "sha512-e0H+gSkvMGYx2DPcriXwwkALvZtmbWNtdpMAZceS8qHYv7xMtUPXG86od5vTbhKTrnC2hJLVj5E3JcAs8sJn6w==",
"version": "0.19.0-beta.11",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.19.0-beta.11.tgz",
"integrity": "sha512-MdKRHxe2tRQqmExNLv3f6Wvx1mEi98eFtD0ysm4tNrQdaS1MJbTp+DUehrRKkfDWsooalHkIi9d02BVw5qseUQ==",
"cpu": [
"x64"
],
@@ -379,9 +379,9 @@
]
},
"node_modules/@lancedb/vectordb-win32-x64-msvc": {
"version": "0.19.0-beta.8",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.19.0-beta.8.tgz",
"integrity": "sha512-olQKVpoWKJWOuVsFM92hmtHYFpCtITiKhUQ8gZu7ngrgLe7ofAASyqvWp5THV2zSXpwYITqrYjHOrtLy1/I9Jw==",
"version": "0.19.0-beta.11",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.19.0-beta.11.tgz",
"integrity": "sha512-KWy+t9jr0feJAW9NkmM/w9kfdpp78+7mkeh9lb0g3xI3OdYU1yizNqFjbIQqJf7/L4sou4wmOjAC+FcP8qCtzg==",
"cpu": [
"x64"
],

View File

@@ -1,6 +1,6 @@
{
"name": "vectordb",
"version": "0.19.0-beta.9",
"version": "0.19.0-beta.11",
"description": " Serverless, low-latency vector database for AI applications",
"private": false,
"main": "dist/index.js",
@@ -89,10 +89,10 @@
}
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-x64": "0.19.0-beta.9",
"@lancedb/vectordb-darwin-arm64": "0.19.0-beta.9",
"@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.9",
"@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.9",
"@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.9"
"@lancedb/vectordb-darwin-x64": "0.19.0-beta.11",
"@lancedb/vectordb-darwin-arm64": "0.19.0-beta.11",
"@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.11",
"@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.11",
"@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.11"
}
}

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.19.0-beta.9"
version = "0.19.0-beta.11"
license.workspace = true
description.workspace = true
repository.workspace = true
@@ -28,6 +28,9 @@ napi-derive = "2.16.4"
lzma-sys = { version = "*", features = ["static"] }
log.workspace = true
# Workaround for build failure until we can fix it.
aws-lc-sys = "=0.28.0"
[build-dependencies]
napi-build = "2.1"

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.19.0-beta.9",
"version": "0.19.0-beta.11",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-x64",
"version": "0.19.0-beta.9",
"version": "0.19.0-beta.11",
"os": ["darwin"],
"cpu": ["x64"],
"main": "lancedb.darwin-x64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.19.0-beta.9",
"version": "0.19.0-beta.11",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.19.0-beta.9",
"version": "0.19.0-beta.11",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.19.0-beta.9",
"version": "0.19.0-beta.11",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.19.0-beta.9",
"version": "0.19.0-beta.11",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.19.0-beta.9",
"version": "0.19.0-beta.11",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.19.0-beta.9",
"version": "0.19.0-beta.11",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.11",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.19.0-beta.8",
"version": "0.19.0-beta.11",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.19.0-beta.9",
"version": "0.19.0-beta.11",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.22.0-beta.9"
current_version = "0.22.0"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.22.0-beta.9"
version = "0.22.0"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true

View File

@@ -28,6 +28,8 @@ import pyarrow.compute as pc
import pyarrow.fs as pa_fs
import pydantic
from lancedb.pydantic import PYDANTIC_VERSION
from . import __version__
from .arrow import AsyncRecordBatchReader
from .dependencies import pandas as pd
@@ -498,10 +500,14 @@ class Query(pydantic.BaseModel):
)
return query
class Config:
# This tells pydantic to allow custom types (needed for the `vector` query since
# pa.Array wouln't be allowed otherwise)
arbitrary_types_allowed = True
# This tells pydantic to allow custom types (needed for the `vector` query since
# pa.Array wouln't be allowed otherwise)
if PYDANTIC_VERSION.major < 2: # Pydantic 1.x compat
class Config:
arbitrary_types_allowed = True
else:
model_config = {"arbitrary_types_allowed": True}
class LanceQueryBuilder(ABC):
@@ -1586,6 +1592,8 @@ class LanceHybridQueryBuilder(LanceQueryBuilder):
self._refine_factor = None
self._distance_type = None
self._phrase_query = None
self._lower_bound = None
self._upper_bound = None
def _validate_query(self, query, vector=None, text=None):
if query is not None and (vector is not None or text is not None):
@@ -1665,6 +1673,10 @@ class LanceHybridQueryBuilder(LanceQueryBuilder):
self._vector_query.ef(self._ef)
if self._bypass_vector_index:
self._vector_query.bypass_vector_index()
if self._lower_bound or self._upper_bound:
self._vector_query.distance_range(
lower_bound=self._lower_bound, upper_bound=self._upper_bound
)
if self._reranker is None:
self._reranker = RRFReranker()

View File

@@ -4,13 +4,32 @@
import lancedb
from lancedb.query import LanceHybridQueryBuilder
from lancedb.rerankers.rrf import RRFReranker
import pyarrow as pa
import pyarrow.compute as pc
import pytest
import pytest_asyncio
from lancedb.index import FTS
from lancedb.table import AsyncTable
from lancedb.table import AsyncTable, Table
@pytest.fixture
def sync_table(tmpdir_factory) -> Table:
tmp_path = str(tmpdir_factory.mktemp("data"))
db = lancedb.connect(tmp_path)
data = pa.table(
{
"text": pa.array(["a", "b", "cat", "dog"]),
"vector": pa.array(
[[0.1, 0.1], [2, 2], [-0.1, -0.1], [0.5, -0.5]],
type=pa.list_(pa.float32(), list_size=2),
),
}
)
table = db.create_table("test", data)
table.create_fts_index("text", with_position=False, use_tantivy=False)
return table
@pytest_asyncio.fixture
@@ -102,6 +121,42 @@ async def test_async_hybrid_query_default_limit(table: AsyncTable):
assert texts.count("a") == 1
def test_hybrid_query_distance_range(sync_table: Table):
reranker = RRFReranker(return_score="all")
result = (
sync_table.search(query_type="hybrid")
.vector([0.0, 0.4])
.text("cat and dog")
.distance_range(lower_bound=0.2, upper_bound=0.5)
.rerank(reranker)
.limit(2)
.to_arrow()
)
assert len(result) == 2
print(result)
for dist in result["_distance"]:
if dist.is_valid:
assert 0.2 <= dist.as_py() <= 0.5
@pytest.mark.asyncio
async def test_hybrid_query_distance_range_async(table: AsyncTable):
reranker = RRFReranker(return_score="all")
result = await (
table.query()
.nearest_to([0.0, 0.4])
.nearest_to_text("cat and dog")
.distance_range(lower_bound=0.2, upper_bound=0.5)
.rerank(reranker)
.limit(2)
.to_arrow()
)
assert len(result) == 2
for dist in result["_distance"]:
if dist.is_valid:
assert 0.2 <= dist.as_py() <= 0.5
@pytest.mark.asyncio
async def test_explain_plan(table: AsyncTable):
plan = await (

View File

@@ -652,6 +652,11 @@ impl HybridQuery {
self.inner_vec.bypass_vector_index();
}
#[pyo3(signature = (lower_bound=None, upper_bound=None))]
pub fn distance_range(&mut self, lower_bound: Option<f32>, upper_bound: Option<f32>) {
self.inner_vec.distance_range(lower_bound, upper_bound);
}
pub fn to_vector_query(&mut self) -> PyResult<VectorQuery> {
Ok(VectorQuery {
inner: self.inner_vec.inner.clone(),

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-node"
version = "0.19.0-beta.9"
version = "0.19.0-beta.11"
description = "Serverless, low-latency vector database for AI applications"
license.workspace = true
edition.workspace = true

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.19.0-beta.9"
version = "0.19.0-beta.11"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true

View File

@@ -81,7 +81,7 @@ impl ListingCatalogOptionsBuilder {
/// [`crate::database::listing::ListingDatabase`]
#[derive(Debug)]
pub struct ListingCatalog {
object_store: ObjectStore,
object_store: Arc<ObjectStore>,
uri: String,
@@ -105,7 +105,7 @@ impl ListingCatalog {
}
async fn open_path(path: &str) -> Result<Self> {
let (object_store, base_path) = ObjectStore::from_path(path).unwrap();
let (object_store, base_path) = ObjectStore::from_uri(path).await.unwrap();
if object_store.is_local() {
Self::try_create_dir(path).context(CreateDirSnafu { path })?;
}

View File

@@ -201,7 +201,7 @@ impl ListingDatabaseOptionsBuilder {
/// We will have two tables named `table1` and `table2`.
#[derive(Debug)]
pub struct ListingDatabase {
object_store: ObjectStore,
object_store: Arc<ObjectStore>,
query_string: Option<String>,
pub(crate) uri: String,

View File

@@ -20,7 +20,7 @@ pub async fn wait_for_index(
) -> Result<()> {
if timeout > MAX_WAIT {
return Err(Error::InvalidInput {
message: format!("timeout must be less than {:?}", MAX_WAIT).to_string(),
message: format!("timeout must be less than {:?}", MAX_WAIT),
});
}
let start = Instant::now();
@@ -84,7 +84,6 @@ pub async fn wait_for_index(
message: format!(
"timed out waiting for indices: {:?} after {:?}",
remaining, timeout
)
.to_string(),
),
})
}

View File

@@ -8,6 +8,7 @@
pub(crate) mod client;
pub(crate) mod db;
mod retry;
pub(crate) mod table;
pub(crate) mod util;

View File

@@ -1,17 +1,17 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::{collections::HashMap, future::Future, str::FromStr, time::Duration};
use http::HeaderName;
use log::debug;
use reqwest::{
header::{HeaderMap, HeaderValue},
Request, RequestBuilder, Response,
Body, Request, RequestBuilder, Response,
};
use std::{collections::HashMap, future::Future, str::FromStr, time::Duration};
use crate::error::{Error, Result};
use crate::remote::db::RemoteOptions;
use crate::remote::retry::{ResolvedRetryConfig, RetryCounter};
const REQUEST_ID_HEADER: HeaderName = HeaderName::from_static("x-request-id");
@@ -118,41 +118,14 @@ pub struct RetryConfig {
/// You can also set the `LANCE_CLIENT_RETRY_STATUSES` environment variable
/// to set this value. Use a comma-separated list of integer values.
///
/// The default is 429, 500, 502, 503.
/// Note that write operations will never be retried on 5xx errors as this may
/// result in duplicated writes.
///
/// The default is 409, 429, 500, 502, 503, 504.
pub statuses: Option<Vec<u16>>,
// TODO: should we allow customizing methods?
}
#[derive(Debug, Clone)]
struct ResolvedRetryConfig {
retries: u8,
connect_retries: u8,
read_retries: u8,
backoff_factor: f32,
backoff_jitter: f32,
statuses: Vec<reqwest::StatusCode>,
}
impl TryFrom<RetryConfig> for ResolvedRetryConfig {
type Error = Error;
fn try_from(retry_config: RetryConfig) -> Result<Self> {
Ok(Self {
retries: retry_config.retries.unwrap_or(3),
connect_retries: retry_config.connect_retries.unwrap_or(3),
read_retries: retry_config.read_retries.unwrap_or(3),
backoff_factor: retry_config.backoff_factor.unwrap_or(0.25),
backoff_jitter: retry_config.backoff_jitter.unwrap_or(0.25),
statuses: retry_config
.statuses
.unwrap_or_else(|| vec![429, 500, 502, 503])
.into_iter()
.map(|status| reqwest::StatusCode::from_u16(status).unwrap())
.collect(),
})
}
}
// We use the `HttpSend` trait to abstract over the `reqwest::Client` so that
// we can mock responses in tests. Based on the patterns from this blog post:
// https://write.as/balrogboogie/testing-reqwest-based-clients
@@ -160,8 +133,8 @@ impl TryFrom<RetryConfig> for ResolvedRetryConfig {
pub struct RestfulLanceDbClient<S: HttpSend = Sender> {
client: reqwest::Client,
host: String,
retry_config: ResolvedRetryConfig,
sender: S,
pub(crate) retry_config: ResolvedRetryConfig,
pub(crate) sender: S,
}
pub trait HttpSend: Clone + Send + Sync + std::fmt::Debug + 'static {
@@ -375,74 +348,69 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
self.client.post(full_uri)
}
pub async fn send(&self, req: RequestBuilder, with_retry: bool) -> Result<(String, Response)> {
pub async fn send(&self, req: RequestBuilder) -> Result<(String, Response)> {
let (client, request) = req.build_split();
let mut request = request.unwrap();
let request_id = self.extract_request_id(&mut request);
self.log_request(&request, &request_id);
// Set a request id.
// TODO: allow the user to supply this, through middleware?
let request_id = if let Some(request_id) = request.headers().get(REQUEST_ID_HEADER) {
request_id.to_str().unwrap().to_string()
} else {
let request_id = uuid::Uuid::new_v4().to_string();
let header = HeaderValue::from_str(&request_id).unwrap();
request.headers_mut().insert(REQUEST_ID_HEADER, header);
request_id
};
if log::log_enabled!(log::Level::Debug) {
let content_type = request
.headers()
.get("content-type")
.map(|v| v.to_str().unwrap());
if content_type == Some("application/json") {
let body = request.body().as_ref().unwrap().as_bytes().unwrap();
let body = String::from_utf8_lossy(body);
debug!(
"Sending request_id={}: {:?} with body {}",
request_id, request, body
);
} else {
debug!("Sending request_id={}: {:?}", request_id, request);
}
}
if with_retry {
self.send_with_retry_impl(client, request, request_id).await
} else {
let response = self
.sender
.send(&client, request)
.await
.err_to_http(request_id.clone())?;
debug!(
"Received response for request_id={}: {:?}",
request_id, &response
);
Ok((request_id, response))
}
let response = self
.sender
.send(&client, request)
.await
.err_to_http(request_id.clone())?;
debug!(
"Received response for request_id={}: {:?}",
request_id, &response
);
Ok((request_id, response))
}
async fn send_with_retry_impl(
/// Send the request using retries configured in the RetryConfig.
/// If retry_5xx is false, 5xx requests will not be retried regardless of the statuses configured
/// in the RetryConfig.
/// Since this requires arrow serialization, this is implemented here instead of in RestfulLanceDbClient
pub async fn send_with_retry(
&self,
client: reqwest::Client,
req: Request,
request_id: String,
req_builder: RequestBuilder,
mut make_body: Option<Box<dyn FnMut() -> Result<Body> + Send + 'static>>,
retry_5xx: bool,
) -> Result<(String, Response)> {
let mut retry_counter = RetryCounter::new(&self.retry_config, request_id);
let retry_config = &self.retry_config;
let non_5xx_statuses = retry_config
.statuses
.iter()
.filter(|s| !s.is_server_error())
.cloned()
.collect::<Vec<_>>();
// clone and build the request to extract the request id
let tmp_req = req_builder.try_clone().ok_or_else(|| Error::Runtime {
message: "Attempted to retry a request that cannot be cloned".to_string(),
})?;
let (_, r) = tmp_req.build_split();
let mut r = r.unwrap();
let request_id = self.extract_request_id(&mut r);
let mut retry_counter = RetryCounter::new(retry_config, request_id.clone());
loop {
// This only works if the request body is not a stream. If it is
// a stream, we can't use the retry path. We would need to implement
// an outer retry.
let request = req.try_clone().ok_or_else(|| Error::Runtime {
let mut req_builder = req_builder.try_clone().ok_or_else(|| Error::Runtime {
message: "Attempted to retry a request that cannot be cloned".to_string(),
})?;
let response = self
.sender
.send(&client, request)
.await
.map(|r| (r.status(), r));
// set the streaming body on the request builder after clone
if let Some(body_gen) = make_body.as_mut() {
let body = body_gen()?;
req_builder = req_builder.body(body);
}
let (c, request) = req_builder.build_split();
let mut request = request.unwrap();
self.set_request_id(&mut request, &request_id.clone());
self.log_request(&request, &request_id);
let response = self.sender.send(&c, request).await.map(|r| (r.status(), r));
match response {
Ok((status, response)) if status.is_success() => {
debug!(
@@ -451,7 +419,10 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
);
return Ok((retry_counter.request_id, response));
}
Ok((status, response)) if self.retry_config.statuses.contains(&status) => {
Ok((status, response))
if (retry_5xx && retry_config.statuses.contains(&status))
|| non_5xx_statuses.contains(&status) =>
{
let source = self
.check_response(&retry_counter.request_id, response)
.await
@@ -480,6 +451,47 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
}
}
fn log_request(&self, request: &Request, request_id: &String) {
if log::log_enabled!(log::Level::Debug) {
let content_type = request
.headers()
.get("content-type")
.map(|v| v.to_str().unwrap());
if content_type == Some("application/json") {
let body = request.body().as_ref().unwrap().as_bytes().unwrap();
let body = String::from_utf8_lossy(body);
debug!(
"Sending request_id={}: {:?} with body {}",
request_id, request, body
);
} else {
debug!("Sending request_id={}: {:?}", request_id, request);
}
}
}
/// Extract the request ID from the request headers.
/// If the request ID header is not set, this will generate a new one and set
/// it on the request headers
pub fn extract_request_id(&self, request: &mut Request) -> String {
// Set a request id.
// TODO: allow the user to supply this, through middleware?
let request_id = if let Some(request_id) = request.headers().get(REQUEST_ID_HEADER) {
request_id.to_str().unwrap().to_string()
} else {
let request_id = uuid::Uuid::new_v4().to_string();
self.set_request_id(request, &request_id);
request_id
};
request_id
}
/// Set the request ID header
pub fn set_request_id(&self, request: &mut Request, request_id: &str) {
let header = HeaderValue::from_str(request_id).unwrap();
request.headers_mut().insert(REQUEST_ID_HEADER, header);
}
pub async fn check_response(&self, request_id: &str, response: Response) -> Result<Response> {
// Try to get the response text, but if that fails, just return the status code
let status = response.status();
@@ -501,91 +513,6 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
}
}
struct RetryCounter<'a> {
request_failures: u8,
connect_failures: u8,
read_failures: u8,
config: &'a ResolvedRetryConfig,
request_id: String,
}
impl<'a> RetryCounter<'a> {
fn new(config: &'a ResolvedRetryConfig, request_id: String) -> Self {
Self {
request_failures: 0,
connect_failures: 0,
read_failures: 0,
config,
request_id,
}
}
fn check_out_of_retries(
&self,
source: Box<dyn std::error::Error + Send + Sync>,
status_code: Option<reqwest::StatusCode>,
) -> Result<()> {
if self.request_failures >= self.config.retries
|| self.connect_failures >= self.config.connect_retries
|| self.read_failures >= self.config.read_retries
{
Err(Error::Retry {
request_id: self.request_id.clone(),
request_failures: self.request_failures,
max_request_failures: self.config.retries,
connect_failures: self.connect_failures,
max_connect_failures: self.config.connect_retries,
read_failures: self.read_failures,
max_read_failures: self.config.read_retries,
source,
status_code,
})
} else {
Ok(())
}
}
fn increment_request_failures(&mut self, source: crate::Error) -> Result<()> {
self.request_failures += 1;
let status_code = if let crate::Error::Http { status_code, .. } = &source {
*status_code
} else {
None
};
self.check_out_of_retries(Box::new(source), status_code)
}
fn increment_connect_failures(&mut self, source: reqwest::Error) -> Result<()> {
self.connect_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
fn increment_read_failures(&mut self, source: reqwest::Error) -> Result<()> {
self.read_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
fn next_sleep_time(&self) -> Duration {
let backoff = self.config.backoff_factor * (2.0f32.powi(self.request_failures as i32));
let jitter = rand::random::<f32>() * self.config.backoff_jitter;
let sleep_time = Duration::from_secs_f32(backoff + jitter);
debug!(
"Retrying request {:?} ({}/{} connect, {}/{} read, {}/{} read) in {:?}",
self.request_id,
self.connect_failures,
self.config.connect_retries,
self.request_failures,
self.config.retries,
self.read_failures,
self.config.read_retries,
sleep_time
);
sleep_time
}
}
pub trait RequestResultExt {
type Output;
fn err_to_http(self, request_id: String) -> Result<Self::Output>;

View File

@@ -255,7 +255,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
if let Some(start_after) = request.start_after {
req = req.query(&[("page_token", start_after)]);
}
let (request_id, rsp) = self.client.send(req, true).await?;
let (request_id, rsp) = self.client.send_with_retry(req, None, true).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let version = parse_server_version(&request_id, &rsp)?;
let tables = rsp
@@ -302,7 +302,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
.body(data_buffer)
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
let (request_id, rsp) = self.client.send(req, false).await?;
let (request_id, rsp) = self.client.send(req).await?;
if rsp.status() == StatusCode::BAD_REQUEST {
let body = rsp.text().await.err_to_http(request_id.clone())?;
@@ -362,7 +362,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
let req = self
.client
.post(&format!("/v1/table/{}/describe/", request.name));
let (request_id, rsp) = self.client.send(req, true).await?;
let (request_id, rsp) = self.client.send_with_retry(req, None, true).await?;
if rsp.status() == StatusCode::NOT_FOUND {
return Err(crate::Error::TableNotFound { name: request.name });
}
@@ -383,7 +383,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
.client
.post(&format!("/v1/table/{}/rename/", current_name));
let req = req.json(&serde_json::json!({ "new_table_name": new_name }));
let (request_id, resp) = self.client.send(req, false).await?;
let (request_id, resp) = self.client.send(req).await?;
self.client.check_response(&request_id, resp).await?;
let table = self.table_cache.remove(current_name).await;
if let Some(table) = table {
@@ -394,7 +394,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
async fn drop_table(&self, name: &str) -> Result<()> {
let req = self.client.post(&format!("/v1/table/{}/drop/", name));
let (request_id, resp) = self.client.send(req, true).await?;
let (request_id, resp) = self.client.send(req).await?;
self.client.check_response(&request_id, resp).await?;
self.table_cache.remove(name).await;
Ok(())

View File

@@ -0,0 +1,122 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use crate::remote::RetryConfig;
use crate::Error;
use log::debug;
use std::time::Duration;
pub struct RetryCounter<'a> {
pub request_failures: u8,
pub connect_failures: u8,
pub read_failures: u8,
pub config: &'a ResolvedRetryConfig,
pub request_id: String,
}
impl<'a> RetryCounter<'a> {
pub(crate) fn new(config: &'a ResolvedRetryConfig, request_id: String) -> Self {
Self {
request_failures: 0,
connect_failures: 0,
read_failures: 0,
config,
request_id,
}
}
fn check_out_of_retries(
&self,
source: Box<dyn std::error::Error + Send + Sync>,
status_code: Option<reqwest::StatusCode>,
) -> crate::Result<()> {
if self.request_failures >= self.config.retries
|| self.connect_failures >= self.config.connect_retries
|| self.read_failures >= self.config.read_retries
{
Err(Error::Retry {
request_id: self.request_id.clone(),
request_failures: self.request_failures,
max_request_failures: self.config.retries,
connect_failures: self.connect_failures,
max_connect_failures: self.config.connect_retries,
read_failures: self.read_failures,
max_read_failures: self.config.read_retries,
source,
status_code,
})
} else {
Ok(())
}
}
pub fn increment_request_failures(&mut self, source: crate::Error) -> crate::Result<()> {
self.request_failures += 1;
let status_code = if let crate::Error::Http { status_code, .. } = &source {
*status_code
} else {
None
};
self.check_out_of_retries(Box::new(source), status_code)
}
pub fn increment_connect_failures(&mut self, source: reqwest::Error) -> crate::Result<()> {
self.connect_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
pub fn increment_read_failures(&mut self, source: reqwest::Error) -> crate::Result<()> {
self.read_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
pub fn next_sleep_time(&self) -> Duration {
let backoff = self.config.backoff_factor * (2.0f32.powi(self.request_failures as i32));
let jitter = rand::random::<f32>() * self.config.backoff_jitter;
let sleep_time = Duration::from_secs_f32(backoff + jitter);
debug!(
"Retrying request {:?} ({}/{} connect, {}/{} read, {}/{} read) in {:?}",
self.request_id,
self.connect_failures,
self.config.connect_retries,
self.request_failures,
self.config.retries,
self.read_failures,
self.config.read_retries,
sleep_time
);
sleep_time
}
}
#[derive(Debug, Clone)]
pub struct ResolvedRetryConfig {
pub retries: u8,
pub connect_retries: u8,
pub read_retries: u8,
pub backoff_factor: f32,
pub backoff_jitter: f32,
pub statuses: Vec<reqwest::StatusCode>,
}
impl TryFrom<RetryConfig> for ResolvedRetryConfig {
type Error = Error;
fn try_from(retry_config: RetryConfig) -> crate::Result<Self> {
Ok(Self {
retries: retry_config.retries.unwrap_or(3),
connect_retries: retry_config.connect_retries.unwrap_or(3),
read_retries: retry_config.read_retries.unwrap_or(3),
backoff_factor: retry_config.backoff_factor.unwrap_or(0.25),
backoff_jitter: retry_config.backoff_jitter.unwrap_or(0.25),
statuses: retry_config
.statuses
.unwrap_or_else(|| vec![409, 429, 500, 502, 503, 504])
.into_iter()
.map(|status| reqwest::StatusCode::from_u16(status).unwrap())
.collect(),
})
}
}

View File

@@ -7,7 +7,7 @@ use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest};
use crate::table::{AddDataMode, AnyQuery, Filter};
use crate::utils::{supported_btree_data_type, supported_vector_data_type};
use crate::{DistanceType, Error, Table};
use arrow_array::RecordBatchReader;
use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_ipc::reader::FileReader;
use arrow_schema::{DataType, SchemaRef};
use async_trait::async_trait;
@@ -21,6 +21,7 @@ use lance::arrow::json::{JsonDataType, JsonSchema};
use lance::dataset::scanner::DatasetRecordBatchStream;
use lance::dataset::{ColumnAlteration, NewColumnTransform, Version};
use lance_datafusion::exec::{execute_plan, OneShotExec};
use reqwest::{RequestBuilder, Response};
use serde::{Deserialize, Serialize};
use std::io::Cursor;
use std::pin::Pin;
@@ -83,7 +84,7 @@ impl<S: HttpSend> RemoteTable<S> {
let body = serde_json::json!({ "version": version });
request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
@@ -127,6 +128,61 @@ impl<S: HttpSend> RemoteTable<S> {
Ok(reqwest::Body::wrap_stream(body_stream))
}
/// Buffer the reader into memory
async fn buffer_reader<R: RecordBatchReader + ?Sized>(
reader: &mut R,
) -> Result<(SchemaRef, Vec<RecordBatch>)> {
let schema = reader.schema();
let mut batches = Vec::new();
for batch in reader {
batches.push(batch?);
}
Ok((schema, batches))
}
/// Create a new RecordBatchReader from buffered data
fn make_reader(schema: SchemaRef, batches: Vec<RecordBatch>) -> impl RecordBatchReader {
let iter = batches.into_iter().map(Ok);
RecordBatchIterator::new(iter, schema)
}
async fn send(&self, req: RequestBuilder, with_retry: bool) -> Result<(String, Response)> {
let res = if with_retry {
self.client.send_with_retry(req, None, true).await?
} else {
self.client.send(req).await?
};
Ok(res)
}
/// Send the request with streaming body.
/// This will use retries if with_retry is set and the number of configured retries is > 0.
/// If retries are enabled, the stream will be buffered into memory.
async fn send_streaming(
&self,
req: RequestBuilder,
mut data: Box<dyn RecordBatchReader + Send>,
with_retry: bool,
) -> Result<(String, Response)> {
if !with_retry || self.client.retry_config.retries == 0 {
let body = Self::reader_as_body(data)?;
return self.client.send(req.body(body)).await;
}
// to support retries, buffer into memory and clone the batches on each retry
let (schema, batches) = Self::buffer_reader(&mut *data).await?;
let make_body = Box::new(move || {
let reader = Self::make_reader(schema.clone(), batches.clone());
Self::reader_as_body(Box::new(reader))
});
let res = self
.client
.send_with_retry(req, Some(make_body), false)
.await?;
Ok(res)
}
async fn check_table_response(
&self,
request_id: &str,
@@ -168,7 +224,8 @@ impl<S: HttpSend> RemoteTable<S> {
}
// Server requires k.
let limit = params.limit.unwrap_or(usize::MAX);
// use isize::MAX as usize to avoid overflow: https://github.com/lancedb/lancedb/issues/2211
let limit = params.limit.unwrap_or(isize::MAX as usize);
body["k"] = serde_json::Value::Number(serde_json::Number::from(limit));
if let Some(filter) = &params.filter {
@@ -353,7 +410,7 @@ impl<S: HttpSend> RemoteTable<S> {
.collect();
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?;
let (request_id, response) = self.send(req, true).await?;
self.read_arrow_stream(&request_id, response).await
});
let streams = futures::future::try_join_all(futures);
@@ -471,7 +528,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = serde_json::json!({ "version": version });
request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
self.checkout_latest().await?;
Ok(())
@@ -481,7 +538,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let request = self
.client
.post(&format!("/v1/table/{}/version/list/", self.name));
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
#[derive(Deserialize)]
@@ -527,7 +584,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
request = request.json(&body);
}
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
@@ -545,12 +602,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
data: Box<dyn RecordBatchReader + Send>,
) -> Result<()> {
self.check_mutable().await?;
let body = Self::reader_as_body(data)?;
let mut request = self
.client
.post(&format!("/v1/table/{}/insert/", self.name))
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE)
.body(body);
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
match add.mode {
AddDataMode::Append => {}
@@ -559,8 +614,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
}
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send_streaming(request, data, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
@@ -628,7 +682,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.collect::<Vec<_>>();
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?;
let (request_id, response) = self.send(req, true).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?;
@@ -670,7 +724,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.collect();
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?;
let (request_id, response) = self.send(req, true).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?;
@@ -712,7 +766,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
"predicate": update.filter,
}));
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
@@ -726,7 +780,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.client
.post(&format!("/v1/table/{}/delete/", self.name))
.json(&body);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -812,7 +866,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let request = request.json(&body);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
@@ -836,21 +890,21 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<()> {
self.check_mutable().await?;
let query = MergeInsertRequest::try_from(params)?;
let body = Self::reader_as_body(new_data)?;
let request = self
.client
.post(&format!("/v1/table/{}/merge_insert/", self.name))
.query(&query)
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE)
.body(body);
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send_streaming(request, new_data, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
async fn optimize(&self, _action: OptimizeAction) -> Result<OptimizeStats> {
self.check_mutable().await?;
Err(Error::NotSupported {
@@ -879,7 +933,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.client
.post(&format!("/v1/table/{}/add_columns/", self.name))
.json(&body);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?; // todo:
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -918,7 +972,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.client
.post(&format!("/v1/table/{}/alter_columns/", self.name))
.json(&body);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -930,7 +984,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.client
.post(&format!("/v1/table/{}/drop_columns/", self.name))
.json(&body);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -944,7 +998,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = serde_json::json!({ "version": version });
request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
#[derive(Deserialize)]
@@ -1001,7 +1055,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = serde_json::json!({ "version": version });
request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
if response.status() == StatusCode::NOT_FOUND {
return Ok(None);
@@ -1011,7 +1065,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = response.text().await.err_to_http(request_id.clone())?;
println!("body: {:?}", body);
let stats = serde_json::from_str(&body).map_err(|e| Error::Http {
source: format!("Failed to parse index statistics: {}", e).into(),
request_id,
@@ -1026,7 +1079,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
"/v1/table/{}/index/{}/drop/",
self.name, index_name
));
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -1487,6 +1540,42 @@ mod tests {
assert_eq!(&body, &expected_body);
}
#[tokio::test]
async fn test_merge_insert_retries_on_409() {
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let data = Box::new(RecordBatchIterator::new(
[Ok(batch.clone())],
batch.schema(),
));
// Default parameters
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/merge_insert/");
let params = request.url().query_pairs().collect::<HashMap<_, _>>();
assert_eq!(params["on"], "some_col");
assert_eq!(params["when_matched_update_all"], "false");
assert_eq!(params["when_not_matched_insert_all"], "false");
assert_eq!(params["when_not_matched_by_source_delete"], "false");
assert!(!params.contains_key("when_matched_update_all_filt"));
assert!(!params.contains_key("when_not_matched_by_source_delete_filt"));
http::Response::builder().status(409).body("").unwrap()
});
let e = table
.merge_insert(&["some_col"])
.execute(data)
.await
.unwrap_err();
assert!(e.to_string().contains("Hit retry limit"));
}
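This test exercises the buffered-retry path end to end: the handler always answers 409, so the client rebuilds the body from the buffered batches on each attempt until it exhausts its retry budget. A schematic, library-agnostic sketch of that rebuild-per-attempt idea follows; `retry_with_body` is a hypothetical helper for illustration, not the lancedb client API.

```rust
// Hypothetical sketch: retry an operation whose input must be rebuilt for
// every attempt (the role played by the `make_body` closure above).
fn retry_with_body<B, T, E>(
    retries: usize,
    make_body: impl Fn() -> B,
    mut attempt: impl FnMut(B) -> Result<T, E>,
) -> Result<T, E> {
    let mut last_err = None;
    for _ in 0..=retries {
        match attempt(make_body()) {
            Ok(value) => return Ok(value),
            Err(err) => last_err = Some(err),
        }
    }
    // 0..=retries always runs at least once, so an error is recorded here.
    Err(last_err.expect("at least one attempt was made"))
}

fn main() {
    // Every attempt "fails" with a conflict, mirroring a handler that always
    // answers 409, so the caller eventually hits its retry limit.
    let result: Result<(), &str> = retry_with_body(2, || vec![1u8, 2, 3], |_body| Err("conflict"));
    assert_eq!(result, Err("conflict"));
}
```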
#[tokio::test]
async fn test_delete() {
let table = Table::new_with_handler("my_table", |request| {
@@ -1528,7 +1617,7 @@ mod tests {
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
let expected_body = serde_json::json!({
"k": usize::MAX,
"k": isize::MAX as usize,
"prefilter": true,
"vector": [], // Empty vector means no vector query.
"version": null,