Compare commits

...

53 Commits

Author SHA1 Message Date
Lance Release
a8b5ad7e74 Bump version: 0.22.0-beta.12 → 0.22.0 2025-04-25 21:16:07 +00:00
Lance Release
f8f6264883 Bump version: 0.22.0-beta.11 → 0.22.0-beta.12 2025-04-25 21:16:07 +00:00
Will Jones
d8517117f1 feat: upgrade Lance to v0.26.0 (#2359)
Upstream changelog:
https://github.com/lancedb/lance/releases/tag/v0.26.0

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Chores**
- Updated dependency management to use published crate versions for
improved reliability and maintainability.
- Added a temporary workaround for build issues by pinning a specific
version of a dependency.
- **Refactor**
- Improved resource management and concurrency by updating internal
ownership models for object storage components.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-25 13:59:12 -07:00
Lance Release
ab66dd5ed2 Updating package-lock.json 2025-04-25 06:04:06 +00:00
Lance Release
cbb9a7877c Updating package-lock.json 2025-04-25 05:02:47 +00:00
Lance Release
b7fc223535 Updating package-lock.json 2025-04-25 05:02:32 +00:00
Lance Release
1fdaf7a1a4 Bump version: 0.19.0-beta.10 → 0.19.0-beta.11 2025-04-25 05:02:16 +00:00
Lance Release
d11819c90c Bump version: 0.22.0-beta.10 → 0.22.0-beta.11 2025-04-25 05:01:57 +00:00
BubbleCal
9b902272f1 fix: sync hybrid search ignores the distance range params (#2356)
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Added support for distance range filtering in hybrid vector queries,
allowing users to specify lower and upper bounds for search results.

- **Tests**
- Introduced new tests to validate distance range filtering and
reranking in both synchronous and asynchronous hybrid query scenarios.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: BubbleCal <bubble-cal@outlook.com>
2025-04-25 13:01:22 +08:00
Will Jones
8c0622fa2c fix: remote limit to avoid "Limit must be non-negative" (#2354)
To workaround this issue: https://github.com/lancedb/lancedb/issues/2211

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Bug Fixes**
- Improved handling of large query parameters to prevent potential
overflow issues when using the "k" parameter in queries.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-24 15:04:06 -07:00
Philip Meier
2191f948c3 fix: add missing pydantic model config compat (#2316)
Fixes #2315.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Refactor**
- Enhanced query processing to maintain smooth functionality across
different dependency versions, ensuring improved stability and
performance.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-22 14:46:10 -07:00
Will Jones
acc3b03004 ci: fix docs deploy (#2351)
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Chores**
- Improved CI workflow for documentation builds by optimizing Rust build
settings and updating the runner environment.
  - Fixed a typo in a workflow step name.
- Streamlined caching steps to reduce redundancy and improve efficiency.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-22 13:55:34 -07:00
Lance Release
7f091b8c8e Updating package-lock.json 2025-04-22 19:16:43 +00:00
Lance Release
c19bdd9a24 Updating package-lock.json 2025-04-22 18:24:16 +00:00
Lance Release
dad0ff5cd2 Updating package-lock.json 2025-04-22 18:23:59 +00:00
Lance Release
a705621067 Bump version: 0.19.0-beta.9 → 0.19.0-beta.10 2025-04-22 18:23:39 +00:00
Lance Release
39614fdb7d Bump version: 0.22.0-beta.9 → 0.22.0-beta.10 2025-04-22 18:23:17 +00:00
Ryan Green
96d534d4bc feat: add retries to remote client for requests with stream bodies (#2349)
Closes https://github.com/lancedb/lancedb/issues/2307
* Adds retries to remote operations with stream bodies (add,
merge_insert)
* Change default retryable status codes to 409, 429, 500, 502, 503, 504
* Don't retry add or merge_insert operations on 5xx responses

Notes:
* Supporting retries on stream bodies means we have to buffer the body
into memory so it can be cloned on retry. This will impact memory use
patterns for the remote client. This buffering can be disabled by
disabling retries (i.e. setting retries to 0 in RetryConfig)
* It does not seem that retry config can be specified by env vars as the
documentation suggests. I added a follow-up issue
[here](https://github.com/lancedb/lancedb/issues/2350)



<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

## Summary by CodeRabbit

- **New Features**
- Enhanced retry support for remote requests with configurable limits
and exponential backoff with jitter.
- Added robust retry logic for streaming data uploads, enabling retries
with buffered data to ensure reliability.

- **Bug Fixes**
- Improved error handling and retry behavior for HTTP status codes 409
and 504.

- **Refactor**
- Centralized and modularized HTTP request sending and retry logic
across remote database and table operations.
  - Streamlined request ID management for improved traceability.
- Simplified error message construction in index waiting functionality.

- **Tests**
  - Added a test verifying merge-insert retries on HTTP 409 responses.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-22 15:40:44 -02:30
Lance Release
5051d30d09 Updating package-lock.json 2025-04-21 23:55:43 +00:00
Lance Release
db853c4041 Updating package-lock.json 2025-04-21 22:50:56 +00:00
Lance Release
76d1d22bdc Updating package-lock.json 2025-04-21 22:50:40 +00:00
Lance Release
d8746c61c6 Bump version: 0.19.0-beta.8 → 0.19.0-beta.9 2025-04-21 22:50:20 +00:00
Lance Release
1a66df2627 Bump version: 0.22.0-beta.8 → 0.22.0-beta.9 2025-04-21 22:49:59 +00:00
Will Jones
44670076c1 fix: move timeout to avoid retries (#2347)
I added a timeout to query execution options in
https://github.com/lancedb/lancedb/pull/2288. However, this was send to
the request timeout, but the retry implementation is unaware of this
timeout. So once the query timed out, a retry would be triggered.
Instead, this PR changes it so the timeout happens outside the retry
loop.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **Bug Fixes**
- Improved query timeout handling to provide clearer error messages and
more reliable cancellation if a query takes too long to complete.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-21 14:27:04 -07:00
Will Jones
92f0b16e46 fix(python): make sure pandas is optional (#2346)
Fixes #2344


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Tests**
- Updated tests to use PyArrow Tables instead of pandas DataFrames where
possible, reducing reliance on pandas.
- Tests that require pandas are now automatically skipped if pandas is
not installed.
- **Chores**
- Improved workflow to uninstall both pylance and pandas in a specific
test step.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-21 13:42:13 -07:00
Eileen Noonan
1620ba3508 docs: make table.update() nodejs guide consistent with API documentation (#2334)
The docs in the Guide here do not match the [API reference]
(https://lancedb.github.io/lancedb/js/classes/Table/#updateopts) for the
nodejs client.

I am writing an Elixir wrapper over the typescript library (Rust
forthcoming!) and confirmed in testing that the API reference is correct
vs the Guide.

Following the Guide docs, the error I got was:

"lance error: Invalid user input: Schema error: No field named bar.
Valid fields are foo. For a query of:

await table.update({foo: "buzz"}, { where: "foo = 'bar'"});
Over a table with a schema of just {foo: Utf8}.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **Documentation**
- Reformatted a code snippet in the guide to enhance readability by
splitting it into multiple lines for improved clarity.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-21 08:38:16 -07:00
Ryan Green
3ae90dde80 feat: add new table API to wait for async indexing (#2338)
* Add new wait_for_index() table operation that polls until indices are
created/fully indexed
* Add an optional wait timeout parameter to all create_index operations
* Python and NodeJS interfaces

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

## Summary by CodeRabbit

- **New Features**
- Added optional waiting for index creation completion with configurable
timeout.
- Introduced methods to poll and wait for indices to be fully built
across sync and async tables.
  - Extended index creation APIs to accept a wait timeout parameter.
- **Bug Fixes**
- Added a new timeout error variant for improved error reporting on
index operations.
- **Tests**
- Added tests covering successful index readiness waiting, timeout
scenarios, and missing index cases.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-21 08:41:21 -02:30
Magnus
4f07fea6df feat: add ColPali embedding support with MultiVector type (#2170)
This PR adds ColPali support with ColPaliEmbeddings class (tagged
"colpali") using ColQwen2.5 for multi-vector text/image embeddings. Also
added MultiVector Pydantic type to handle the vector lists.

I've added some integration test for the embedding model and some unit
test for the new Pydantic type. Could be a template for other ColPali
variants as well. or until transformers🤗 starts supporting it.


Still `TODO`:

- [ ] Documentation
- [ ] Add an example

_Could also allow Image as query, but didn't work well when testing it._

[ColPali-Engine](https://github.com/illuin-tech/colpali) version:
0.3.9.dev17+g3faee24

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced support for ColPali-based multimodal multi-vector
embeddings for both text and images.
- Added a new embedding class for generating multi-vector embeddings,
configurable for various model and processing options.
- Added a new Pydantic type for multi-vector embeddings, supporting
validation and schema generation for lists of fixed-dimension vectors.

- **Bug Fixes**
- Ensured proper asynchronous index creation in query tests for improved
reliability.

- **Tests**
- Added integration tests for ColPali embeddings, including
text-to-image search and validation of multi-vector fields.
- Added comprehensive tests for the new multi-vector Pydantic type,
covering schema, validation, and default value behavior.

- **Chores**
  - Updated optional dependencies to include the ColPali engine.
  - Added utility to check for availability of flash attention support.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-21 11:47:37 +08:00
Lance Release
3d7d82cf86 Updating package-lock.json 2025-04-17 23:13:37 +00:00
Lance Release
edc4e40a7b Updating package-lock.json 2025-04-17 22:16:36 +00:00
Lance Release
ca3806a02f Updating package-lock.json 2025-04-17 22:16:20 +00:00
Lance Release
35cff12e31 Bump version: 0.19.0-beta.7 → 0.19.0-beta.8 2025-04-17 22:16:02 +00:00
Lance Release
c6c20cb2bd Bump version: 0.22.0-beta.7 → 0.22.0-beta.8 2025-04-17 22:15:46 +00:00
Weston Pace
26080ee4c1 feat: add prewarm_index function (#2342)
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Added the ability to prewarm (load into memory) table indexes via new
methods in Python, Node.js, and Rust APIs, potentially reducing
cold-start query latency.
- **Bug Fixes**
- Ensured prewarming an index does not interfere with subsequent search
operations.
- **Tests**
- Introduced new test cases to verify full-text search index creation,
prewarming, and search functionalities in both Python and Node.js.
- **Chores**
  - Updated dependencies for improved compatibility and performance.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Lu Qiu <luqiujob@gmail.com>
2025-04-17 15:14:36 -07:00
Guspan Tanadi
ef3a2b5357 docs: intended path relative links (#2321)
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Documentation**
- Updated the link in the documentation to correctly reference the
workflow file, ensuring accurate navigation from the current context.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

Signed-off-by: Guspan Tanadi <36249910+guspan-tanadi@users.noreply.github.com>
2025-04-16 13:12:09 -07:00
Adam Azzam
c42a201389 docs: remove trailing commas from AWS IAM Policies (#2324)
Before:

<img width="1173" alt="Screenshot 2025-04-08 at 10 58 50 AM"
src="https://github.com/user-attachments/assets/e5c69c45-ab68-488f-9c7f-e12f7ecbfaab"
/>

After:
<img width="1136" alt="Screenshot 2025-04-08 at 10 58 58 AM"
src="https://github.com/user-attachments/assets/108c11ea-09b3-49b5-9a50-b880e72a0270"
/>


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Documentation**
- Updated JSON policy examples in the storage guides to correct
formatting issues and enhance syntax clarity for readers.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-16 13:09:21 -07:00
Lance Release
24e42ccd4d Updating package-lock.json 2025-04-15 05:29:37 +00:00
Lance Release
8a50944061 Updating package-lock.json 2025-04-15 04:11:16 +00:00
Lance Release
40e066bc7c Updating package-lock.json 2025-04-15 04:11:00 +00:00
Lance Release
b3ad105fa0 Bump version: 0.19.0-beta.6 → 0.19.0-beta.7 2025-04-15 04:10:43 +00:00
Lance Release
6e701d3e1b Bump version: 0.22.0-beta.6 → 0.22.0-beta.7 2025-04-15 04:10:26 +00:00
BubbleCal
2248aa9508 fix: bugs for new FTS APIs (#2314)
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Enhanced full-text search capabilities with support for phrase
queries, fuzzy matching, boosting, and multi-column matching.
- Search methods now accept full-text query objects directly, improving
query flexibility and precision.
- Python and JavaScript SDKs updated to handle full-text queries
seamlessly, including async search support.

- **Tests**
- Added comprehensive tests covering fuzzy search, phrase search, and
boosted queries to ensure robust full-text search functionality.

- **Documentation**
- Updated query class documentation to reflect new constructor options
and removal of deprecated methods for clarity and simplicity.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: BubbleCal <bubble-cal@outlook.com>
2025-04-15 11:51:35 +08:00
PhorstenkampFuzzy
a6fa69ab89 fix(python): add pylance as its own optional dependency (#2336)
This change allows to centrally manage the plance depndency without
everybody needing to monitor for compatibility manually.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **New Features**
- Introduced an optional dependency that enhances development support.
Users can now benefit from improved static analysis capabilities when
installing the recommended version (0.23.2 or later).

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-14 09:28:16 -07:00
Will Jones
b3a4efd587 fix: revert change default read_consistency_interval=5s (#2327)
This reverts commit a547c523c2 or #2281

The current implementation can cause panics and performance degradation.
I will bring this back with more testing in
https://github.com/lancedb/lancedb/pull/2311

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **Documentation**
- Enhanced clarity on read consistency settings with updated
descriptions and default behavior.
- Removed outdated warnings about eventual consistency from the
troubleshooting guide.

- **Refactor**
- Streamlined the handling of the read consistency interval across
integrations, now defaulting to "None" for improved performance.
  - Simplified internal logic to offer a more consistent experience.

- **Tests**
- Updated test expectations to reflect the new default representation
for the read consistency interval.
- Removed redundant tests related to "no consistency" settings for
streamlined testing.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
2025-04-14 08:48:15 -07:00
Lei Xu
4708b60bb1 chore: cargo update on main (#2331)
Fix test failures on main
2025-04-12 09:00:47 -05:00
Lei Xu
080ea2f9a4 chore: fix 1.86 warnings (#2312)
Fix rust 1.86 warnings
2025-04-12 08:29:10 -05:00
Ayush Chaurasia
32fdde23f8 fix: robust handling of empty result when reranking (#2313)
I found some edge cases while running experiments that - depending on
the base reranking libraries, some of them don't handle empty lists
well. This PR manually checks if the result set to be reranked is empty

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **Bug Fixes**
- Enhanced search result processing by ensuring that reordering only
occurs when valid, non-empty results are available, thereby preventing
unnecessary operations and potential errors.

- **Tests**
- Added automated tests to verify that empty search result sets are
handled correctly, ensuring consistent behavior across various
rerankers.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-04-09 16:26:05 +05:30
Lance Release
c44e5c046c Updating package-lock.json 2025-04-08 07:01:33 +00:00
Lance Release
f23aa0a793 Updating package-lock.json 2025-04-08 06:17:03 +00:00
Lance Release
83fc2b1851 Updating package-lock.json 2025-04-08 06:16:48 +00:00
Lance Release
56aa133ee6 Bump version: 0.19.0-beta.5 → 0.19.0-beta.6 2025-04-08 06:16:30 +00:00
Lance Release
27d9e5c596 Bump version: 0.22.0-beta.5 → 0.22.0-beta.6 2025-04-08 06:16:14 +00:00
BubbleCal
ec8271931f feat: support to create FTS index on list of strings (#2317)
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Chores**
- Updated internal library dependencies to the latest beta version for
improved system stability.
- **Tests**
- Added automated tests to validate full-text search functionality on
list-based text fields.
- **Refactor**
- Enhanced the search processing logic to provide robust support for
list-type text data, ensuring more reliable results.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: BubbleCal <bubble-cal@outlook.com>
2025-04-08 14:12:35 +08:00
101 changed files with 2734 additions and 1282 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.19.0-beta.5"
current_version = "0.19.0-beta.11"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -18,17 +18,24 @@ concurrency:
group: "pages"
cancel-in-progress: true
env:
# This reduces the disk space needed for the build
RUSTFLAGS: "-C debuginfo=0"
# according to: https://matklad.github.io/2021/09/04/fast-rust-builds.html
# CI builds are faster with incremental disabled.
CARGO_INCREMENTAL: "0"
jobs:
# Single deploy job since we're just deploying
build:
environment:
name: github-pages
url: ${{ steps.deployment.outputs.page_url }}
runs-on: buildjet-8vcpu-ubuntu-2204
runs-on: ubuntu-24.04
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Install dependecies needed for ubuntu
- name: Install dependencies needed for ubuntu
run: |
sudo apt install -y protobuf-compiler libssl-dev
rustup update && rustup default
@@ -38,6 +45,7 @@ jobs:
python-version: "3.10"
cache: "pip"
cache-dependency-path: "docs/requirements.txt"
- uses: Swatinem/rust-cache@v2
- name: Build Python
working-directory: python
run: |
@@ -49,7 +57,6 @@ jobs:
node-version: 20
cache: 'npm'
cache-dependency-path: node/package-lock.json
- uses: Swatinem/rust-cache@v2
- name: Install node dependencies
working-directory: node
run: |

View File

@@ -136,9 +136,9 @@ jobs:
- uses: ./.github/workflows/run_tests
with:
integration: true
- name: Test without pylance
- name: Test without pylance or pandas
run: |
pip uninstall -y pylance
pip uninstall -y pylance pandas
pytest -vv python/tests/test_table.py
# Make sure wheels are not included in the Rust cache
- name: Delete wheels

436
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -21,16 +21,14 @@ categories = ["database-implementations"]
rust-version = "1.78.0"
[workspace.dependencies]
lance = { "version" = "=0.25.3", "features" = [
"dynamodb",
], tag = "v0.25.3-beta.2", git = "https://github.com/lancedb/lance" }
lance-io = { version = "=0.25.3", tag = "v0.25.3-beta.2", git = "https://github.com/lancedb/lance" }
lance-index = { version = "=0.25.3", tag = "v0.25.3-beta.2", git = "https://github.com/lancedb/lance" }
lance-linalg = { version = "=0.25.3", tag = "v0.25.3-beta.2", git = "https://github.com/lancedb/lance" }
lance-table = { version = "=0.25.3", tag = "v0.25.3-beta.2", git = "https://github.com/lancedb/lance" }
lance-testing = { version = "=0.25.3", tag = "v0.25.3-beta.2", git = "https://github.com/lancedb/lance" }
lance-datafusion = { version = "=0.25.3", tag = "v0.25.3-beta.2", git = "https://github.com/lancedb/lance" }
lance-encoding = { version = "=0.25.3", tag = "v0.25.3-beta.2", git = "https://github.com/lancedb/lance" }
lance = { "version" = "=0.26.0", "features" = ["dynamodb"] }
lance-io = "=0.26.0"
lance-index = "=0.26.0"
lance-linalg = "=0.26.0"
lance-table = "=0.26.0"
lance-testing = "=0.26.0"
lance-datafusion = "=0.26.0"
lance-encoding = "=0.26.0"
# Note that this one does not include pyarrow
arrow = { version = "54.1", optional = false }
arrow-array = "54.1"

View File

@@ -2,7 +2,7 @@
LanceDB docs are deployed to https://lancedb.github.io/lancedb/.
Docs is built and deployed automatically by [Github Actions](.github/workflows/docs.yml)
Docs is built and deployed automatically by [Github Actions](../.github/workflows/docs.yml)
whenever a commit is pushed to the `main` branch. So it is possible for the docs to show
unreleased features.

View File

@@ -342,7 +342,7 @@ For **read and write access**, LanceDB will need a policy such as:
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:DeleteObject",
"s3:DeleteObject"
],
"Resource": "arn:aws:s3:::<bucket>/<prefix>/*"
},
@@ -374,7 +374,7 @@ For **read-only access**, LanceDB will need a policy such as:
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:GetObject"
],
"Resource": "arn:aws:s3:::<bucket>/<prefix>/*"
},

View File

@@ -765,7 +765,10 @@ This can be used to update zero to all rows depending on how many rows match the
];
const tbl = await db.createTable("my_table", data)
await tbl.update({vector: [10, 10]}, { where: "x = 2"})
await tbl.update({
values: { vector: [10, 10] },
where: "x = 2"
});
```
=== "vectordb (deprecated)"
@@ -784,7 +787,10 @@ This can be used to update zero to all rows depending on how many rows match the
];
const tbl = await db.createTable("my_table", data)
await tbl.update({ where: "x = 2", values: {vector: [10, 10]} })
await tbl.update({
where: "x = 2",
values: { vector: [10, 10] }
});
```
#### Updating using a sql query
@@ -1001,11 +1007,9 @@ In LanceDB OSS, users can set the `read_consistency_interval` parameter on conne
There are three possible settings for `read_consistency_interval`:
1. **Unset**: The database does not check for updates to tables made by other processes. This setting is suitable for applications where the data does not change during the lifetime of the table reference.
2. **Zero seconds (Strong consistency)**: The database checks for updates on every read. This provides the strongest consistency guarantees, ensuring that all clients see the latest committed data. However, it has the most overhead. This setting is suitable when consistency matters more than having high QPS. For best performance, combine this setting with the storage option `new_table_enable_v2_manifest_paths` set to `true`.
3. **Custom interval (Eventual consistency, the default)**: The database checks for updates at a custom interval. By default, this is every 5 seconds. This provides eventual consistency, allowing for some lag between write and read operations. Performance wise, this is a middle ground between strong consistency and no consistency check. This setting is suitable for applications where immediate consistency is not critical, but clients should see updated data eventually.
You can always force a synchronization by calling `checkout_latest()` / `checkoutLatest()` on a table.
1. **Unset (default)**: The database does not check for updates to tables made by other processes. This provides the best query performance, but means that clients may not see the most up-to-date data. This setting is suitable for applications where the data does not change during the lifetime of the table reference.
2. **Zero seconds (Strong consistency)**: The database checks for updates on every read. This provides the strongest consistency guarantees, ensuring that all clients see the latest committed data. However, it has the most overhead. This setting is suitable when consistency matters more than having high QPS.
3. **Custom interval (Eventual consistency)**: The database checks for updates at a custom interval, such as every 5 seconds. This provides eventual consistency, allowing for some lag between write and read operations. Performance wise, this is a middle ground between strong consistency and no consistency check. This setting is suitable for applications where immediate consistency is not critical, but clients should see updated data eventually.
!!! tip "Consistency in LanceDB Cloud"
@@ -1043,21 +1047,7 @@ You can always force a synchronization by calling `checkout_latest()` / `checkou
--8<-- "python/python/tests/docs/test_guide_tables.py:table_async_eventual_consistency"
```
For no consistency, use `None`:
=== "Sync API"
```python
--8<-- "python/python/tests/docs/test_guide_tables.py:table_no_consistency"
```
=== "Async API"
```python
--8<-- "python/python/tests/docs/test_guide_tables.py:table_async_no_consistency"
```
To manually check for updates you can use `checkout_latest`:
By default, a `Table` will never check for updates from other writers. To manually check for updates you can use `checkout_latest`:
=== "Sync API"
@@ -1075,25 +1065,15 @@ You can always force a synchronization by calling `checkout_latest()` / `checkou
To set strong consistency, use `0`:
```ts
--8<-- "nodejs/examples/basic.test.ts:table_strong_consistency"
const db = await lancedb.connect({ uri: "./.lancedb", readConsistencyInterval: 0 });
const tbl = await db.openTable("my_table");
```
For eventual consistency, specify the update interval as seconds:
```ts
--8<-- "nodejs/examples/basic.test.ts:table_eventual_consistency"
```
For no consistency, use `null`:
```ts
--8<-- "nodejs/examples/basic.test.ts:table_no_consistency"
```
To manually check for updates you can use `checkoutLatest`:
```ts
--8<-- "nodejs/examples/basic.test.ts:table_checkout_latest"
const db = await lancedb.connect({ uri: "./.lancedb", readConsistencyInterval: 5 });
const tbl = await db.openTable("my_table");
```
<!-- Node doesn't yet support the version time travel: https://github.com/lancedb/lancedb/issues/1007

View File

@@ -22,10 +22,13 @@ including methods to retrieve the query type and convert the query to a dictiona
new BoostQuery(
positive,
negative,
negativeBoost): BoostQuery
options?): BoostQuery
```
Creates an instance of BoostQuery.
The boost returns documents that match the positive query,
but penalizes those that match the negative query.
the penalty is controlled by the `negativeBoost` parameter.
#### Parameters
@@ -35,8 +38,11 @@ Creates an instance of BoostQuery.
* **negative**: [`FullTextQuery`](../interfaces/FullTextQuery.md)
The negative query that reduces the relevance score.
* **negativeBoost**: `number`
The factor by which the negative query reduces the score.
* **options?**
Optional parameters for the boost query.
- `negativeBoost`: The boost factor for the negative query (default is 0.0).
* **options.negativeBoost?**: `number`
#### Returns
@@ -50,6 +56,8 @@ Creates an instance of BoostQuery.
queryType(): FullTextQueryType
```
The type of the full-text query.
#### Returns
[`FullTextQueryType`](../enumerations/FullTextQueryType.md)
@@ -57,19 +65,3 @@ queryType(): FullTextQueryType
#### Implementation of
[`FullTextQuery`](../interfaces/FullTextQuery.md).[`queryType`](../interfaces/FullTextQuery.md#querytype)
***
### toDict()
```ts
toDict(): Record<string, unknown>
```
#### Returns
`Record`&lt;`string`, `unknown`&gt;
#### Implementation of
[`FullTextQuery`](../interfaces/FullTextQuery.md).[`toDict`](../interfaces/FullTextQuery.md#todict)

View File

@@ -22,9 +22,7 @@ including methods to retrieve the query type and convert the query to a dictiona
new MatchQuery(
query,
column,
boost,
fuzziness,
maxExpansions): MatchQuery
options?): MatchQuery
```
Creates an instance of MatchQuery.
@@ -37,14 +35,17 @@ Creates an instance of MatchQuery.
* **column**: `string`
The name of the column to search within.
* **boost**: `number` = `1.0`
(Optional) The boost factor to influence the relevance score of this query. Default is `1.0`.
* **options?**
Optional parameters for the match query.
- `boost`: The boost factor for the query (default is 1.0).
- `fuzziness`: The fuzziness level for the query (default is 0).
- `maxExpansions`: The maximum number of terms to consider for fuzzy matching (default is 50).
* **fuzziness**: `number` = `0`
(Optional) The allowed edit distance for fuzzy matching. Default is `0`.
* **options.boost?**: `number`
* **maxExpansions**: `number` = `50`
(Optional) The maximum number of terms to consider for fuzzy matching. Default is `50`.
* **options.fuzziness?**: `number`
* **options.maxExpansions?**: `number`
#### Returns
@@ -58,6 +59,8 @@ Creates an instance of MatchQuery.
queryType(): FullTextQueryType
```
The type of the full-text query.
#### Returns
[`FullTextQueryType`](../enumerations/FullTextQueryType.md)
@@ -65,19 +68,3 @@ queryType(): FullTextQueryType
#### Implementation of
[`FullTextQuery`](../interfaces/FullTextQuery.md).[`queryType`](../interfaces/FullTextQuery.md#querytype)
***
### toDict()
```ts
toDict(): Record<string, unknown>
```
#### Returns
`Record`&lt;`string`, `unknown`&gt;
#### Implementation of
[`FullTextQuery`](../interfaces/FullTextQuery.md).[`toDict`](../interfaces/FullTextQuery.md#todict)

View File

@@ -22,7 +22,7 @@ including methods to retrieve the query type and convert the query to a dictiona
new MultiMatchQuery(
query,
columns,
boosts): MultiMatchQuery
options?): MultiMatchQuery
```
Creates an instance of MultiMatchQuery.
@@ -35,10 +35,11 @@ Creates an instance of MultiMatchQuery.
* **columns**: `string`[]
An array of column names to search within.
* **boosts**: `number`[] = `...`
(Optional) An array of boost factors corresponding to each column. Default is an array of 1.0 for each column.
The `boosts` array should have the same length as `columns`. If not provided, all columns will have a default boost of 1.0.
If the length of `boosts` is less than `columns`, it will be padded with 1.0s.
* **options?**
Optional parameters for the multi-match query.
- `boosts`: An array of boost factors for each column (default is 1.0 for all).
* **options.boosts?**: `number`[]
#### Returns
@@ -52,6 +53,8 @@ Creates an instance of MultiMatchQuery.
queryType(): FullTextQueryType
```
The type of the full-text query.
#### Returns
[`FullTextQueryType`](../enumerations/FullTextQueryType.md)
@@ -59,19 +62,3 @@ queryType(): FullTextQueryType
#### Implementation of
[`FullTextQuery`](../interfaces/FullTextQuery.md).[`queryType`](../interfaces/FullTextQuery.md#querytype)
***
### toDict()
```ts
toDict(): Record<string, unknown>
```
#### Returns
`Record`&lt;`string`, `unknown`&gt;
#### Implementation of
[`FullTextQuery`](../interfaces/FullTextQuery.md).[`toDict`](../interfaces/FullTextQuery.md#todict)

View File

@@ -44,6 +44,8 @@ Creates an instance of `PhraseQuery`.
queryType(): FullTextQueryType
```
The type of the full-text query.
#### Returns
[`FullTextQueryType`](../enumerations/FullTextQueryType.md)
@@ -51,19 +53,3 @@ queryType(): FullTextQueryType
#### Implementation of
[`FullTextQuery`](../interfaces/FullTextQuery.md).[`queryType`](../interfaces/FullTextQuery.md#querytype)
***
### toDict()
```ts
toDict(): Record<string, unknown>
```
#### Returns
`Record`&lt;`string`, `unknown`&gt;
#### Implementation of
[`FullTextQuery`](../interfaces/FullTextQuery.md).[`toDict`](../interfaces/FullTextQuery.md#todict)

View File

@@ -454,6 +454,28 @@ Modeled after ``VACUUM`` in PostgreSQL.
***
### prewarmIndex()
```ts
abstract prewarmIndex(name): Promise<void>
```
Prewarm an index in the table.
#### Parameters
* **name**: `string`
The name of the index.
This will load the index into memory. This may reduce the cold-start time for
future queries. If the index does not fit in the cache then this call may be
wasteful.
#### Returns
`Promise`&lt;`void`&gt;
***
### query()
```ts
@@ -575,7 +597,7 @@ of the given query
#### Parameters
* **query**: `string` \| [`IntoVector`](../type-aliases/IntoVector.md)
* **query**: `string` \| [`IntoVector`](../type-aliases/IntoVector.md) \| [`FullTextQuery`](../interfaces/FullTextQuery.md)
the query, a vector or string
* **queryType?**: `string`
@@ -731,3 +753,26 @@ Retrieve the version of the table
#### Returns
`Promise`&lt;`number`&gt;
***
### waitForIndex()
```ts
abstract waitForIndex(indexNames, timeoutSeconds): Promise<void>
```
Waits for asynchronous indexing to complete on the table.
#### Parameters
* **indexNames**: `string`[]
The name of the indices to wait for
* **timeoutSeconds**: `number`
The number of seconds to wait before timing out
This will raise an error if the indices are not created and fully indexed within the timeout.
#### Returns
`Promise`&lt;`void`&gt;

View File

@@ -44,7 +44,7 @@ for testing purposes.
### readConsistencyInterval?
```ts
optional readConsistencyInterval: null | number;
optional readConsistencyInterval: number;
```
(For LanceDB OSS only): The interval, in seconds, at which to check for

View File

@@ -18,18 +18,8 @@ including methods to retrieve the query type and convert the query to a dictiona
queryType(): FullTextQueryType
```
The type of the full-text query.
#### Returns
[`FullTextQueryType`](../enumerations/FullTextQueryType.md)
***
### toDict()
```ts
toDict(): Record<string, unknown>
```
#### Returns
`Record`&lt;`string`, `unknown`&gt;

View File

@@ -39,3 +39,11 @@ and the same name, then an error will be returned. This is true even if
that index is out of date.
The default is true
***
### waitTimeoutSeconds?
```ts
optional waitTimeoutSeconds: number;
```

View File

@@ -11,7 +11,6 @@ likely that someone who knows the answer will see your question.
## Common issues
* Multiprocessing with `fork` is not supported. You should use `spawn` instead.
* Data returned by queries may not reflect the most recent writes, depending on configuration. LanceDB uses eventual consistency by default. See [consistency](/docs/src/guides/tables.md#consistency) for more information.
## Enabling logging

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.19.0-beta.5</version>
<version>0.19.0-beta.11</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.19.0-beta.5</version>
<version>0.19.0-beta.11</version>
<packaging>pom</packaging>
<name>LanceDB Parent</name>

44
node/package-lock.json generated
View File

@@ -1,12 +1,12 @@
{
"name": "vectordb",
"version": "0.19.0-beta.5",
"version": "0.19.0-beta.11",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "vectordb",
"version": "0.19.0-beta.5",
"version": "0.19.0-beta.11",
"cpu": [
"x64",
"arm64"
@@ -52,11 +52,11 @@
"uuid": "^9.0.0"
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-arm64": "0.19.0-beta.5",
"@lancedb/vectordb-darwin-x64": "0.19.0-beta.5",
"@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.5",
"@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.5",
"@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.5"
"@lancedb/vectordb-darwin-arm64": "0.19.0-beta.11",
"@lancedb/vectordb-darwin-x64": "0.19.0-beta.11",
"@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.11",
"@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.11",
"@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.11"
},
"peerDependencies": {
"@apache-arrow/ts": "^14.0.2",
@@ -327,9 +327,9 @@
}
},
"node_modules/@lancedb/vectordb-darwin-arm64": {
"version": "0.19.0-beta.5",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.19.0-beta.5.tgz",
"integrity": "sha512-NuJVGaV4b6XgH3dlkCEquvtGM1cY5sIJE5M/LgJ3HYYvAbco/seBQM5AHTV/7CULoPEY9eQeJZOj9fWP5oQLYQ==",
"version": "0.19.0-beta.11",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.19.0-beta.11.tgz",
"integrity": "sha512-fLefGJYdlIRIjrJj8MU1r5Zix5LpKktpCYilA7tZrfvBWkubGceJ+U6RPsWz7VGBfWcETo3g5CBooUPhbtSMlQ==",
"cpu": [
"arm64"
],
@@ -340,9 +340,9 @@
]
},
"node_modules/@lancedb/vectordb-darwin-x64": {
"version": "0.19.0-beta.5",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.19.0-beta.5.tgz",
"integrity": "sha512-hbadwvQcUgKJfluUHhN+mx+XeFRwTuh9mD0L3Tf3t3BkDTxyHpEG5WNgOpWrh6e1RU6zW54CoCyQuSEaVqGgGw==",
"version": "0.19.0-beta.11",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.19.0-beta.11.tgz",
"integrity": "sha512-FkCa1TbPLDXAGhlRI4tafcltzApCsyvgi+I+kX07u5DKPNQVALpQ3R6X6GLlbiFsAFBdyv9t2fqQ9DlgjJIZpA==",
"cpu": [
"x64"
],
@@ -353,9 +353,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-arm64-gnu": {
"version": "0.19.0-beta.5",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.19.0-beta.5.tgz",
"integrity": "sha512-fu/EOYLr3mx76/SP4dEgbq0vSYHfuTf68lVl5/tL6eIb1Purz42l22+jNKLJ/S3Plase2SkXdxyY90K2Y/CvSg==",
"version": "0.19.0-beta.11",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.19.0-beta.11.tgz",
"integrity": "sha512-iZkL/01HNUNQ8pGK0+hoNyrM7P1YtShsyIQVzJMfo41SAofCBf9qvi9YaYyd49sDb+dQXeRn1+cfaJ9siz1OHw==",
"cpu": [
"arm64"
],
@@ -366,9 +366,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-x64-gnu": {
"version": "0.19.0-beta.5",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.19.0-beta.5.tgz",
"integrity": "sha512-pzb8fl5M8155sc/mEFnKmuh9rCfQohHBlb+j+5qNMe84AyygQ8Me1H3b1h9fOkUPu2Y168zYfuGkjNv4Bjm9eA==",
"version": "0.19.0-beta.11",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.19.0-beta.11.tgz",
"integrity": "sha512-MdKRHxe2tRQqmExNLv3f6Wvx1mEi98eFtD0ysm4tNrQdaS1MJbTp+DUehrRKkfDWsooalHkIi9d02BVw5qseUQ==",
"cpu": [
"x64"
],
@@ -379,9 +379,9 @@
]
},
"node_modules/@lancedb/vectordb-win32-x64-msvc": {
"version": "0.19.0-beta.5",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.19.0-beta.5.tgz",
"integrity": "sha512-5z6BSfTuZYJdDL2wwRrEQlnfluahzaUH2U7vj3i4ik4zaAwvaYcrjmdYCTLRYhFscUqzxd2pVFHbfRYe+maYzA==",
"version": "0.19.0-beta.11",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.19.0-beta.11.tgz",
"integrity": "sha512-KWy+t9jr0feJAW9NkmM/w9kfdpp78+7mkeh9lb0g3xI3OdYU1yizNqFjbIQqJf7/L4sou4wmOjAC+FcP8qCtzg==",
"cpu": [
"x64"
],

View File

@@ -1,6 +1,6 @@
{
"name": "vectordb",
"version": "0.19.0-beta.5",
"version": "0.19.0-beta.11",
"description": " Serverless, low-latency vector database for AI applications",
"private": false,
"main": "dist/index.js",
@@ -89,10 +89,10 @@
}
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-x64": "0.19.0-beta.5",
"@lancedb/vectordb-darwin-arm64": "0.19.0-beta.5",
"@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.5",
"@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.5",
"@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.5"
"@lancedb/vectordb-darwin-x64": "0.19.0-beta.11",
"@lancedb/vectordb-darwin-arm64": "0.19.0-beta.11",
"@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.11",
"@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.11",
"@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.11"
}
}

View File

@@ -110,7 +110,7 @@ describe('LanceDB Mirrored Store Integration test', function () {
fs.readdir(path.join(mirroredPath, 'data'), { withFileTypes: true }, (err, files) => {
if (err != null) throw err
assert.equal(files.length, 1, `Found files: ${files.map(f => f.name)}`)
assert.equal(files.length, 1)
assert.isTrue(files[0].name.endsWith('.lance'))
})

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.19.0-beta.5"
version = "0.19.0-beta.11"
license.workspace = true
description.workspace = true
repository.workspace = true
@@ -28,6 +28,9 @@ napi-derive = "2.16.4"
lzma-sys = { version = "*", features = ["static"] }
log.workspace = true
# Workaround for build failure until we can fix it.
aws-lc-sys = "=0.28.0"
[build-dependencies]
napi-build = "2.1"

View File

@@ -17,7 +17,7 @@ describe("when connecting", () => {
it("should connect", async () => {
const db = await connect(tmpDir.name);
expect(db.display()).toBe(
`ListingDatabase(uri=${tmpDir.name}, read_consistency_interval=5s)`,
`ListingDatabase(uri=${tmpDir.name}, read_consistency_interval=None)`,
);
});

View File

@@ -10,7 +10,7 @@ import * as arrow16 from "apache-arrow-16";
import * as arrow17 from "apache-arrow-17";
import * as arrow18 from "apache-arrow-18";
import { Table, connect } from "../lancedb";
import { MatchQuery, PhraseQuery, Table, connect } from "../lancedb";
import {
Table as ArrowTable,
Field,
@@ -33,6 +33,7 @@ import {
register,
} from "../lancedb/embedding";
import { Index } from "../lancedb/indices";
import { instanceOfFullTextQuery } from "../lancedb/query";
describe.each([arrow15, arrow16, arrow17, arrow18])(
"Given a table",
@@ -58,7 +59,7 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
it("be displayable", async () => {
expect(table.display()).toMatch(
/NativeTable\(some_table, uri=.*, read_consistency_interval=5s\)/,
/NativeTable\(some_table, uri=.*, read_consistency_interval=None\)/,
);
table.close();
expect(table.display()).toBe("ClosedTable(some_table)");
@@ -506,6 +507,15 @@ describe("When creating an index", () => {
expect(indices2.length).toBe(0);
});
it("should wait for index readiness", async () => {
// Create an index and then wait for it to be ready
await tbl.createIndex("vec");
const indices = await tbl.listIndices();
expect(indices.length).toBeGreaterThan(0);
const idxName = indices[0].name;
await expect(tbl.waitForIndex([idxName], 5)).resolves.toBeUndefined();
});
it("should search with distance range", async () => {
await tbl.createIndex("vec");
@@ -823,6 +833,7 @@ describe("When creating an index", () => {
// Only build index over v1
await tbl.createIndex("vec", {
config: Index.ivfPq({ numPartitions: 2, numSubVectors: 2 }),
waitTimeoutSeconds: 30,
});
const rst = await tbl
@@ -1302,6 +1313,56 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
const results = await table.search("hello").toArray();
expect(results[0].text).toBe(data[0].text);
const query = new MatchQuery("goodbye", "text");
expect(instanceOfFullTextQuery(query)).toBe(true);
const results2 = await table
.search(new MatchQuery("goodbye", "text"))
.toArray();
expect(results2[0].text).toBe(data[1].text);
});
test("prewarm full text search index", async () => {
const db = await connect(tmpDir.name);
const data = [
{ text: ["lance database", "the", "search"], vector: [0.1, 0.2, 0.3] },
{ text: ["lance database"], vector: [0.4, 0.5, 0.6] },
{ text: ["lance", "search"], vector: [0.7, 0.8, 0.9] },
{ text: ["database", "search"], vector: [1.0, 1.1, 1.2] },
{ text: ["unrelated", "doc"], vector: [1.3, 1.4, 1.5] },
];
const table = await db.createTable("test", data);
await table.createIndex("text", {
config: Index.fts(),
});
// For the moment, we just confirm we can call prewarmIndex without error
// and still search it afterwards
await table.prewarmIndex("text_idx");
const results = await table.search("lance").toArray();
expect(results.length).toBe(3);
});
test("full text index on list", async () => {
const db = await connect(tmpDir.name);
const data = [
{ text: ["lance database", "the", "search"], vector: [0.1, 0.2, 0.3] },
{ text: ["lance database"], vector: [0.4, 0.5, 0.6] },
{ text: ["lance", "search"], vector: [0.7, 0.8, 0.9] },
{ text: ["database", "search"], vector: [1.0, 1.1, 1.2] },
{ text: ["unrelated", "doc"], vector: [1.3, 1.4, 1.5] },
];
const table = await db.createTable("test", data);
await table.createIndex("text", {
config: Index.fts(),
});
const results = await table.search("lance").toArray();
expect(results.length).toBe(3);
const results2 = await table.search('"lance database"').toArray();
expect(results2.length).toBe(2);
});
test("full text search without positions", async () => {
@@ -1354,6 +1415,43 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
expect(results.length).toBe(2);
const phraseResults = await table.search('"hello world"').toArray();
expect(phraseResults.length).toBe(1);
const phraseResults2 = await table
.search(new PhraseQuery("hello world", "text"))
.toArray();
expect(phraseResults2.length).toBe(1);
});
test("full text search fuzzy query", async () => {
const db = await connect(tmpDir.name);
const data = [
{ text: "fa", vector: [0.1, 0.2, 0.3] },
{ text: "fo", vector: [0.4, 0.5, 0.6] },
{ text: "fob", vector: [0.4, 0.5, 0.6] },
{ text: "focus", vector: [0.4, 0.5, 0.6] },
{ text: "foo", vector: [0.4, 0.5, 0.6] },
{ text: "food", vector: [0.4, 0.5, 0.6] },
{ text: "foul", vector: [0.4, 0.5, 0.6] },
];
const table = await db.createTable("test", data);
await table.createIndex("text", {
config: Index.fts(),
});
const results = await table
.search(new MatchQuery("foo", "text"))
.toArray();
expect(results.length).toBe(1);
expect(results[0].text).toBe("foo");
const fuzzyResults = await table
.search(new MatchQuery("foo", "text", { fuzziness: 1 }))
.toArray();
expect(fuzzyResults.length).toBe(4);
const resultSet = new Set(fuzzyResults.map((r) => r.text));
expect(resultSet.has("foo")).toBe(true);
expect(resultSet.has("fob")).toBe(true);
expect(resultSet.has("fo")).toBe(true);
expect(resultSet.has("food")).toBe(true);
});
test.each([

View File

@@ -202,35 +202,5 @@ test("basic table examples", async () => {
// --8<-- [end:create_f16_table]
await db.dropTable("f16_tbl");
}
const uri = databaseDir;
await db.createTable("my_table", [{ id: 1 }, { id: 2 }]);
{
// --8<-- [start:table_strong_consistency]
const db = await lancedb.connect({ uri, readConsistencyInterval: 0 });
const tbl = await db.openTable("my_table");
// --8<-- [end:table_strong_consistency]
}
{
// --8<-- [start:table_eventual_consistency]
const db = await lancedb.connect({ uri, readConsistencyInterval: 5 });
const tbl = await db.openTable("my_table");
// --8<-- [end:table_eventual_consistency]
}
{
// --8<-- [start:table_no_consistency]
const db = await lancedb.connect({ uri, readConsistencyInterval: null });
const tbl = await db.openTable("my_table");
// --8<-- [end:table_no_consistency]
}
{
// --8<-- [start:table_checkout_latest]
const tbl = await db.openTable("my_table");
// (Other writes happen to test_table_async from another process)
// Check for updates
tbl.checkoutLatest();
// --8<-- [end:table_checkout_latest]
}
});
});

View File

@@ -681,4 +681,6 @@ export interface IndexOptions {
* The default is true
*/
replace?: boolean;
waitTimeoutSeconds?: number;
}

View File

@@ -11,6 +11,7 @@ import {
} from "./arrow";
import { type IvfPqOptions } from "./indices";
import {
JsFullTextQuery,
RecordBatchIterator as NativeBatchIterator,
Query as NativeQuery,
Table as NativeTable,
@@ -177,9 +178,7 @@ export class QueryBase<NativeQueryType extends NativeQuery | NativeVectorQuery>
columns: columns,
});
} else {
// If query is a FullTextQuery object, convert it to a dict
const queryObj = query.toDict();
inner.fullTextSearch(queryObj);
inner.fullTextSearch({ query: query.inner });
}
});
return this;
@@ -743,8 +742,7 @@ export class Query extends QueryBase<NativeQuery> {
columns: columns,
});
} else {
const queryObj = query.toDict();
inner.fullTextSearch(queryObj);
inner.fullTextSearch({ query: query.inner });
}
});
return this;
@@ -772,130 +770,141 @@ export enum FullTextQueryType {
* including methods to retrieve the query type and convert the query to a dictionary format.
*/
export interface FullTextQuery {
/**
* Returns the inner query object.
* This is the underlying query object used by the database engine.
* @ignore
*/
inner: JsFullTextQuery;
/**
* The type of the full-text query.
*/
queryType(): FullTextQueryType;
toDict(): Record<string, unknown>;
}
// biome-ignore lint/suspicious/noExplicitAny: we want any here
export function instanceOfFullTextQuery(obj: any): obj is FullTextQuery {
return obj != null && obj.inner instanceof JsFullTextQuery;
}
export class MatchQuery implements FullTextQuery {
/** @ignore */
public readonly inner: JsFullTextQuery;
/**
* Creates an instance of MatchQuery.
*
* @param query - The text query to search for.
* @param column - The name of the column to search within.
* @param boost - (Optional) The boost factor to influence the relevance score of this query. Default is `1.0`.
* @param fuzziness - (Optional) The allowed edit distance for fuzzy matching. Default is `0`.
* @param maxExpansions - (Optional) The maximum number of terms to consider for fuzzy matching. Default is `50`.
* @param options - Optional parameters for the match query.
* - `boost`: The boost factor for the query (default is 1.0).
* - `fuzziness`: The fuzziness level for the query (default is 0).
* - `maxExpansions`: The maximum number of terms to consider for fuzzy matching (default is 50).
*/
constructor(
private query: string,
private column: string,
private boost: number = 1.0,
private fuzziness: number = 0,
private maxExpansions: number = 50,
) {}
query: string,
column: string,
options?: {
boost?: number;
fuzziness?: number;
maxExpansions?: number;
},
) {
let fuzziness = options?.fuzziness;
if (fuzziness === undefined) {
fuzziness = 0;
}
this.inner = JsFullTextQuery.matchQuery(
query,
column,
options?.boost ?? 1.0,
fuzziness,
options?.maxExpansions ?? 50,
);
}
queryType(): FullTextQueryType {
return FullTextQueryType.Match;
}
toDict(): Record<string, unknown> {
return {
[this.queryType()]: {
[this.column]: {
query: this.query,
boost: this.boost,
fuzziness: this.fuzziness,
// biome-ignore lint/style/useNamingConvention: use underscore for consistency with the other APIs
max_expansions: this.maxExpansions,
},
},
};
}
}
export class PhraseQuery implements FullTextQuery {
/** @ignore */
public readonly inner: JsFullTextQuery;
/**
* Creates an instance of `PhraseQuery`.
*
* @param query - The phrase to search for in the specified column.
* @param column - The name of the column to search within.
*/
constructor(
private query: string,
private column: string,
) {}
constructor(query: string, column: string) {
this.inner = JsFullTextQuery.phraseQuery(query, column);
}
queryType(): FullTextQueryType {
return FullTextQueryType.MatchPhrase;
}
toDict(): Record<string, unknown> {
return {
[this.queryType()]: {
[this.column]: this.query,
},
};
}
}
export class BoostQuery implements FullTextQuery {
/** @ignore */
public readonly inner: JsFullTextQuery;
/**
* Creates an instance of BoostQuery.
* The boost returns documents that match the positive query,
* but penalizes those that match the negative query.
* the penalty is controlled by the `negativeBoost` parameter.
*
* @param positive - The positive query that boosts the relevance score.
* @param negative - The negative query that reduces the relevance score.
* @param negativeBoost - The factor by which the negative query reduces the score.
* @param options - Optional parameters for the boost query.
* - `negativeBoost`: The boost factor for the negative query (default is 0.0).
*/
constructor(
private positive: FullTextQuery,
private negative: FullTextQuery,
private negativeBoost: number,
) {}
positive: FullTextQuery,
negative: FullTextQuery,
options?: {
negativeBoost?: number;
},
) {
this.inner = JsFullTextQuery.boostQuery(
positive.inner,
negative.inner,
options?.negativeBoost,
);
}
queryType(): FullTextQueryType {
return FullTextQueryType.Boost;
}
toDict(): Record<string, unknown> {
return {
[this.queryType()]: {
positive: this.positive.toDict(),
negative: this.negative.toDict(),
// biome-ignore lint/style/useNamingConvention: use underscore for consistency with the other APIs
negative_boost: this.negativeBoost,
},
};
}
}
export class MultiMatchQuery implements FullTextQuery {
/** @ignore */
public readonly inner: JsFullTextQuery;
/**
* Creates an instance of MultiMatchQuery.
*
* @param query - The text query to search for across multiple columns.
* @param columns - An array of column names to search within.
* @param boosts - (Optional) An array of boost factors corresponding to each column. Default is an array of 1.0 for each column.
*
* The `boosts` array should have the same length as `columns`. If not provided, all columns will have a default boost of 1.0.
* If the length of `boosts` is less than `columns`, it will be padded with 1.0s.
* @param options - Optional parameters for the multi-match query.
* - `boosts`: An array of boost factors for each column (default is 1.0 for all).
*/
constructor(
private query: string,
private columns: string[],
private boosts: number[] = columns.map(() => 1.0),
) {}
query: string,
columns: string[],
options?: {
boosts?: number[];
},
) {
this.inner = JsFullTextQuery.multiMatchQuery(
query,
columns,
options?.boosts,
);
}
queryType(): FullTextQueryType {
return FullTextQueryType.MultiMatch;
}
toDict(): Record<string, unknown> {
return {
[this.queryType()]: {
query: this.query,
columns: this.columns,
boost: this.boosts,
},
};
}
}

View File

@@ -22,7 +22,12 @@ import {
OptimizeStats,
Table as _NativeTable,
} from "./native";
import { Query, VectorQuery } from "./query";
import {
FullTextQuery,
Query,
VectorQuery,
instanceOfFullTextQuery,
} from "./query";
import { sanitizeType } from "./sanitize";
import { IntoSql, toSQL } from "./util";
export { IndexConfig } from "./native";
@@ -230,6 +235,30 @@ export abstract class Table {
*/
abstract dropIndex(name: string): Promise<void>;
/**
* Prewarm an index in the table.
*
* @param name The name of the index.
*
* This will load the index into memory. This may reduce the cold-start time for
* future queries. If the index does not fit in the cache then this call may be
* wasteful.
*/
abstract prewarmIndex(name: string): Promise<void>;
/**
* Waits for asynchronous indexing to complete on the table.
*
* @param indexNames The name of the indices to wait for
* @param timeoutSeconds The number of seconds to wait before timing out
*
* This will raise an error if the indices are not created and fully indexed within the timeout.
*/
abstract waitForIndex(
indexNames: string[],
timeoutSeconds: number,
): Promise<void>;
/**
* Create a {@link Query} Builder.
*
@@ -294,7 +323,7 @@ export abstract class Table {
* if the query is a string and no embedding function is defined, it will be treated as a full text search query
*/
abstract search(
query: string | IntoVector,
query: string | IntoVector | FullTextQuery,
queryType?: string,
ftsColumns?: string | string[],
): VectorQuery | Query;
@@ -553,23 +582,39 @@ export class LocalTable extends Table {
// Bit of a hack to get around the fact that TS has no package-scope.
// biome-ignore lint/suspicious/noExplicitAny: skip
const nativeIndex = (options?.config as any)?.inner;
await this.inner.createIndex(nativeIndex, column, options?.replace);
await this.inner.createIndex(
nativeIndex,
column,
options?.replace,
options?.waitTimeoutSeconds,
);
}
async dropIndex(name: string): Promise<void> {
await this.inner.dropIndex(name);
}
async prewarmIndex(name: string): Promise<void> {
await this.inner.prewarmIndex(name);
}
async waitForIndex(
indexNames: string[],
timeoutSeconds: number,
): Promise<void> {
await this.inner.waitForIndex(indexNames, timeoutSeconds);
}
query(): Query {
return new Query(this.inner);
}
search(
query: string | IntoVector,
query: string | IntoVector | FullTextQuery,
queryType: string = "auto",
ftsColumns?: string | string[],
): VectorQuery | Query {
if (typeof query !== "string") {
if (typeof query !== "string" && !instanceOfFullTextQuery(query)) {
if (queryType === "fts") {
throw new Error("Cannot perform full text search on a vector query");
}
@@ -585,7 +630,10 @@ export class LocalTable extends Table {
// The query type is auto or vector
// fall back to full text search if no embedding functions are defined and the query is a string
if (queryType === "auto" && getRegistry().length() === 0) {
if (
queryType === "auto" &&
(getRegistry().length() === 0 || instanceOfFullTextQuery(query))
) {
return this.query().fullTextSearch(query, {
columns: ftsColumns,
});

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.19.0-beta.5",
"version": "0.19.0-beta.11",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-x64",
"version": "0.19.0-beta.5",
"version": "0.19.0-beta.11",
"os": ["darwin"],
"cpu": ["x64"],
"main": "lancedb.darwin-x64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.19.0-beta.5",
"version": "0.19.0-beta.11",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.19.0-beta.5",
"version": "0.19.0-beta.11",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.19.0-beta.5",
"version": "0.19.0-beta.11",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.19.0-beta.5",
"version": "0.19.0-beta.11",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.19.0-beta.5",
"version": "0.19.0-beta.11",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.19.0-beta.5",
"version": "0.19.0-beta.11",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.19.0-beta.5",
"version": "0.19.0-beta.11",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.19.0-beta.5",
"version": "0.19.0-beta.11",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.19.0-beta.5",
"version": "0.19.0-beta.11",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -48,16 +48,8 @@ impl Connection {
pub async fn new(uri: String, options: ConnectionOptions) -> napi::Result<Self> {
let mut builder = ConnectBuilder::new(&uri);
if let Some(interval) = options.read_consistency_interval {
match interval {
Either::A(seconds) => {
builder = builder.read_consistency_interval(Some(
std::time::Duration::from_secs_f64(seconds),
));
}
Either::B(_) => {
builder = builder.read_consistency_interval(None);
}
}
builder =
builder.read_consistency_interval(std::time::Duration::from_secs_f64(interval));
}
if let Some(storage_options) = options.storage_options {
for (key, value) in storage_options {

View File

@@ -4,7 +4,6 @@
use std::collections::HashMap;
use env_logger::Env;
use napi::{bindgen_prelude::Null, Either};
use napi_derive::*;
mod connection;
@@ -19,6 +18,7 @@ mod table;
mod util;
#[napi(object)]
#[derive(Debug)]
pub struct ConnectionOptions {
/// (For LanceDB OSS only): The interval, in seconds, at which to check for
/// updates to the table from other processes. If None, then consistency is not
@@ -29,7 +29,7 @@ pub struct ConnectionOptions {
/// has passed since the last check, then the table will be checked for updates.
/// Note: this consistency only applies to read operations. Write operations are
/// always consistent.
pub read_consistency_interval: Option<Either<f64, Null>>,
pub read_consistency_interval: Option<f64>,
/// (For LanceDB OSS only): configuration for object storage.
///
/// The available options are described at https://lancedb.github.io/lancedb/guides/storage/

View File

@@ -3,7 +3,9 @@
use std::sync::Arc;
use lancedb::index::scalar::{FtsQuery, FullTextSearchQuery, MatchQuery, PhraseQuery};
use lancedb::index::scalar::{
BoostQuery, FtsQuery, FullTextSearchQuery, MatchQuery, MultiMatchQuery, PhraseQuery,
};
use lancedb::query::ExecutableQuery;
use lancedb::query::Query as LanceDbQuery;
use lancedb::query::QueryBase;
@@ -18,7 +20,7 @@ use crate::error::NapiErrorExt;
use crate::iterator::RecordBatchIterator;
use crate::rerankers::Reranker;
use crate::rerankers::RerankerCallbacks;
use crate::util::{parse_distance_type, parse_fts_query};
use crate::util::parse_distance_type;
#[napi]
pub struct Query {
@@ -38,51 +40,8 @@ impl Query {
}
#[napi]
pub fn full_text_search(&mut self, query: napi::JsUnknown) -> napi::Result<()> {
let query = unsafe { query.cast::<napi::JsObject>() };
let query = if let Some(query_text) = query.get::<_, String>("query").transpose() {
let mut query_text = query_text?;
let columns = query.get::<_, Option<Vec<String>>>("columns")?.flatten();
let is_phrase =
query_text.len() >= 2 && query_text.starts_with('"') && query_text.ends_with('"');
let is_multi_match = columns.as_ref().map(|cols| cols.len() > 1).unwrap_or(false);
if is_phrase {
// Remove the surrounding quotes for phrase queries
query_text = query_text[1..query_text.len() - 1].to_string();
}
let query: FtsQuery = match (is_phrase, is_multi_match) {
(false, _) => MatchQuery::new(query_text).into(),
(true, false) => PhraseQuery::new(query_text).into(),
(true, true) => {
return Err(napi::Error::from_reason(
"Phrase queries cannot be used with multiple columns.",
));
}
};
let mut query = FullTextSearchQuery::new_query(query);
if let Some(cols) = columns {
if !cols.is_empty() {
query = query.with_columns(&cols).map_err(|e| {
napi::Error::from_reason(format!(
"Failed to set full text search columns: {}",
e
))
})?;
}
}
query
} else if let Some(query) = query.get::<_, napi::JsObject>("query")? {
let query = parse_fts_query(&query)?;
FullTextSearchQuery::new_query(query)
} else {
return Err(napi::Error::from_reason(
"Invalid full text search query object".to_string(),
));
};
pub fn full_text_search(&mut self, query: napi::JsObject) -> napi::Result<()> {
let query = parse_fts_query(query)?;
self.inner = self.inner.clone().full_text_search(query);
Ok(())
}
@@ -243,51 +202,8 @@ impl VectorQuery {
}
#[napi]
pub fn full_text_search(&mut self, query: napi::JsUnknown) -> napi::Result<()> {
let query = unsafe { query.cast::<napi::JsObject>() };
let query = if let Some(query_text) = query.get::<_, String>("query").transpose() {
let mut query_text = query_text?;
let columns = query.get::<_, Option<Vec<String>>>("columns")?.flatten();
let is_phrase =
query_text.len() >= 2 && query_text.starts_with('"') && query_text.ends_with('"');
let is_multi_match = columns.as_ref().map(|cols| cols.len() > 1).unwrap_or(false);
if is_phrase {
// Remove the surrounding quotes for phrase queries
query_text = query_text[1..query_text.len() - 1].to_string();
}
let query: FtsQuery = match (is_phrase, is_multi_match) {
(false, _) => MatchQuery::new(query_text).into(),
(true, false) => PhraseQuery::new(query_text).into(),
(true, true) => {
return Err(napi::Error::from_reason(
"Phrase queries cannot be used with multiple columns.",
));
}
};
let mut query = FullTextSearchQuery::new_query(query);
if let Some(cols) = columns {
if !cols.is_empty() {
query = query.with_columns(&cols).map_err(|e| {
napi::Error::from_reason(format!(
"Failed to set full text search columns: {}",
e
))
})?;
}
}
query
} else if let Some(query) = query.get::<_, napi::JsObject>("query")? {
let query = parse_fts_query(&query)?;
FullTextSearchQuery::new_query(query)
} else {
return Err(napi::Error::from_reason(
"Invalid full text search query object".to_string(),
));
};
pub fn full_text_search(&mut self, query: napi::JsObject) -> napi::Result<()> {
let query = parse_fts_query(query)?;
self.inner = self.inner.clone().full_text_search(query);
Ok(())
}
@@ -376,3 +292,116 @@ impl VectorQuery {
})
}
}
#[napi]
#[derive(Debug, Clone)]
pub struct JsFullTextQuery {
pub(crate) inner: FtsQuery,
}
#[napi]
impl JsFullTextQuery {
#[napi(factory)]
pub fn match_query(
query: String,
column: String,
boost: f64,
fuzziness: Option<u32>,
max_expansions: u32,
) -> napi::Result<Self> {
Ok(Self {
inner: MatchQuery::new(query)
.with_column(Some(column))
.with_boost(boost as f32)
.with_fuzziness(fuzziness)
.with_max_expansions(max_expansions as usize)
.into(),
})
}
#[napi(factory)]
pub fn phrase_query(query: String, column: String) -> napi::Result<Self> {
Ok(Self {
inner: PhraseQuery::new(query).with_column(Some(column)).into(),
})
}
#[napi(factory)]
#[allow(clippy::use_self)] // NAPI doesn't allow Self here but clippy reports it
pub fn boost_query(
positive: &JsFullTextQuery,
negative: &JsFullTextQuery,
negative_boost: Option<f64>,
) -> napi::Result<Self> {
Ok(Self {
inner: BoostQuery::new(
positive.inner.clone(),
negative.inner.clone(),
negative_boost.map(|v| v as f32),
)
.into(),
})
}
#[napi(factory)]
pub fn multi_match_query(
query: String,
columns: Vec<String>,
boosts: Option<Vec<f64>>,
) -> napi::Result<Self> {
let q = match boosts {
Some(boosts) => MultiMatchQuery::try_new(query, columns)
.and_then(|q| q.try_with_boosts(boosts.into_iter().map(|v| v as f32).collect())),
None => MultiMatchQuery::try_new(query, columns),
}
.map_err(|e| {
napi::Error::from_reason(format!("Failed to create multi match query: {}", e))
})?;
Ok(Self { inner: q.into() })
}
}
fn parse_fts_query(query: napi::JsObject) -> napi::Result<FullTextSearchQuery> {
if let Ok(Some(query)) = query.get::<_, &JsFullTextQuery>("query") {
Ok(FullTextSearchQuery::new_query(query.inner.clone()))
} else if let Ok(Some(query_text)) = query.get::<_, String>("query") {
let mut query_text = query_text;
let columns = query.get::<_, Option<Vec<String>>>("columns")?.flatten();
let is_phrase =
query_text.len() >= 2 && query_text.starts_with('"') && query_text.ends_with('"');
let is_multi_match = columns.as_ref().map(|cols| cols.len() > 1).unwrap_or(false);
if is_phrase {
// Remove the surrounding quotes for phrase queries
query_text = query_text[1..query_text.len() - 1].to_string();
}
let query: FtsQuery = match (is_phrase, is_multi_match) {
(false, _) => MatchQuery::new(query_text).into(),
(true, false) => PhraseQuery::new(query_text).into(),
(true, true) => {
return Err(napi::Error::from_reason(
"Phrase queries cannot be used with multiple columns.",
));
}
};
let mut query = FullTextSearchQuery::new_query(query);
if let Some(cols) = columns {
if !cols.is_empty() {
query = query.with_columns(&cols).map_err(|e| {
napi::Error::from_reason(format!(
"Failed to set full text search columns: {}",
e
))
})?;
}
}
Ok(query)
} else {
Err(napi::Error::from_reason(
"Invalid full text search query object".to_string(),
))
}
}

View File

@@ -111,6 +111,7 @@ impl Table {
index: Option<&Index>,
column: String,
replace: Option<bool>,
wait_timeout_s: Option<i64>,
) -> napi::Result<()> {
let lancedb_index = if let Some(index) = index {
index.consume()?
@@ -121,6 +122,10 @@ impl Table {
if let Some(replace) = replace {
builder = builder.replace(replace);
}
if let Some(timeout) = wait_timeout_s {
builder =
builder.wait_timeout(std::time::Duration::from_secs(timeout.try_into().unwrap()));
}
builder.execute().await.default_error()
}
@@ -132,6 +137,26 @@ impl Table {
.default_error()
}
#[napi(catch_unwind)]
pub async fn prewarm_index(&self, index_name: String) -> napi::Result<()> {
self.inner_ref()?
.prewarm_index(&index_name)
.await
.default_error()
}
#[napi(catch_unwind)]
pub async fn wait_for_index(&self, index_names: Vec<String>, timeout_s: i64) -> Result<()> {
let timeout = std::time::Duration::from_secs(timeout_s.try_into().unwrap());
let index_names: Vec<&str> = index_names.iter().map(|s| s.as_str()).collect();
let slice: &[&str] = &index_names;
self.inner_ref()?
.wait_for_index(slice, timeout)
.await
.default_error()
}
#[napi(catch_unwind)]
pub async fn update(
&self,

View File

@@ -1,7 +1,6 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use lancedb::index::scalar::{BoostQuery, FtsQuery, MatchQuery, MultiMatchQuery, PhraseQuery};
use lancedb::DistanceType;
pub fn parse_distance_type(distance_type: impl AsRef<str>) -> napi::Result<DistanceType> {
@@ -16,144 +15,3 @@ pub fn parse_distance_type(distance_type: impl AsRef<str>) -> napi::Result<Dista
))),
}
}
pub fn parse_fts_query(query: &napi::JsObject) -> napi::Result<FtsQuery> {
let query_type = query
.get_property_names()?
.get_element::<napi::JsString>(0)?;
let query_type = query_type.into_utf8()?.into_owned()?;
let query_value =
query
.get::<_, napi::JsObject>(&query_type)?
.ok_or(napi::Error::from_reason(format!(
"query value {} not found",
query_type
)))?;
match query_type.as_str() {
"match" => {
let column = query_value
.get_property_names()?
.get_element::<napi::JsString>(0)?
.into_utf8()?
.into_owned()?;
let params =
query_value
.get::<_, napi::JsObject>(&column)?
.ok_or(napi::Error::from_reason(format!(
"column {} not found",
column
)))?;
let query = params
.get::<_, napi::JsString>("query")?
.ok_or(napi::Error::from_reason("query not found"))?
.into_utf8()?
.into_owned()?;
let boost = params
.get::<_, napi::JsNumber>("boost")?
.ok_or(napi::Error::from_reason("boost not found"))?
.get_double()? as f32;
let fuzziness = params
.get::<_, napi::JsNumber>("fuzziness")?
.map(|f| f.get_uint32())
.transpose()?;
let max_expansions = params
.get::<_, napi::JsNumber>("max_expansions")?
.ok_or(napi::Error::from_reason("max_expansions not found"))?
.get_uint32()? as usize;
let query = MatchQuery::new(query)
.with_column(Some(column))
.with_boost(boost)
.with_fuzziness(fuzziness)
.with_max_expansions(max_expansions);
Ok(query.into())
}
"match_phrase" => {
let column = query_value
.get_property_names()?
.get_element::<napi::JsString>(0)?
.into_utf8()?
.into_owned()?;
let query = query_value
.get::<_, napi::JsString>(&column)?
.ok_or(napi::Error::from_reason(format!(
"column {} not found",
column
)))?
.into_utf8()?
.into_owned()?;
let query = PhraseQuery::new(query).with_column(Some(column));
Ok(query.into())
}
"boost" => {
let positive = query_value
.get::<_, napi::JsObject>("positive")?
.ok_or(napi::Error::from_reason("positive not found"))?;
let negative = query_value
.get::<_, napi::JsObject>("negative")?
.ok_or(napi::Error::from_reason("negative not found"))?;
let negative_boost = query_value
.get::<_, napi::JsNumber>("negative_boost")?
.ok_or(napi::Error::from_reason("negative_boost not found"))?
.get_double()? as f32;
let positive = parse_fts_query(&positive)?;
let negative = parse_fts_query(&negative)?;
let query = BoostQuery::new(positive, negative, Some(negative_boost));
Ok(query.into())
}
"multi_match" => {
let query = query_value
.get::<_, napi::JsString>("query")?
.ok_or(napi::Error::from_reason("query not found"))?
.into_utf8()?
.into_owned()?;
let columns_array = query_value
.get::<_, napi::JsTypedArray>("columns")?
.ok_or(napi::Error::from_reason("columns not found"))?;
let columns_num = columns_array.get_array_length()?;
let mut columns = Vec::with_capacity(columns_num as usize);
for i in 0..columns_num {
let column = columns_array
.get_element::<napi::JsString>(i)?
.into_utf8()?
.into_owned()?;
columns.push(column);
}
let boost_array = query_value
.get::<_, napi::JsTypedArray>("boost")?
.ok_or(napi::Error::from_reason("boost not found"))?;
if boost_array.get_array_length()? != columns_num {
return Err(napi::Error::from_reason(format!(
"boost array length ({}) does not match columns length ({})",
boost_array.get_array_length()?,
columns_num
)));
}
let mut boost = Vec::with_capacity(columns_num as usize);
for i in 0..columns_num {
let b = boost_array.get_element::<napi::JsNumber>(i)?.get_double()? as f32;
boost.push(b);
}
let query =
MultiMatchQuery::try_new_with_boosts(query, columns, boost).map_err(|e| {
napi::Error::from_reason(format!("Error creating MultiMatchQuery: {}", e))
})?;
Ok(query.into())
}
_ => Err(napi::Error::from_reason(format!(
"Unsupported query type: {}",
query_type
))),
}
}

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.22.0-beta.5"
current_version = "0.22.0"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.22.0-beta.5"
version = "0.22.0"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true

View File

@@ -43,6 +43,9 @@ classifiers = [
repository = "https://github.com/lancedb/lancedb"
[project.optional-dependencies]
pylance = [
"pylance>=0.25",
]
tests = [
"aiohttp",
"boto3",
@@ -55,7 +58,7 @@ tests = [
"polars>=0.19, <=1.3.0",
"tantivy",
"pyarrow-stubs",
"pylance>=0.23.2",
"pylance>=0.25",
"requests",
]
dev = [
@@ -74,6 +77,7 @@ embeddings = [
"pillow",
"open-clip-torch",
"cohere",
"colpali-engine>=0.3.10",
"huggingface_hub",
"InstructorEmbedding",
"google.generativeai",

View File

@@ -26,7 +26,7 @@ def connect(
api_key: Optional[str] = None,
region: str = "us-east-1",
host_override: Optional[str] = None,
read_consistency_interval: Optional[timedelta] = timedelta(seconds=5),
read_consistency_interval: Optional[timedelta] = None,
request_thread_pool: Optional[Union[int, ThreadPoolExecutor]] = None,
client_config: Union[ClientConfig, Dict[str, Any], None] = None,
storage_options: Optional[Dict[str, str]] = None,
@@ -49,8 +49,9 @@ def connect(
read_consistency_interval: timedelta, default None
(For LanceDB OSS only)
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked. For strong consistency,
set this to zero seconds. Then every read will check for updates from other
processes. If None, then consistency is not checked. For performance
reasons, this is the default. For strong consistency, set this to
zero seconds. Then every read will check for updates from other
processes. As a compromise, you can set this to a non-zero timedelta
for eventual consistency. If more than that interval has passed since
the last check, then the table will be checked for updates. Note: this
@@ -121,7 +122,7 @@ async def connect_async(
api_key: Optional[str] = None,
region: str = "us-east-1",
host_override: Optional[str] = None,
read_consistency_interval: Optional[timedelta] = timedelta(seconds=5),
read_consistency_interval: Optional[timedelta] = None,
client_config: Optional[Union[ClientConfig, Dict[str, Any]]] = None,
storage_options: Optional[Dict[str, str]] = None,
) -> AsyncConnection:
@@ -142,8 +143,9 @@ async def connect_async(
read_consistency_interval: timedelta, default None
(For LanceDB OSS only)
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked. For strong consistency,
set this to zero seconds. Then every read will check for updates from other
processes. If None, then consistency is not checked. For performance
reasons, this is the default. For strong consistency, set this to
zero seconds. Then every read will check for updates from other
processes. As a compromise, you can set this to a non-zero timedelta
for eventual consistency. If more than that interval has passed since
the last check, then the table will be checked for updates. Note: this

View File

@@ -9,7 +9,7 @@ import numpy as np
import pyarrow as pa
import pyarrow.dataset
from .dependencies import pandas as pd
from .dependencies import _check_for_pandas, pandas as pd
DATA = Union[List[dict], "pd.DataFrame", pa.Table, Iterable[pa.RecordBatch]]
VEC = Union[list, np.ndarray, pa.Array, pa.ChunkedArray]
@@ -63,7 +63,7 @@ def data_to_reader(
data: DATA, schema: Optional[pa.Schema] = None
) -> pa.RecordBatchReader:
"""Convert various types of input into a RecordBatchReader"""
if pd is not None and isinstance(data, pd.DataFrame):
if _check_for_pandas(data) and isinstance(data, pd.DataFrame):
return pa.Table.from_pandas(data, schema=schema).to_reader()
elif isinstance(data, pa.Table):
return data.to_reader()

View File

@@ -6,7 +6,6 @@ from __future__ import annotations
from abc import abstractmethod
from pathlib import Path
from datetime import timedelta
from typing import TYPE_CHECKING, Dict, Iterable, List, Literal, Optional, Union
from lancedb.embeddings.registry import EmbeddingFunctionRegistry
@@ -33,6 +32,7 @@ import deprecation
if TYPE_CHECKING:
import pyarrow as pa
from .pydantic import LanceModel
from datetime import timedelta
from ._lancedb import Connection as LanceDbConnection
from .common import DATA, URI
@@ -318,8 +318,9 @@ class LanceDBConnection(DBConnection):
The root uri of the database.
read_consistency_interval: timedelta, default None
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked. For strong consistency,
set this to zero seconds. Then every read will check for updates from other
processes. If None, then consistency is not checked. For performance
reasons, this is the default. For strong consistency, set this to
zero seconds. Then every read will check for updates from other
processes. As a compromise, you can set this to a non-zero timedelta
for eventual consistency. If more than that interval has passed since
the last check, then the table will be checked for updates. Note: this
@@ -351,7 +352,7 @@ class LanceDBConnection(DBConnection):
self,
uri: URI,
*,
read_consistency_interval: Optional[timedelta] = timedelta(seconds=5),
read_consistency_interval: Optional[timedelta] = None,
storage_options: Optional[Dict[str, str]] = None,
):
if not isinstance(uri, Path):

View File

@@ -19,3 +19,4 @@ from .imagebind import ImageBindEmbeddings
from .jinaai import JinaEmbeddings
from .watsonx import WatsonxEmbeddings
from .voyageai import VoyageAIEmbeddingFunction
from .colpali import ColPaliEmbeddings

View File

@@ -0,0 +1,255 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
from functools import lru_cache
from typing import List, Union, Optional, Any
import numpy as np
import io
from ..util import attempt_import_or_raise
from .base import EmbeddingFunction
from .registry import register
from .utils import TEXT, IMAGES, is_flash_attn_2_available
@register("colpali")
class ColPaliEmbeddings(EmbeddingFunction):
"""
An embedding function that uses the ColPali engine for
multimodal multi-vector embeddings.
This embedding function supports ColQwen2.5 models, producing multivector outputs
for both text and image inputs. The output embeddings are lists of vectors, each
vector being 128-dimensional by default, represented as List[List[float]].
Parameters
----------
model_name : str
The name of the model to use (e.g., "Metric-AI/ColQwen2.5-3b-multilingual-v1.0")
device : str
The device for inference (default "cuda:0").
dtype : str
Data type for model weights (default "bfloat16").
use_token_pooling : bool
Whether to use token pooling to reduce embedding size (default True).
pool_factor : int
Factor to reduce sequence length if token pooling is enabled (default 2).
quantization_config : Optional[BitsAndBytesConfig]
Quantization configuration for the model. (default None, bitsandbytes needed)
batch_size : int
Batch size for processing inputs (default 2).
"""
model_name: str = "Metric-AI/ColQwen2.5-3b-multilingual-v1.0"
device: str = "auto"
dtype: str = "bfloat16"
use_token_pooling: bool = True
pool_factor: int = 2
quantization_config: Optional[Any] = None
batch_size: int = 2
_model = None
_processor = None
_token_pooler = None
_vector_dim = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
(
self._model,
self._processor,
self._token_pooler,
) = self._load_model(
self.model_name,
self.dtype,
self.device,
self.use_token_pooling,
self.quantization_config,
)
@staticmethod
@lru_cache(maxsize=1)
def _load_model(
model_name: str,
dtype: str,
device: str,
use_token_pooling: bool,
quantization_config: Optional[Any],
):
"""
Initialize and cache the ColPali model, processor, and token pooler.
"""
torch = attempt_import_or_raise("torch", "torch")
transformers = attempt_import_or_raise("transformers", "transformers")
colpali_engine = attempt_import_or_raise("colpali_engine", "colpali_engine")
from colpali_engine.compression.token_pooling import HierarchicalTokenPooler
if quantization_config is not None:
if not isinstance(quantization_config, transformers.BitsAndBytesConfig):
raise ValueError("quantization_config must be a BitsAndBytesConfig")
if dtype == "bfloat16":
torch_dtype = torch.bfloat16
elif dtype == "float16":
torch_dtype = torch.float16
elif dtype == "float64":
torch_dtype = torch.float64
else:
torch_dtype = torch.float32
model = colpali_engine.models.ColQwen2_5.from_pretrained(
model_name,
torch_dtype=torch_dtype,
device_map=device,
quantization_config=quantization_config
if quantization_config is not None
else None,
attn_implementation="flash_attention_2"
if is_flash_attn_2_available()
else None,
).eval()
processor = colpali_engine.models.ColQwen2_5_Processor.from_pretrained(
model_name
)
token_pooler = HierarchicalTokenPooler() if use_token_pooling else None
return model, processor, token_pooler
def ndims(self):
"""
Return the dimension of a vector in the multivector output (e.g., 128).
"""
torch = attempt_import_or_raise("torch", "torch")
if self._vector_dim is None:
dummy_query = "test"
batch_queries = self._processor.process_queries([dummy_query]).to(
self._model.device
)
with torch.no_grad():
query_embeddings = self._model(**batch_queries)
if self.use_token_pooling and self._token_pooler is not None:
query_embeddings = self._token_pooler.pool_embeddings(
query_embeddings,
pool_factor=self.pool_factor,
padding=True,
padding_side=self._processor.tokenizer.padding_side,
)
self._vector_dim = query_embeddings[0].shape[-1]
return self._vector_dim
def _process_embeddings(self, embeddings):
"""
Format model embeddings into List[List[float]].
Use token pooling if enabled.
"""
torch = attempt_import_or_raise("torch", "torch")
if self.use_token_pooling and self._token_pooler is not None:
embeddings = self._token_pooler.pool_embeddings(
embeddings,
pool_factor=self.pool_factor,
padding=True,
padding_side=self._processor.tokenizer.padding_side,
)
if isinstance(embeddings, torch.Tensor):
tensors = embeddings.detach().cpu()
if tensors.dtype == torch.bfloat16:
tensors = tensors.to(torch.float32)
return (
tensors.numpy()
.astype(np.float64 if self.dtype == "float64" else np.float32)
.tolist()
)
return []
def generate_text_embeddings(self, text: TEXT) -> List[List[List[float]]]:
"""
Generate embeddings for text input.
"""
torch = attempt_import_or_raise("torch", "torch")
text = self.sanitize_input(text)
all_embeddings = []
for i in range(0, len(text), self.batch_size):
batch_text = text[i : i + self.batch_size]
batch_queries = self._processor.process_queries(batch_text).to(
self._model.device
)
with torch.no_grad():
query_embeddings = self._model(**batch_queries)
all_embeddings.extend(self._process_embeddings(query_embeddings))
return all_embeddings
def _prepare_images(self, images: IMAGES) -> List:
"""
Convert image inputs to PIL Images.
"""
PIL = attempt_import_or_raise("PIL", "pillow")
requests = attempt_import_or_raise("requests", "requests")
images = self.sanitize_input(images)
pil_images = []
try:
for image in images:
if isinstance(image, str):
if image.startswith(("http://", "https://")):
response = requests.get(image, timeout=10)
response.raise_for_status()
pil_images.append(PIL.Image.open(io.BytesIO(response.content)))
else:
with PIL.Image.open(image) as im:
pil_images.append(im.copy())
elif isinstance(image, bytes):
pil_images.append(PIL.Image.open(io.BytesIO(image)))
else:
# Assume it's a PIL Image; will raise if invalid
pil_images.append(image)
except Exception as e:
raise ValueError(f"Failed to process image: {e}")
return pil_images
def generate_image_embeddings(self, images: IMAGES) -> List[List[List[float]]]:
"""
Generate embeddings for a batch of images.
"""
torch = attempt_import_or_raise("torch", "torch")
pil_images = self._prepare_images(images)
all_embeddings = []
for i in range(0, len(pil_images), self.batch_size):
batch_images = pil_images[i : i + self.batch_size]
batch_images = self._processor.process_images(batch_images).to(
self._model.device
)
with torch.no_grad():
image_embeddings = self._model(**batch_images)
all_embeddings.extend(self._process_embeddings(image_embeddings))
return all_embeddings
def compute_query_embeddings(
self, query: Union[str, IMAGES], *args, **kwargs
) -> List[List[List[float]]]:
"""
Compute embeddings for a single user query (text only).
"""
if not isinstance(query, str):
raise ValueError(
"Query must be a string, image to image search is not supported"
)
return self.generate_text_embeddings([query])
def compute_source_embeddings(
self, images: IMAGES, *args, **kwargs
) -> List[List[List[float]]]:
"""
Compute embeddings for a batch of source images.
Parameters
----------
images : Union[str, bytes, List, pa.Array, pa.ChunkedArray, np.ndarray]
Batch of images (paths, URLs, bytes, or PIL Images).
"""
images = self.sanitize_input(images)
return self.generate_image_embeddings(images)

View File

@@ -18,6 +18,7 @@ import numpy as np
import pyarrow as pa
from ..dependencies import pandas as pd
from ..util import attempt_import_or_raise
# ruff: noqa: PERF203
@@ -275,3 +276,12 @@ def url_retrieve(url: str):
def api_key_not_found_help(provider):
logging.error("Could not find API key for %s", provider)
raise ValueError(f"Please set the {provider.upper()}_API_KEY environment variable.")
def is_flash_attn_2_available():
try:
attempt_import_or_raise("flash_attn", "flash_attn")
return True
except ImportError:
return False

View File

@@ -152,6 +152,104 @@ def Vector(
return FixedSizeList
def MultiVector(
dim: int, value_type: pa.DataType = pa.float32(), nullable: bool = True
) -> Type:
"""Pydantic MultiVector Type for multi-vector embeddings.
This type represents a list of vectors, each with the same dimension.
Useful for models that produce multiple embeddings per input, like ColPali.
Parameters
----------
dim : int
The dimension of each vector in the multi-vector.
value_type : pyarrow.DataType, optional
The value type of the vectors, by default pa.float32()
nullable : bool, optional
Whether the multi-vector is nullable, by default it is True.
Examples
--------
>>> import pydantic
>>> from lancedb.pydantic import MultiVector
...
>>> class MyModel(pydantic.BaseModel):
... id: int
... text: str
... embeddings: MultiVector(128) # List of 128-dimensional vectors
>>> schema = pydantic_to_schema(MyModel)
>>> assert schema == pa.schema([
... pa.field("id", pa.int64(), False),
... pa.field("text", pa.utf8(), False),
... pa.field("embeddings", pa.list_(pa.list_(pa.float32(), 128)))
... ])
"""
class MultiVectorList(list, FixedSizeListMixin):
def __repr__(self):
return f"MultiVector(dim={dim})"
@staticmethod
def nullable() -> bool:
return nullable
@staticmethod
def dim() -> int:
return dim
@staticmethod
def value_arrow_type() -> pa.DataType:
return value_type
@staticmethod
def is_multi_vector() -> bool:
return True
@classmethod
def __get_pydantic_core_schema__(
cls, _source_type: Any, _handler: pydantic.GetCoreSchemaHandler
) -> CoreSchema:
return core_schema.no_info_after_validator_function(
cls,
core_schema.list_schema(
items_schema=core_schema.list_schema(
min_length=dim,
max_length=dim,
items_schema=core_schema.float_schema(),
),
),
)
@classmethod
def __get_validators__(cls) -> Generator[Callable, None, None]:
yield cls.validate
# For pydantic v1
@classmethod
def validate(cls, v):
if not isinstance(v, (list, range)):
raise TypeError("A list of vectors is needed")
for vec in v:
if not isinstance(vec, (list, range, np.ndarray)) or len(vec) != dim:
raise TypeError(f"Each vector must be a list of {dim} numbers")
return cls(v)
if PYDANTIC_VERSION.major < 2:
@classmethod
def __modify_schema__(cls, field_schema: Dict[str, Any]):
field_schema["items"] = {
"type": "array",
"items": {"type": "number"},
"minItems": dim,
"maxItems": dim,
}
return MultiVectorList
def _py_type_to_arrow_type(py_type: Type[Any], field: FieldInfo) -> pa.DataType:
"""Convert a field with native Python type to Arrow data type.
@@ -206,6 +304,9 @@ def _pydantic_type_to_arrow_type(tp: Any, field: FieldInfo) -> pa.DataType:
fields = _pydantic_model_to_fields(tp)
return pa.struct(fields)
if issubclass(tp, FixedSizeListMixin):
if getattr(tp, "is_multi_vector", lambda: False)():
return pa.list_(pa.list_(tp.value_arrow_type(), tp.dim()))
# For regular Vector
return pa.list_(tp.value_arrow_type(), tp.dim())
return _py_type_to_arrow_type(tp, field)

View File

@@ -28,6 +28,8 @@ import pyarrow.compute as pc
import pyarrow.fs as pa_fs
import pydantic
from lancedb.pydantic import PYDANTIC_VERSION
from . import __version__
from .arrow import AsyncRecordBatchReader
from .dependencies import pandas as pd
@@ -266,8 +268,8 @@ class MultiMatchQuery(FullTextQuery):
Parameters
----------
query : str | list[Query]
If a string, the query string to match against.
query : str
The query string to match against.
columns : list[str]
The list of columns to match against.
@@ -498,10 +500,14 @@ class Query(pydantic.BaseModel):
)
return query
class Config:
# This tells pydantic to allow custom types (needed for the `vector` query since
# pa.Array wouln't be allowed otherwise)
arbitrary_types_allowed = True
# This tells pydantic to allow custom types (needed for the `vector` query since
# pa.Array wouln't be allowed otherwise)
if PYDANTIC_VERSION.major < 2: # Pydantic 1.x compat
class Config:
arbitrary_types_allowed = True
else:
model_config = {"arbitrary_types_allowed": True}
class LanceQueryBuilder(ABC):
@@ -1586,6 +1592,8 @@ class LanceHybridQueryBuilder(LanceQueryBuilder):
self._refine_factor = None
self._distance_type = None
self._phrase_query = None
self._lower_bound = None
self._upper_bound = None
def _validate_query(self, query, vector=None, text=None):
if query is not None and (vector is not None or text is not None):
@@ -1665,6 +1673,10 @@ class LanceHybridQueryBuilder(LanceQueryBuilder):
self._vector_query.ef(self._ef)
if self._bypass_vector_index:
self._vector_query.bypass_vector_index()
if self._lower_bound or self._upper_bound:
self._vector_query.distance_range(
lower_bound=self._lower_bound, upper_bound=self._upper_bound
)
if self._reranker is None:
self._reranker = RRFReranker()

View File

@@ -104,6 +104,7 @@ class RemoteTable(Table):
index_type: Literal["BTREE", "BITMAP", "LABEL_LIST", "scalar"] = "scalar",
*,
replace: bool = False,
wait_timeout: timedelta = None,
):
"""Creates a scalar index
Parameters
@@ -126,13 +127,18 @@ class RemoteTable(Table):
else:
raise ValueError(f"Unknown index type: {index_type}")
LOOP.run(self._table.create_index(column, config=config, replace=replace))
LOOP.run(
self._table.create_index(
column, config=config, replace=replace, wait_timeout=wait_timeout
)
)
def create_fts_index(
self,
column: str,
*,
replace: bool = False,
wait_timeout: timedelta = None,
with_position: bool = True,
# tokenizer configs:
base_tokenizer: str = "simple",
@@ -153,7 +159,11 @@ class RemoteTable(Table):
remove_stop_words=remove_stop_words,
ascii_folding=ascii_folding,
)
LOOP.run(self._table.create_index(column, config=config, replace=replace))
LOOP.run(
self._table.create_index(
column, config=config, replace=replace, wait_timeout=wait_timeout
)
)
def create_index(
self,
@@ -165,6 +175,7 @@ class RemoteTable(Table):
replace: Optional[bool] = None,
accelerator: Optional[str] = None,
index_type="vector",
wait_timeout: Optional[timedelta] = None,
):
"""Create an index on the table.
Currently, the only parameters that matter are
@@ -236,7 +247,11 @@ class RemoteTable(Table):
" 'IVF_FLAT', 'IVF_PQ', 'IVF_HNSW_PQ', 'IVF_HNSW_SQ'"
)
LOOP.run(self._table.create_index(vector_column_name, config=config))
LOOP.run(
self._table.create_index(
vector_column_name, config=config, wait_timeout=wait_timeout
)
)
def add(
self,
@@ -554,6 +569,11 @@ class RemoteTable(Table):
def drop_index(self, index_name: str):
return LOOP.run(self._table.drop_index(index_name))
def wait_for_index(
self, index_names: Iterable[str], timeout: timedelta = timedelta(seconds=300)
):
return LOOP.run(self._table.wait_for_index(index_names, timeout))
def uses_v2_manifest_paths(self) -> bool:
raise NotImplementedError(
"uses_v2_manifest_paths() is not supported on the LanceDB Cloud"

View File

@@ -47,6 +47,9 @@ class AnswerdotaiRerankers(Reranker):
)
def _rerank(self, result_set: pa.Table, query: str):
result_set = self._handle_empty_results(result_set)
if len(result_set) == 0:
return result_set
docs = result_set[self.column].to_pylist()
doc_ids = list(range(len(docs)))
result = self.reranker.rank(query, docs, doc_ids=doc_ids)
@@ -83,7 +86,6 @@ class AnswerdotaiRerankers(Reranker):
vector_results = self._rerank(vector_results, query)
if self.score == "relevance":
vector_results = vector_results.drop_columns(["_distance"])
vector_results = vector_results.sort_by([("_relevance_score", "descending")])
return vector_results
@@ -91,7 +93,5 @@ class AnswerdotaiRerankers(Reranker):
fts_results = self._rerank(fts_results, query)
if self.score == "relevance":
fts_results = fts_results.drop_columns(["_score"])
fts_results = fts_results.sort_by([("_relevance_score", "descending")])
return fts_results

View File

@@ -65,6 +65,16 @@ class Reranker(ABC):
f"{self.__class__.__name__} does not implement rerank_vector"
)
def _handle_empty_results(self, results: pa.Table):
"""
Helper method to handle empty FTS results consistently
"""
if len(results) > 0:
return results
return results.append_column(
"_relevance_score", pa.array([], type=pa.float32())
)
def rerank_fts(
self,
query: str,

View File

@@ -62,6 +62,9 @@ class CohereReranker(Reranker):
return cohere.Client(os.environ.get("COHERE_API_KEY") or self.api_key)
def _rerank(self, result_set: pa.Table, query: str):
result_set = self._handle_empty_results(result_set)
if len(result_set) == 0:
return result_set
docs = result_set[self.column].to_pylist()
response = self._client.rerank(
query=query,
@@ -99,24 +102,14 @@ class CohereReranker(Reranker):
)
return combined_results
def rerank_vector(
self,
query: str,
vector_results: pa.Table,
):
result_set = self._rerank(vector_results, query)
def rerank_vector(self, query: str, vector_results: pa.Table):
vector_results = self._rerank(vector_results, query)
if self.score == "relevance":
result_set = result_set.drop_columns(["_distance"])
vector_results = vector_results.drop_columns(["_distance"])
return vector_results
return result_set
def rerank_fts(
self,
query: str,
fts_results: pa.Table,
):
result_set = self._rerank(fts_results, query)
def rerank_fts(self, query: str, fts_results: pa.Table):
fts_results = self._rerank(fts_results, query)
if self.score == "relevance":
result_set = result_set.drop_columns(["_score"])
return result_set
fts_results = fts_results.drop_columns(["_score"])
return fts_results

View File

@@ -63,6 +63,9 @@ class CrossEncoderReranker(Reranker):
return cross_encoder
def _rerank(self, result_set: pa.Table, query: str):
result_set = self._handle_empty_results(result_set)
if len(result_set) == 0:
return result_set
passages = result_set[self.column].to_pylist()
cross_inp = [[query, passage] for passage in passages]
cross_scores = self.model.predict(cross_inp)
@@ -93,11 +96,7 @@ class CrossEncoderReranker(Reranker):
return combined_results
def rerank_vector(
self,
query: str,
vector_results: pa.Table,
):
def rerank_vector(self, query: str, vector_results: pa.Table):
vector_results = self._rerank(vector_results, query)
if self.score == "relevance":
vector_results = vector_results.drop_columns(["_distance"])
@@ -105,11 +104,7 @@ class CrossEncoderReranker(Reranker):
vector_results = vector_results.sort_by([("_relevance_score", "descending")])
return vector_results
def rerank_fts(
self,
query: str,
fts_results: pa.Table,
):
def rerank_fts(self, query: str, fts_results: pa.Table):
fts_results = self._rerank(fts_results, query)
if self.score == "relevance":
fts_results = fts_results.drop_columns(["_score"])

View File

@@ -62,6 +62,9 @@ class JinaReranker(Reranker):
return self._session
def _rerank(self, result_set: pa.Table, query: str):
result_set = self._handle_empty_results(result_set)
if len(result_set) == 0:
return result_set
docs = result_set[self.column].to_pylist()
response = self._client.post( # type: ignore
API_URL,
@@ -104,24 +107,14 @@ class JinaReranker(Reranker):
)
return combined_results
def rerank_vector(
self,
query: str,
vector_results: pa.Table,
):
result_set = self._rerank(vector_results, query)
def rerank_vector(self, query: str, vector_results: pa.Table):
vector_results = self._rerank(vector_results, query)
if self.score == "relevance":
result_set = result_set.drop_columns(["_distance"])
vector_results = vector_results.drop_columns(["_distance"])
return vector_results
return result_set
def rerank_fts(
self,
query: str,
fts_results: pa.Table,
):
result_set = self._rerank(fts_results, query)
def rerank_fts(self, query: str, fts_results: pa.Table):
fts_results = self._rerank(fts_results, query)
if self.score == "relevance":
result_set = result_set.drop_columns(["_score"])
return result_set
fts_results = fts_results.drop_columns(["_score"])
return fts_results

View File

@@ -44,6 +44,9 @@ class OpenaiReranker(Reranker):
self.api_key = api_key
def _rerank(self, result_set: pa.Table, query: str):
result_set = self._handle_empty_results(result_set)
if len(result_set) == 0:
return result_set
docs = result_set[self.column].to_pylist()
response = self._client.chat.completions.create(
model=self.model_name,
@@ -104,18 +107,14 @@ class OpenaiReranker(Reranker):
vector_results = self._rerank(vector_results, query)
if self.score == "relevance":
vector_results = vector_results.drop_columns(["_distance"])
vector_results = vector_results.sort_by([("_relevance_score", "descending")])
return vector_results
def rerank_fts(self, query: str, fts_results: pa.Table):
fts_results = self._rerank(fts_results, query)
if self.score == "relevance":
fts_results = fts_results.drop_columns(["_score"])
fts_results = fts_results.sort_by([("_relevance_score", "descending")])
return fts_results
@cached_property

View File

@@ -63,6 +63,9 @@ class VoyageAIReranker(Reranker):
)
def _rerank(self, result_set: pa.Table, query: str):
result_set = self._handle_empty_results(result_set)
if len(result_set) == 0:
return result_set
docs = result_set[self.column].to_pylist()
response = self._client.rerank(
query=query,
@@ -101,24 +104,14 @@ class VoyageAIReranker(Reranker):
)
return combined_results
def rerank_vector(
self,
query: str,
vector_results: pa.Table,
):
result_set = self._rerank(vector_results, query)
def rerank_vector(self, query: str, vector_results: pa.Table):
vector_results = self._rerank(vector_results, query)
if self.score == "relevance":
result_set = result_set.drop_columns(["_distance"])
vector_results = vector_results.drop_columns(["_distance"])
return vector_results
return result_set
def rerank_fts(
self,
query: str,
fts_results: pa.Table,
):
result_set = self._rerank(fts_results, query)
def rerank_fts(self, query: str, fts_results: pa.Table):
fts_results = self._rerank(fts_results, query)
if self.score == "relevance":
result_set = result_set.drop_columns(["_score"])
return result_set
fts_results = fts_results.drop_columns(["_score"])
return fts_results

View File

@@ -631,6 +631,7 @@ class Table(ABC):
index_cache_size: Optional[int] = None,
*,
index_type: VectorIndexType = "IVF_PQ",
wait_timeout: Optional[timedelta] = None,
num_bits: int = 8,
max_iterations: int = 50,
sample_rate: int = 256,
@@ -666,6 +667,8 @@ class Table(ABC):
num_bits: int
The number of bits to encode sub-vectors. Only used with the IVF_PQ index.
Only 4 and 8 are supported.
wait_timeout: timedelta, optional
The timeout to wait if indexing is asynchronous.
"""
raise NotImplementedError
@@ -689,6 +692,23 @@ class Table(ABC):
"""
raise NotImplementedError
def wait_for_index(
self, index_names: Iterable[str], timeout: timedelta = timedelta(seconds=300)
) -> None:
"""
Wait for indexing to complete for the given index names.
This will poll the table until all the indices are fully indexed,
or raise a timeout exception if the timeout is reached.
Parameters
----------
index_names: str
The name of the indices to poll
timeout: timedelta
Timeout to wait for asynchronous indexing. The default is 5 minutes.
"""
raise NotImplementedError
@abstractmethod
def create_scalar_index(
self,
@@ -696,6 +716,7 @@ class Table(ABC):
*,
replace: bool = True,
index_type: ScalarIndexType = "BTREE",
wait_timeout: Optional[timedelta] = None,
):
"""Create a scalar index on a column.
@@ -708,7 +729,8 @@ class Table(ABC):
Replace the existing index if it exists.
index_type: Literal["BTREE", "BITMAP", "LABEL_LIST"], default "BTREE"
The type of index to create.
wait_timeout: timedelta, optional
The timeout to wait if indexing is asynchronous.
Examples
--------
@@ -767,6 +789,7 @@ class Table(ABC):
stem: bool = False,
remove_stop_words: bool = False,
ascii_folding: bool = False,
wait_timeout: Optional[timedelta] = None,
):
"""Create a full-text search index on the table.
@@ -822,6 +845,8 @@ class Table(ABC):
ascii_folding : bool, default False
Whether to fold ASCII characters. This converts accented characters to
their ASCII equivalent. For example, "café" would be converted to "cafe".
wait_timeout: timedelta, optional
The timeout to wait if indexing is asynchronous.
"""
raise NotImplementedError
@@ -1745,8 +1770,37 @@ class LanceTable(Table):
)
def drop_index(self, name: str) -> None:
"""
Drops an index from the table
Parameters
----------
name: str
The name of the index to drop
"""
return LOOP.run(self._table.drop_index(name))
def prewarm_index(self, name: str) -> None:
"""
Prewarms an index in the table
This loads the entire index into memory
If the index does not fit into the available cache this call
may be wasteful
Parameters
----------
name: str
The name of the index to prewarm
"""
return LOOP.run(self._table.prewarm_index(name))
def wait_for_index(
self, index_names: Iterable[str], timeout: timedelta = timedelta(seconds=300)
) -> None:
return LOOP.run(self._table.wait_for_index(index_names, timeout))
def create_scalar_index(
self,
column: str,
@@ -2141,6 +2195,8 @@ class LanceTable(Table):
and also the "_distance" column which is the distance between the query
vector and the returned vector.
"""
if isinstance(query, FullTextQuery):
query_type = "fts"
vector_column_name = infer_vector_column_name(
schema=self.schema,
query_type=query_type,
@@ -2938,6 +2994,7 @@ class AsyncTable:
config: Optional[
Union[IvfFlat, IvfPq, HnswPq, HnswSq, BTree, Bitmap, LabelList, FTS]
] = None,
wait_timeout: Optional[timedelta] = None,
):
"""Create an index to speed up queries
@@ -2962,6 +3019,8 @@ class AsyncTable:
For advanced configuration you can specify the type of index you would
like to create. You can also specify index-specific parameters when
creating an index object.
wait_timeout: timedelta, optional
The timeout to wait if indexing is asynchronous.
"""
if config is not None:
if not isinstance(
@@ -2972,7 +3031,9 @@ class AsyncTable:
" Bitmap, LabelList, or FTS"
)
try:
await self._inner.create_index(column, index=config, replace=replace)
await self._inner.create_index(
column, index=config, replace=replace, wait_timeout=wait_timeout
)
except ValueError as e:
if "not support the requested language" in str(e):
supported_langs = ", ".join(lang_mapping.values())
@@ -3000,6 +3061,40 @@ class AsyncTable:
"""
await self._inner.drop_index(name)
async def prewarm_index(self, name: str) -> None:
"""
Prewarm an index in the table.
Parameters
----------
name: str
The name of the index to prewarm
Notes
-----
This will load the index into memory. This may reduce the cold-start time for
future queries. If the index does not fit in the cache then this call may be
wasteful.
"""
await self._inner.prewarm_index(name)
async def wait_for_index(
self, index_names: Iterable[str], timeout: timedelta = timedelta(seconds=300)
) -> None:
"""
Wait for indexing to complete for the given index names.
This will poll the table until all the indices are fully indexed,
or raise a timeout exception if the timeout is reached.
Parameters
----------
index_names: str
The name of the indices to poll
timeout: timedelta
Timeout to wait for asynchronous indexing. The default is 5 minutes.
"""
await self._inner.wait_for_index(index_names, timeout)
async def add(
self,
data: DATA,
@@ -3223,8 +3318,10 @@ class AsyncTable:
async def get_embedding_func(
vector_column_name: Optional[str],
query_type: QueryType,
query: Optional[Union[VEC, str, "PIL.Image.Image", Tuple]],
query: Optional[Union[VEC, str, "PIL.Image.Image", Tuple, FullTextQuery]],
) -> Tuple[str, EmbeddingFunctionConfig]:
if isinstance(query, FullTextQuery):
query_type = "fts"
schema = await self.schema()
vector_column_name = infer_vector_column_name(
schema=schema,

View File

@@ -253,9 +253,14 @@ def infer_vector_column_name(
query: Optional[Any], # inferred later in query builder
vector_column_name: Optional[str],
):
if (vector_column_name is None and query is not None and query_type != "fts") or (
vector_column_name is None and query_type == "hybrid"
):
if vector_column_name is not None:
return vector_column_name
if query_type == "fts":
# FTS queries do not require a vector column
return None
if query is not None or query_type == "hybrid":
try:
vector_column_name = inf_vector_column_query(schema)
except Exception as e:

View File

@@ -315,11 +315,6 @@ def test_table():
db = lancedb.connect(uri, read_consistency_interval=timedelta(seconds=5))
tbl = db.open_table("test_table")
# --8<-- [end:table_eventual_consistency]
# --8<-- [start:table_no_consistency]
uri = "data/sample-lancedb"
db = lancedb.connect(uri, read_consistency_interval=None)
tbl = db.open_table("test_table")
# --8<-- [end:table_no_consistency]
# --8<-- [start:table_checkout_latest]
tbl = db.open_table("test_table")
@@ -574,12 +569,6 @@ async def test_table_async():
)
async_tbl = await async_db.open_table("test_table_async")
# --8<-- [end:table_async_eventual_consistency]
# --8<-- [start:table_async_no_consistency]
uri = "data/sample-lancedb"
async_db = await lancedb.connect_async(uri, read_consistency_interval=None)
async_tbl = await async_db.open_table("test_table_async")
# --8<-- [end:table_async_no_consistency]
# --8<-- [start:table_async_checkout_latest]
async_tbl = await async_db.open_table("test_table_async")

View File

@@ -6,7 +6,9 @@ import lancedb
# --8<-- [end:import-lancedb]
# --8<-- [start:import-numpy]
from lancedb.query import BoostQuery, MatchQuery
import numpy as np
import pyarrow as pa
# --8<-- [end:import-numpy]
# --8<-- [start:import-datetime]
@@ -154,6 +156,84 @@ async def test_vector_search_async():
# --8<-- [end:search_result_async_as_list]
def test_fts_fuzzy_query():
uri = "data/fuzzy-example"
db = lancedb.connect(uri)
table = db.create_table(
"my_table_fts_fuzzy",
data=pa.table(
{
"text": [
"fa",
"fo", # spellchecker:disable-line
"fob",
"focus",
"foo",
"food",
"foul",
]
}
),
mode="overwrite",
)
table.create_fts_index("text", use_tantivy=False, replace=True)
results = table.search(MatchQuery("foo", "text", fuzziness=1)).to_pandas()
assert len(results) == 4
assert set(results["text"].to_list()) == {
"foo",
"fo", # 1 deletion # spellchecker:disable-line
"fob", # 1 substitution
"food", # 1 insertion
}
def test_fts_boost_query():
uri = "data/boost-example"
db = lancedb.connect(uri)
table = db.create_table(
"my_table_fts_boost",
data=pa.table(
{
"title": [
"The Hidden Gems of Travel",
"Exploring Nature's Wonders",
"Cultural Treasures Unveiled",
"The Nightlife Chronicles",
"Scenic Escapes and Challenges",
],
"desc": [
"A vibrant city with occasional traffic jams.",
"Beautiful landscapes but overpriced tourist spots.",
"Rich cultural heritage but humid summers.",
"Bustling nightlife but noisy streets.",
"Scenic views but limited public transport options.",
],
}
),
mode="overwrite",
)
table.create_fts_index("desc", use_tantivy=False, replace=True)
results = table.search(
BoostQuery(
MatchQuery("beautiful, cultural, nightlife", "desc"),
MatchQuery("bad traffic jams, overpriced", "desc"),
),
).to_pandas()
# we will hit 3 results because the positive query has 3 hits
assert len(results) == 3
# the one containing "overpriced" will be negatively boosted,
# so it will be the last one
assert (
results["desc"].to_list()[2]
== "Beautiful landscapes but overpriced tourist spots."
)
def test_fts_native():
# --8<-- [start:basic_fts]
uri = "data/sample-lancedb"

View File

@@ -3,6 +3,7 @@
import re
from datetime import timedelta
import os
import lancedb
@@ -298,11 +299,13 @@ def test_create_exist_ok(tmp_db: lancedb.DBConnection):
@pytest.mark.asyncio
async def test_connect(tmp_path):
db = await lancedb.connect_async(tmp_path)
assert str(db) == f"ListingDatabase(uri={tmp_path}, read_consistency_interval=5s)"
db = await lancedb.connect_async(tmp_path, read_consistency_interval=None)
assert str(db) == f"ListingDatabase(uri={tmp_path}, read_consistency_interval=None)"
db = await lancedb.connect_async(
tmp_path, read_consistency_interval=timedelta(seconds=5)
)
assert str(db) == f"ListingDatabase(uri={tmp_path}, read_consistency_interval=5s)"
@pytest.mark.asyncio
async def test_close(mem_db_async: lancedb.AsyncConnection):
@@ -450,7 +453,7 @@ async def test_open_table(tmp_path):
assert tbl.name == "test"
assert (
re.search(
r"NativeTable\(test, uri=.*test\.lance, read_consistency_interval=5s\)",
r"NativeTable\(test, uri=.*test\.lance, read_consistency_interval=None\)",
str(tbl),
)
is not None

View File

@@ -11,7 +11,7 @@ import pandas as pd
import pyarrow as pa
import pytest
from lancedb.embeddings import get_registry
from lancedb.pydantic import LanceModel, Vector
from lancedb.pydantic import LanceModel, Vector, MultiVector
import requests
# These are integration tests for embedding functions.
@@ -575,3 +575,67 @@ def test_voyageai_multimodal_embedding_text_function():
tbl.add(df)
assert len(tbl.to_pandas()["vector"][0]) == voyageai.ndims()
@pytest.mark.slow
@pytest.mark.skipif(
importlib.util.find_spec("colpali_engine") is None,
reason="colpali_engine not installed",
)
def test_colpali(tmp_path):
import requests
from lancedb.pydantic import LanceModel
db = lancedb.connect(tmp_path)
registry = get_registry()
func = registry.get("colpali").create()
class MediaItems(LanceModel):
text: str
image_uri: str = func.SourceField()
image_bytes: bytes = func.SourceField()
image_vectors: MultiVector(func.ndims()) = (
func.VectorField()
) # Multivector image embeddings
table = db.create_table("media", schema=MediaItems)
texts = [
"a cute cat playing with yarn",
"a puppy in a flower field",
"a red sports car on the highway",
"a vintage bicycle leaning against a wall",
"a plate of delicious pasta",
"fresh fruit salad in a bowl",
]
uris = [
"http://farm1.staticflickr.com/53/167798175_7c7845bbbd_z.jpg",
"http://farm1.staticflickr.com/134/332220238_da527d8140_z.jpg",
"http://farm9.staticflickr.com/8387/8602747737_2e5c2a45d4_z.jpg",
"http://farm5.staticflickr.com/4092/5017326486_1f46057f5f_z.jpg",
"http://farm9.staticflickr.com/8216/8434969557_d37882c42d_z.jpg",
"http://farm6.staticflickr.com/5142/5835678453_4f3a4edb45_z.jpg",
]
# Get images as bytes
image_bytes = [requests.get(uri).content for uri in uris]
table.add(
pd.DataFrame({"text": texts, "image_uri": uris, "image_bytes": image_bytes})
)
# Test text-to-image search
image_results = (
table.search("fluffy companion", vector_column_name="image_vectors")
.limit(1)
.to_pydantic(MediaItems)[0]
)
assert "cat" in image_results.text.lower() or "puppy" in image_results.text.lower()
# Verify multivector dimensions
first_row = table.to_arrow().to_pylist()[0]
assert len(first_row["image_vectors"]) > 1, "Should have multiple image vectors"
assert len(first_row["image_vectors"][0]) == func.ndims(), (
"Vector dimension mismatch"
)

View File

@@ -22,6 +22,7 @@ from lancedb.db import DBConnection
from lancedb.index import FTS
from lancedb.query import BoostQuery, MatchQuery, MultiMatchQuery, PhraseQuery
import numpy as np
import pyarrow as pa
import pandas as pd
import pytest
from utils import exception_output
@@ -626,3 +627,32 @@ def test_language(mem_db: DBConnection):
# Stop words -> no results
results = table.search("la", query_type="fts").limit(5).to_list()
assert len(results) == 0
def test_fts_on_list(mem_db: DBConnection):
data = pa.table(
{
"text": [
["lance database", "the", "search"],
["lance database"],
["lance", "search"],
["database", "search"],
["unrelated", "doc"],
],
"vector": [
[1.0, 2.0, 3.0],
[4.0, 5.0, 6.0],
[7.0, 8.0, 9.0],
[10.0, 11.0, 12.0],
[13.0, 14.0, 15.0],
],
}
)
table = mem_db.create_table("test", data=data)
table.create_fts_index("text", use_tantivy=False)
res = table.search("lance").limit(5).to_list()
assert len(res) == 3
res = table.search(PhraseQuery("lance database", "text")).limit(5).to_list()
assert len(res) == 2

View File

@@ -4,13 +4,32 @@
import lancedb
from lancedb.query import LanceHybridQueryBuilder
from lancedb.rerankers.rrf import RRFReranker
import pyarrow as pa
import pyarrow.compute as pc
import pytest
import pytest_asyncio
from lancedb.index import FTS
from lancedb.table import AsyncTable
from lancedb.table import AsyncTable, Table
@pytest.fixture
def sync_table(tmpdir_factory) -> Table:
tmp_path = str(tmpdir_factory.mktemp("data"))
db = lancedb.connect(tmp_path)
data = pa.table(
{
"text": pa.array(["a", "b", "cat", "dog"]),
"vector": pa.array(
[[0.1, 0.1], [2, 2], [-0.1, -0.1], [0.5, -0.5]],
type=pa.list_(pa.float32(), list_size=2),
),
}
)
table = db.create_table("test", data)
table.create_fts_index("text", with_position=False, use_tantivy=False)
return table
@pytest_asyncio.fixture
@@ -102,6 +121,42 @@ async def test_async_hybrid_query_default_limit(table: AsyncTable):
assert texts.count("a") == 1
def test_hybrid_query_distance_range(sync_table: Table):
reranker = RRFReranker(return_score="all")
result = (
sync_table.search(query_type="hybrid")
.vector([0.0, 0.4])
.text("cat and dog")
.distance_range(lower_bound=0.2, upper_bound=0.5)
.rerank(reranker)
.limit(2)
.to_arrow()
)
assert len(result) == 2
print(result)
for dist in result["_distance"]:
if dist.is_valid:
assert 0.2 <= dist.as_py() <= 0.5
@pytest.mark.asyncio
async def test_hybrid_query_distance_range_async(table: AsyncTable):
reranker = RRFReranker(return_score="all")
result = await (
table.query()
.nearest_to([0.0, 0.4])
.nearest_to_text("cat and dog")
.distance_range(lower_bound=0.2, upper_bound=0.5)
.rerank(reranker)
.limit(2)
.to_arrow()
)
assert len(result) == 2
for dist in result["_distance"]:
if dist.is_valid:
assert 0.2 <= dist.as_py() <= 0.5
@pytest.mark.asyncio
async def test_explain_plan(table: AsyncTable):
plan = await (

View File

@@ -8,7 +8,7 @@ import pyarrow as pa
import pytest
import pytest_asyncio
from lancedb import AsyncConnection, AsyncTable, connect_async
from lancedb.index import BTree, IvfFlat, IvfPq, Bitmap, LabelList, HnswPq, HnswSq
from lancedb.index import BTree, IvfFlat, IvfPq, Bitmap, LabelList, HnswPq, HnswSq, FTS
@pytest_asyncio.fixture
@@ -119,6 +119,18 @@ async def test_create_label_list_index(some_table: AsyncTable):
assert str(indices) == '[Index(LabelList, columns=["tags"], name="tags_idx")]'
@pytest.mark.asyncio
async def test_full_text_search_index(some_table: AsyncTable):
await some_table.create_index("tags", config=FTS(with_position=False))
indices = await some_table.list_indices()
assert str(indices) == '[Index(FTS, columns=["tags"], name="tags_idx")]'
await some_table.prewarm_index("tags_idx")
res = await (await some_table.search("tag0")).to_arrow()
assert res.num_rows > 0
@pytest.mark.asyncio
async def test_create_vector_index(some_table: AsyncTable):
# Can create

View File

@@ -9,7 +9,13 @@ from typing import List, Optional, Tuple
import pyarrow as pa
import pydantic
import pytest
from lancedb.pydantic import PYDANTIC_VERSION, LanceModel, Vector, pydantic_to_schema
from lancedb.pydantic import (
PYDANTIC_VERSION,
LanceModel,
Vector,
pydantic_to_schema,
MultiVector,
)
from pydantic import BaseModel
from pydantic import Field
@@ -354,3 +360,55 @@ def test_optional_nested_model():
),
]
)
def test_multi_vector():
class TestModel(pydantic.BaseModel):
vec: MultiVector(8)
schema = pydantic_to_schema(TestModel)
assert schema == pa.schema(
[pa.field("vec", pa.list_(pa.list_(pa.float32(), 8)), True)]
)
with pytest.raises(pydantic.ValidationError):
TestModel(vec=[[1.0] * 7])
with pytest.raises(pydantic.ValidationError):
TestModel(vec=[[1.0] * 9])
TestModel(vec=[[1.0] * 8])
TestModel(vec=[[1.0] * 8, [2.0] * 8])
TestModel(vec=[])
def test_multi_vector_nullable():
class NullableModel(pydantic.BaseModel):
vec: MultiVector(16, nullable=False)
schema = pydantic_to_schema(NullableModel)
assert schema == pa.schema(
[pa.field("vec", pa.list_(pa.list_(pa.float32(), 16)), False)]
)
class DefaultModel(pydantic.BaseModel):
vec: MultiVector(16)
schema = pydantic_to_schema(DefaultModel)
assert schema == pa.schema(
[pa.field("vec", pa.list_(pa.list_(pa.float32(), 16)), True)]
)
def test_multi_vector_in_lance_model():
class TestModel(LanceModel):
id: int
vectors: MultiVector(16) = Field(default=[[0.0] * 16])
schema = pydantic_to_schema(TestModel)
assert schema == TestModel.to_arrow_schema()
assert TestModel.field_names() == ["id", "vectors"]
t = TestModel(id=1)
assert t.vectors == [[0.0] * 16]

View File

@@ -257,7 +257,9 @@ async def test_distance_range_with_new_rows_async():
}
)
table = await conn.create_table("test", data)
table.create_index("vector", config=IvfPq(num_partitions=1, num_sub_vectors=2))
await table.create_index(
"vector", config=IvfPq(num_partitions=1, num_sub_vectors=2)
)
q = [0, 0]
rs = await table.query().nearest_to(q).to_arrow()

View File

@@ -1,6 +1,6 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
import re
from concurrent.futures import ThreadPoolExecutor
import contextlib
from datetime import timedelta
@@ -235,6 +235,10 @@ def test_table_add_in_threadpool():
def test_table_create_indices():
def handler(request):
index_stats = dict(
index_type="IVF_PQ", num_indexed_rows=1000, num_unindexed_rows=0
)
if request.path == "/v1/table/test/create_index/":
request.send_response(200)
request.end_headers()
@@ -258,6 +262,47 @@ def test_table_create_indices():
)
)
request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/index/list/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(
dict(
indexes=[
{
"index_name": "id_idx",
"columns": ["id"],
},
{
"index_name": "text_idx",
"columns": ["text"],
},
{
"index_name": "vector_idx",
"columns": ["vector"],
},
]
)
)
request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/index/id_idx/stats/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(index_stats)
request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/index/text_idx/stats/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(index_stats)
request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/index/vector_idx/stats/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(index_stats)
request.wfile.write(payload.encode())
elif "/drop/" in request.path:
request.send_response(200)
request.end_headers()
@@ -269,14 +314,81 @@ def test_table_create_indices():
# Parameters are well-tested through local and async tests.
# This is a smoke-test.
table = db.create_table("test", [{"id": 1}])
table.create_scalar_index("id")
table.create_fts_index("text")
table.create_scalar_index("vector")
table.create_scalar_index("id", wait_timeout=timedelta(seconds=2))
table.create_fts_index("text", wait_timeout=timedelta(seconds=2))
table.create_index(
vector_column_name="vector", wait_timeout=timedelta(seconds=10)
)
table.wait_for_index(["id_idx"], timedelta(seconds=2))
table.wait_for_index(["text_idx", "vector_idx"], timedelta(seconds=2))
table.drop_index("vector_idx")
table.drop_index("id_idx")
table.drop_index("text_idx")
def test_table_wait_for_index_timeout():
def handler(request):
index_stats = dict(
index_type="BTREE", num_indexed_rows=1000, num_unindexed_rows=1
)
if request.path == "/v1/table/test/create/?mode=create":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b"{}")
elif request.path == "/v1/table/test/describe/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(
dict(
version=1,
schema=dict(
fields=[
dict(name="id", type={"type": "int64"}, nullable=False),
]
),
)
)
request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/index/list/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(
dict(
indexes=[
{
"index_name": "id_idx",
"columns": ["id"],
},
]
)
)
request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/index/id_idx/stats/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(index_stats)
print(f"{index_stats=}")
request.wfile.write(payload.encode())
else:
request.send_response(404)
request.end_headers()
with mock_lancedb_connection(handler) as db:
table = db.create_table("test", [{"id": 1}])
with pytest.raises(
RuntimeError,
match=re.escape(
'Timeout error: timed out waiting for indices: ["id_idx"] after 1s'
),
):
table.wait_for_index(["id_idx"], timedelta(seconds=1))
@contextlib.contextmanager
def query_test_table(query_handler, *, server_version=Version("0.1.0")):
def handler(request):

View File

@@ -457,3 +457,45 @@ def test_voyageai_reranker(tmp_path, use_tantivy):
reranker = VoyageAIReranker(model_name="rerank-2")
table, schema = get_test_table(tmp_path, use_tantivy)
_run_test_reranker(reranker, table, "single player experience", None, schema)
def test_empty_result_reranker():
pytest.importorskip("sentence_transformers")
db = lancedb.connect("memory://")
# Define schema
schema = pa.schema(
[
("id", pa.int64()),
("text", pa.string()),
("vector", pa.list_(pa.float32(), 128)), # 128-dimensional vector
]
)
# Create empty table with schema
empty_table = db.create_table("empty_table", schema=schema, mode="overwrite")
empty_table.create_fts_index("text", use_tantivy=False, replace=True)
for reranker in [
CrossEncoderReranker(),
# ColbertReranker(),
# AnswerdotaiRerankers(),
# OpenaiReranker(),
# JinaReranker(),
# VoyageAIReranker(model_name="rerank-2"),
]:
results = (
empty_table.search(list(range(128)))
.limit(3)
.rerank(reranker, "query")
.to_arrow()
)
# check if empty set contains _relevance_score column
assert "_relevance_score" in results.column_names
assert len(results) == 0
results = (
empty_table.search("query", query_type="fts")
.limit(3)
.rerank(reranker)
.to_arrow()
)

View File

@@ -9,9 +9,9 @@ from typing import List
from unittest.mock import patch
import lancedb
from lancedb.dependencies import _PANDAS_AVAILABLE
from lancedb.index import HnswPq, HnswSq, IvfPq
import numpy as np
import pandas as pd
import polars as pl
import pyarrow as pa
import pyarrow.dataset
@@ -32,11 +32,7 @@ def test_basic(mem_db: DBConnection):
table = mem_db.create_table("test", data=data)
assert table.name == "test"
assert (
"LanceTable(name='test', version=1, "
"read_consistency_interval=datetime.timedelta(seconds=5), "
"_conn=LanceDBConnection("
) in repr(table)
assert "LanceTable(name='test', version=1, _conn=LanceDBConnection(" in repr(table)
expected_schema = pa.schema(
{
"vector": pa.list_(pa.float32(), 2),
@@ -142,13 +138,16 @@ def test_create_table(mem_db: DBConnection):
{"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
{"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
]
df = pd.DataFrame(rows)
pa_table = pa.Table.from_pandas(df, schema=schema)
pa_table = pa.Table.from_pylist(rows, schema=schema)
data = [
("Rows", rows),
("pd_DataFrame", df),
("pa_Table", pa_table),
]
if _PANDAS_AVAILABLE:
import pandas as pd
df = pd.DataFrame(rows)
data.append(("pd_DataFrame", df))
for name, d in data:
tbl = mem_db.create_table(name, data=d, schema=schema).to_arrow()
@@ -300,7 +299,7 @@ def test_add_subschema(mem_db: DBConnection):
data = {"price": 10.0, "item": "foo"}
table.add([data])
data = pd.DataFrame({"price": [2.0], "vector": [[3.1, 4.1]]})
data = pa.Table.from_pydict({"price": [2.0], "vector": [[3.1, 4.1]]})
table.add(data)
data = {"price": 3.0, "vector": [5.9, 26.5], "item": "bar"}
table.add([data])
@@ -409,6 +408,7 @@ def test_add_nullability(mem_db: DBConnection):
def test_add_pydantic_model(mem_db: DBConnection):
pytest.importorskip("pandas")
# https://github.com/lancedb/lancedb/issues/562
class Metadata(BaseModel):
@@ -477,10 +477,10 @@ def test_polars(mem_db: DBConnection):
table = mem_db.create_table("test", data=pl.DataFrame(data))
assert len(table) == 2
result = table.to_pandas()
assert np.allclose(result["vector"].tolist(), data["vector"])
assert result["item"].tolist() == data["item"]
assert np.allclose(result["price"].tolist(), data["price"])
result = table.to_arrow()
assert np.allclose(result["vector"].to_pylist(), data["vector"])
assert result["item"].to_pylist() == data["item"]
assert np.allclose(result["price"].to_pylist(), data["price"])
schema = pa.schema(
[
@@ -692,7 +692,7 @@ def test_delete(mem_db: DBConnection):
assert len(table.list_versions()) == 2
assert table.version == 2
assert len(table) == 1
assert table.to_pandas()["id"].tolist() == [1]
assert table.to_arrow()["id"].to_pylist() == [1]
def test_update(mem_db: DBConnection):
@@ -856,6 +856,7 @@ def test_merge_insert(mem_db: DBConnection):
ids=["pa.Table", "pd.DataFrame", "rows"],
)
def test_merge_insert_subschema(mem_db: DBConnection, data_format):
pytest.importorskip("pandas")
initial_data = pa.table(
{"id": range(3), "a": [1.0, 2.0, 3.0], "c": ["x", "x", "x"]}
)
@@ -952,7 +953,7 @@ def test_create_with_embedding_function(mem_db: DBConnection):
func = MockTextEmbeddingFunction.create()
texts = ["hello world", "goodbye world", "foo bar baz fizz buzz"]
df = pd.DataFrame({"text": texts, "vector": func.compute_source_embeddings(texts)})
df = pa.table({"text": texts, "vector": func.compute_source_embeddings(texts)})
conf = EmbeddingFunctionConfig(
source_column="text", vector_column="vector", function=func
@@ -977,7 +978,7 @@ def test_create_f16_table(mem_db: DBConnection):
text: str
vector: Vector(32, value_type=pa.float16())
df = pd.DataFrame(
df = pa.table(
{
"text": [f"s-{i}" for i in range(512)],
"vector": [np.random.randn(32).astype(np.float16) for _ in range(512)],
@@ -990,7 +991,7 @@ def test_create_f16_table(mem_db: DBConnection):
table.add(df)
table.create_index(num_partitions=2, num_sub_vectors=2)
query = df.vector.iloc[2]
query = df["vector"][2].as_py()
expected = table.search(query).limit(2).to_arrow()
assert "s-2" in expected["text"].to_pylist()
@@ -1006,7 +1007,7 @@ def test_add_with_embedding_function(mem_db: DBConnection):
table = mem_db.create_table("my_table", schema=MyTable)
texts = ["hello world", "goodbye world", "foo bar baz fizz buzz"]
df = pd.DataFrame({"text": texts})
df = pa.table({"text": texts})
table.add(df)
texts = ["the quick brown fox", "jumped over the lazy dog"]
@@ -1037,14 +1038,14 @@ def test_multiple_vector_columns(mem_db: DBConnection):
{"vector1": v1, "vector2": v2, "text": "foo"},
{"vector1": v2, "vector2": v1, "text": "bar"},
]
df = pd.DataFrame(data)
df = pa.Table.from_pylist(data)
table.add(df)
q = np.random.randn(10)
result1 = table.search(q, vector_column_name="vector1").limit(1).to_pandas()
result2 = table.search(q, vector_column_name="vector2").limit(1).to_pandas()
result1 = table.search(q, vector_column_name="vector1").limit(1).to_arrow()
result2 = table.search(q, vector_column_name="vector2").limit(1).to_arrow()
assert result1["text"].iloc[0] != result2["text"].iloc[0]
assert result1["text"][0] != result2["text"][0]
def test_create_scalar_index(mem_db: DBConnection):
@@ -1082,22 +1083,22 @@ def test_empty_query(mem_db: DBConnection):
"my_table",
data=[{"text": "foo", "id": 0}, {"text": "bar", "id": 1}],
)
df = table.search().select(["id"]).where("text='bar'").limit(1).to_pandas()
val = df.id.iloc[0]
df = table.search().select(["id"]).where("text='bar'").limit(1).to_arrow()
val = df["id"][0].as_py()
assert val == 1
table = mem_db.create_table("my_table2", data=[{"id": i} for i in range(100)])
df = table.search().select(["id"]).to_pandas()
assert len(df) == 100
df = table.search().select(["id"]).to_arrow()
assert df.num_rows == 100
# None is the same as default
df = table.search().select(["id"]).limit(None).to_pandas()
assert len(df) == 100
df = table.search().select(["id"]).limit(None).to_arrow()
assert df.num_rows == 100
# invalid limist is the same as None, wihch is the same as default
df = table.search().select(["id"]).limit(-1).to_pandas()
assert len(df) == 100
df = table.search().select(["id"]).limit(-1).to_arrow()
assert df.num_rows == 100
# valid limit should work
df = table.search().select(["id"]).limit(42).to_pandas()
assert len(df) == 42
df = table.search().select(["id"]).limit(42).to_arrow()
assert df.num_rows == 42
def test_search_with_schema_inf_single_vector(mem_db: DBConnection):
@@ -1116,14 +1117,14 @@ def test_search_with_schema_inf_single_vector(mem_db: DBConnection):
{"vector_col": v1, "text": "foo"},
{"vector_col": v2, "text": "bar"},
]
df = pd.DataFrame(data)
df = pa.Table.from_pylist(data)
table.add(df)
q = np.random.randn(10)
result1 = table.search(q, vector_column_name="vector_col").limit(1).to_pandas()
result2 = table.search(q).limit(1).to_pandas()
result1 = table.search(q, vector_column_name="vector_col").limit(1).to_arrow()
result2 = table.search(q).limit(1).to_arrow()
assert result1["text"].iloc[0] == result2["text"].iloc[0]
assert result1["text"][0].as_py() == result2["text"][0].as_py()
def test_search_with_schema_inf_multiple_vector(mem_db: DBConnection):
@@ -1143,12 +1144,12 @@ def test_search_with_schema_inf_multiple_vector(mem_db: DBConnection):
{"vector1": v1, "vector2": v2, "text": "foo"},
{"vector1": v2, "vector2": v1, "text": "bar"},
]
df = pd.DataFrame(data)
df = pa.Table.from_pylist(data)
table.add(df)
q = np.random.randn(10)
with pytest.raises(ValueError):
table.search(q).limit(1).to_pandas()
table.search(q).limit(1).to_arrow()
def test_compact_cleanup(tmp_db: DBConnection):

View File

@@ -204,9 +204,7 @@ pub fn connect(
}
if let Some(read_consistency_interval) = read_consistency_interval {
let read_consistency_interval = Duration::from_secs_f64(read_consistency_interval);
builder = builder.read_consistency_interval(Some(read_consistency_interval));
} else {
builder = builder.read_consistency_interval(None);
builder = builder.read_consistency_interval(read_consistency_interval);
}
if let Some(storage_options) = storage_options {
builder = builder.storage_options(storage_options);

View File

@@ -46,7 +46,7 @@ pub struct PyFullTextSearchQuery {
impl From<FullTextSearchQuery> for PyFullTextSearchQuery {
fn from(query: FullTextSearchQuery) -> Self {
PyFullTextSearchQuery {
Self {
columns: query.columns().into_iter().collect(),
query: query.query.query().to_owned(),
limit: query.limit,
@@ -100,7 +100,7 @@ pub struct PyQueryRequest {
impl From<AnyQuery> for PyQueryRequest {
fn from(query: AnyQuery) -> Self {
match query {
AnyQuery::Query(query_request) => PyQueryRequest {
AnyQuery::Query(query_request) => Self {
limit: query_request.limit,
offset: query_request.offset,
filter: query_request.filter.map(PyQueryFilter),
@@ -122,7 +122,7 @@ impl From<AnyQuery> for PyQueryRequest {
postfilter: None,
norm: None,
},
AnyQuery::VectorQuery(vector_query) => PyQueryRequest {
AnyQuery::VectorQuery(vector_query) => Self {
limit: vector_query.base.limit,
offset: vector_query.base.offset,
filter: vector_query.base.filter.map(PyQueryFilter),
@@ -652,6 +652,11 @@ impl HybridQuery {
self.inner_vec.bypass_vector_index();
}
#[pyo3(signature = (lower_bound=None, upper_bound=None))]
pub fn distance_range(&mut self, lower_bound: Option<f32>, upper_bound: Option<f32>) {
self.inner_vec.distance_range(lower_bound, upper_bound);
}
pub fn to_vector_query(&mut self) -> PyResult<VectorQuery> {
Ok(VectorQuery {
inner: self.inner_vec.inner.clone(),

View File

@@ -177,15 +177,19 @@ impl Table {
})
}
#[pyo3(signature = (column, index=None, replace=None))]
#[pyo3(signature = (column, index=None, replace=None, wait_timeout=None))]
pub fn create_index<'a>(
self_: PyRef<'a, Self>,
column: String,
index: Option<Bound<'_, PyAny>>,
replace: Option<bool>,
wait_timeout: Option<Bound<'_, PyAny>>,
) -> PyResult<Bound<'a, PyAny>> {
let index = extract_index_params(&index)?;
let mut op = self_.inner_ref()?.create_index(&[column], index);
let timeout = wait_timeout.map(|t| t.extract::<std::time::Duration>().unwrap());
let mut op = self_
.inner_ref()?
.create_index_with_timeout(&[column], index, timeout);
if let Some(replace) = replace {
op = op.replace(replace);
}
@@ -204,6 +208,34 @@ impl Table {
})
}
pub fn wait_for_index<'a>(
self_: PyRef<'a, Self>,
index_names: Vec<String>,
timeout: Bound<'_, PyAny>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.inner_ref()?.clone();
let timeout = timeout.extract::<std::time::Duration>()?;
future_into_py(self_.py(), async move {
let index_refs = index_names
.iter()
.map(String::as_str)
.collect::<Vec<&str>>();
inner
.wait_for_index(&index_refs, timeout)
.await
.infer_error()?;
Ok(())
})
}
pub fn prewarm_index(self_: PyRef<'_, Self>, index_name: String) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.prewarm_index(&index_name).await.infer_error()?;
Ok(())
})
}
pub fn list_indices(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {

View File

@@ -163,8 +163,9 @@ pub fn parse_fts_query(query: &Bound<'_, PyDict>) -> PyResult<FtsQuery> {
.ok_or(PyValueError::new_err("boost not found"))?
.extract::<Vec<f32>>()?;
let query =
MultiMatchQuery::try_new_with_boosts(query, columns, boost).map_err(|e| {
let query = MultiMatchQuery::try_new(query, columns)
.and_then(|q| q.try_with_boosts(boost))
.map_err(|e| {
PyValueError::new_err(format!("Error creating MultiMatchQuery: {}", e))
})?;
Ok(query.into())

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-node"
version = "0.19.0-beta.5"
version = "0.19.0-beta.11"
description = "Serverless, low-latency vector database for AI applications"
license.workspace = true
edition.workspace = true

View File

@@ -60,7 +60,7 @@ fn database_new(mut cx: FunctionContext) -> JsResult<JsPromise> {
let mut conn_builder = connect(&path).storage_options(storage_options);
if let Some(interval) = read_consistency_interval {
conn_builder = conn_builder.read_consistency_interval(Some(interval));
conn_builder = conn_builder.read_consistency_interval(interval);
}
rt.spawn(async move {
let database = conn_builder.execute().await;

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.19.0-beta.5"
version = "0.19.0-beta.11"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true

View File

@@ -12,7 +12,7 @@ use super::{
Catalog, CatalogOptions, CreateDatabaseMode, CreateDatabaseRequest, DatabaseNamesRequest,
OpenDatabaseRequest,
};
use crate::connection::{ConnectRequest, DEFAULT_READ_CONSISTENCY_INTERVAL};
use crate::connection::ConnectRequest;
use crate::database::listing::{ListingDatabase, ListingDatabaseOptions};
use crate::database::{Database, DatabaseOptions};
use crate::error::{CreateDirSnafu, Error, Result};
@@ -81,7 +81,7 @@ impl ListingCatalogOptionsBuilder {
/// [`crate::database::listing::ListingDatabase`]
#[derive(Debug)]
pub struct ListingCatalog {
object_store: ObjectStore,
object_store: Arc<ObjectStore>,
uri: String,
@@ -105,7 +105,7 @@ impl ListingCatalog {
}
async fn open_path(path: &str) -> Result<Self> {
let (object_store, base_path) = ObjectStore::from_path(path).unwrap();
let (object_store, base_path) = ObjectStore::from_uri(path).await.unwrap();
if object_store.is_local() {
Self::try_create_dir(path).context(CreateDirSnafu { path })?;
}
@@ -214,7 +214,7 @@ impl Catalog for ListingCatalog {
uri: db_uri,
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
read_consistency_interval: None,
options: Default::default(),
};
@@ -241,7 +241,7 @@ impl Catalog for ListingCatalog {
uri: db_path.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
read_consistency_interval: None,
options: Default::default(),
};
@@ -311,7 +311,7 @@ mod tests {
#[cfg(feature = "remote")]
client_config: Default::default(),
options: Default::default(),
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
read_consistency_interval: None,
};
let catalog = ListingCatalog::connect(&request).await.unwrap();

View File

@@ -36,9 +36,6 @@ pub use lance_encoding::version::LanceFileVersion;
#[cfg(feature = "remote")]
use lance_io::object_store::StorageOptions;
pub(crate) const DEFAULT_READ_CONSISTENCY_INTERVAL: Option<std::time::Duration> =
Some(std::time::Duration::from_secs(5));
/// A builder for configuring a [`Connection::table_names`] operation
pub struct TableNamesBuilder {
parent: Arc<dyn Database>,
@@ -621,15 +618,14 @@ pub struct ConnectRequest {
/// The interval at which to check for updates from other processes.
///
/// If None, then consistency is not checked. For strong consistency, set this to
/// If None, then consistency is not checked. For performance
/// reasons, this is the default. For strong consistency, set this to
/// zero seconds. Then every read will check for updates from other
/// processes. As a compromise, you can set this to a non-zero timedelta
/// for eventual consistency. If more than that interval has passed since
/// the last check, then the table will be checked for updates. Note: this
/// consistency only applies to read operations. Write operations are
/// always consistent.
///
/// The default is 5 seconds.
pub read_consistency_interval: Option<std::time::Duration>,
}
@@ -647,7 +643,7 @@ impl ConnectBuilder {
uri: uri.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
read_consistency_interval: None,
options: HashMap::new(),
},
embedding_registry: None,
@@ -786,7 +782,8 @@ impl ConnectBuilder {
/// The interval at which to check for updates from other processes. This
/// only affects LanceDB OSS.
///
/// If left unset, consistency is not checked. For strong consistency, set this to
/// If left unset, consistency is not checked. For maximum read
/// performance, this is the default. For strong consistency, set this to
/// zero seconds. Then every read will check for updates from other processes.
/// As a compromise, set this to a non-zero duration for eventual consistency.
/// If more than that duration has passed since the last read, the read will
@@ -795,15 +792,13 @@ impl ConnectBuilder {
/// This only affects read operations. Write operations are always
/// consistent.
///
/// The default is 5 seconds.
///
/// LanceDB Cloud uses eventual consistency under the hood, and is not
/// currently configurable.
pub fn read_consistency_interval(
mut self,
read_consistency_interval: Option<std::time::Duration>,
read_consistency_interval: std::time::Duration,
) -> Self {
self.request.read_consistency_interval = read_consistency_interval;
self.request.read_consistency_interval = Some(read_consistency_interval);
self
}
@@ -868,7 +863,7 @@ impl ConnectBuilder {
/// # Arguments
///
/// * `uri` - URI where the database is located, can be a local directory, supported remote cloud storage,
/// or a LanceDB Cloud database. See [ConnectOptions::uri] for a list of accepted formats
/// or a LanceDB Cloud database. See [ConnectOptions::uri] for a list of accepted formats
pub fn connect(uri: &str) -> ConnectBuilder {
ConnectBuilder::new(uri)
}
@@ -887,7 +882,7 @@ impl CatalogConnectBuilder {
uri: uri.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
read_consistency_interval: None,
options: HashMap::new(),
},
}

View File

@@ -41,7 +41,7 @@ where
/// ----------
/// - reader: RecordBatchReader
/// - strict: if set true, only `fixed_size_list<float>` is considered as vector column. If set to false,
/// a `list<float>` column with same length is also considered as vector column.
/// a `list<float>` column with same length is also considered as vector column.
pub fn infer_vector_columns(
reader: impl RecordBatchReader + Send,
strict: bool,

View File

@@ -201,7 +201,7 @@ impl ListingDatabaseOptionsBuilder {
/// We will have two tables named `table1` and `table2`.
#[derive(Debug)]
pub struct ListingDatabase {
object_store: ObjectStore,
object_store: Arc<ObjectStore>,
query_string: Option<String>,
pub(crate) uri: String,

View File

@@ -35,6 +35,8 @@ pub enum Error {
Schema { message: String },
#[snafu(display("Runtime error: {message}"))]
Runtime { message: String },
#[snafu(display("Timeout error: {message}"))]
Timeout { message: String },
// 3rd party / external errors
#[snafu(display("object_store error: {source}"))]

View File

@@ -1,11 +1,11 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::sync::Arc;
use scalar::FtsIndexBuilder;
use serde::Deserialize;
use serde_with::skip_serializing_none;
use std::sync::Arc;
use std::time::Duration;
use vector::IvfFlatIndexBuilder;
use crate::{table::BaseTable, DistanceType, Error, Result};
@@ -17,6 +17,7 @@ use self::{
pub mod scalar;
pub mod vector;
pub mod waiter;
/// Supported index types.
#[derive(Debug, Clone)]
@@ -69,6 +70,7 @@ pub struct IndexBuilder {
pub(crate) index: Index,
pub(crate) columns: Vec<String>,
pub(crate) replace: bool,
pub(crate) wait_timeout: Option<Duration>,
}
impl IndexBuilder {
@@ -78,6 +80,7 @@ impl IndexBuilder {
index,
columns,
replace: true,
wait_timeout: None,
}
}
@@ -91,6 +94,15 @@ impl IndexBuilder {
self
}
/// Duration of time to wait for asynchronous indexing to complete. If not set,
/// `create_index()` will not wait.
///
/// This is not supported for `NativeTable` since indexing is synchronous.
pub fn wait_timeout(mut self, d: Duration) -> Self {
self.wait_timeout = Some(d);
self
}
pub async fn execute(self) -> Result<()> {
self.parent.clone().create_index(self).await
}

View File

@@ -0,0 +1,89 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use crate::error::Result;
use crate::table::BaseTable;
use crate::Error;
use log::debug;
use std::time::{Duration, Instant};
use tokio::time::sleep;
const DEFAULT_SLEEP_MS: u64 = 1000;
const MAX_WAIT: Duration = Duration::from_secs(2 * 60 * 60);
/// Poll the table using list_indices() and index_stats() until all of the indices have 0 un-indexed rows.
/// Will return Error::Timeout if the columns are not fully indexed within the timeout.
pub async fn wait_for_index(
table: &dyn BaseTable,
index_names: &[&str],
timeout: Duration,
) -> Result<()> {
if timeout > MAX_WAIT {
return Err(Error::InvalidInput {
message: format!("timeout must be less than {:?}", MAX_WAIT),
});
}
let start = Instant::now();
let mut remaining = index_names.to_vec();
// poll via list_indices() and index_stats() until all indices are created and fully indexed
while start.elapsed() < timeout {
let mut completed = vec![];
let indices = table.list_indices().await?;
for &idx in &remaining {
if !indices.iter().any(|i| i.name == *idx) {
debug!("still waiting for new index '{}'", idx);
continue;
}
let stats = table.index_stats(idx.as_ref()).await?;
match stats {
None => {
debug!("still waiting for new index '{}'", idx);
continue;
}
Some(s) => {
if s.num_unindexed_rows == 0 {
// note: this may never stabilize under constant writes.
// we should later replace this with a status/job model
completed.push(idx);
debug!(
"fully indexed '{}'. indexed rows: {}",
idx, s.num_indexed_rows
);
} else {
debug!(
"still waiting for index '{}'. unindexed rows: {}",
idx, s.num_unindexed_rows
);
}
}
}
}
remaining.retain(|idx| !completed.contains(idx));
if remaining.is_empty() {
return Ok(());
}
sleep(Duration::from_millis(DEFAULT_SLEEP_MS)).await;
}
// debug log index diagnostics
for &r in &remaining {
let stats = table.index_stats(r.as_ref()).await?;
match stats {
Some(s) => debug!(
"index '{}' not fully indexed after {:?}. stats: {:?}",
r, timeout, s
),
None => debug!("index '{}' not found after {:?}", r, timeout),
}
}
Err(Error::Timeout {
message: format!(
"timed out waiting for indices: {:?} after {:?}",
remaining, timeout
),
})
}

View File

@@ -31,7 +31,7 @@
//! are not yet ready to be released.
//!
//! - `remote` - Enable remote client to connect to LanceDB cloud. This is not yet fully implemented
//! and should not be enabled.
//! and should not be enabled.
//!
//! ### Quick Start
//!

View File

@@ -8,6 +8,7 @@
pub(crate) mod client;
pub(crate) mod db;
mod retry;
pub(crate) mod table;
pub(crate) mod util;

View File

@@ -1,17 +1,17 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::{collections::HashMap, future::Future, str::FromStr, time::Duration};
use http::HeaderName;
use log::debug;
use reqwest::{
header::{HeaderMap, HeaderValue},
Request, RequestBuilder, Response,
Body, Request, RequestBuilder, Response,
};
use std::{collections::HashMap, future::Future, str::FromStr, time::Duration};
use crate::error::{Error, Result};
use crate::remote::db::RemoteOptions;
use crate::remote::retry::{ResolvedRetryConfig, RetryCounter};
const REQUEST_ID_HEADER: HeaderName = HeaderName::from_static("x-request-id");
@@ -118,41 +118,14 @@ pub struct RetryConfig {
/// You can also set the `LANCE_CLIENT_RETRY_STATUSES` environment variable
/// to set this value. Use a comma-separated list of integer values.
///
/// The default is 429, 500, 502, 503.
/// Note that write operations will never be retried on 5xx errors as this may
/// result in duplicated writes.
///
/// The default is 409, 429, 500, 502, 503, 504.
pub statuses: Option<Vec<u16>>,
// TODO: should we allow customizing methods?
}
#[derive(Debug, Clone)]
struct ResolvedRetryConfig {
retries: u8,
connect_retries: u8,
read_retries: u8,
backoff_factor: f32,
backoff_jitter: f32,
statuses: Vec<reqwest::StatusCode>,
}
impl TryFrom<RetryConfig> for ResolvedRetryConfig {
type Error = Error;
fn try_from(retry_config: RetryConfig) -> Result<Self> {
Ok(Self {
retries: retry_config.retries.unwrap_or(3),
connect_retries: retry_config.connect_retries.unwrap_or(3),
read_retries: retry_config.read_retries.unwrap_or(3),
backoff_factor: retry_config.backoff_factor.unwrap_or(0.25),
backoff_jitter: retry_config.backoff_jitter.unwrap_or(0.25),
statuses: retry_config
.statuses
.unwrap_or_else(|| vec![429, 500, 502, 503])
.into_iter()
.map(|status| reqwest::StatusCode::from_u16(status).unwrap())
.collect(),
})
}
}
// We use the `HttpSend` trait to abstract over the `reqwest::Client` so that
// we can mock responses in tests. Based on the patterns from this blog post:
// https://write.as/balrogboogie/testing-reqwest-based-clients
@@ -160,8 +133,8 @@ impl TryFrom<RetryConfig> for ResolvedRetryConfig {
pub struct RestfulLanceDbClient<S: HttpSend = Sender> {
client: reqwest::Client,
host: String,
retry_config: ResolvedRetryConfig,
sender: S,
pub(crate) retry_config: ResolvedRetryConfig,
pub(crate) sender: S,
}
pub trait HttpSend: Clone + Send + Sync + std::fmt::Debug + 'static {
@@ -375,74 +348,69 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
self.client.post(full_uri)
}
pub async fn send(&self, req: RequestBuilder, with_retry: bool) -> Result<(String, Response)> {
pub async fn send(&self, req: RequestBuilder) -> Result<(String, Response)> {
let (client, request) = req.build_split();
let mut request = request.unwrap();
let request_id = self.extract_request_id(&mut request);
self.log_request(&request, &request_id);
// Set a request id.
// TODO: allow the user to supply this, through middleware?
let request_id = if let Some(request_id) = request.headers().get(REQUEST_ID_HEADER) {
request_id.to_str().unwrap().to_string()
} else {
let request_id = uuid::Uuid::new_v4().to_string();
let header = HeaderValue::from_str(&request_id).unwrap();
request.headers_mut().insert(REQUEST_ID_HEADER, header);
request_id
};
if log::log_enabled!(log::Level::Debug) {
let content_type = request
.headers()
.get("content-type")
.map(|v| v.to_str().unwrap());
if content_type == Some("application/json") {
let body = request.body().as_ref().unwrap().as_bytes().unwrap();
let body = String::from_utf8_lossy(body);
debug!(
"Sending request_id={}: {:?} with body {}",
request_id, request, body
);
} else {
debug!("Sending request_id={}: {:?}", request_id, request);
}
}
if with_retry {
self.send_with_retry_impl(client, request, request_id).await
} else {
let response = self
.sender
.send(&client, request)
.await
.err_to_http(request_id.clone())?;
debug!(
"Received response for request_id={}: {:?}",
request_id, &response
);
Ok((request_id, response))
}
let response = self
.sender
.send(&client, request)
.await
.err_to_http(request_id.clone())?;
debug!(
"Received response for request_id={}: {:?}",
request_id, &response
);
Ok((request_id, response))
}
async fn send_with_retry_impl(
/// Send the request using retries configured in the RetryConfig.
/// If retry_5xx is false, 5xx requests will not be retried regardless of the statuses configured
/// in the RetryConfig.
/// Since this requires arrow serialization, this is implemented here instead of in RestfulLanceDbClient
pub async fn send_with_retry(
&self,
client: reqwest::Client,
req: Request,
request_id: String,
req_builder: RequestBuilder,
mut make_body: Option<Box<dyn FnMut() -> Result<Body> + Send + 'static>>,
retry_5xx: bool,
) -> Result<(String, Response)> {
let mut retry_counter = RetryCounter::new(&self.retry_config, request_id);
let retry_config = &self.retry_config;
let non_5xx_statuses = retry_config
.statuses
.iter()
.filter(|s| !s.is_server_error())
.cloned()
.collect::<Vec<_>>();
// clone and build the request to extract the request id
let tmp_req = req_builder.try_clone().ok_or_else(|| Error::Runtime {
message: "Attempted to retry a request that cannot be cloned".to_string(),
})?;
let (_, r) = tmp_req.build_split();
let mut r = r.unwrap();
let request_id = self.extract_request_id(&mut r);
let mut retry_counter = RetryCounter::new(retry_config, request_id.clone());
loop {
// This only works if the request body is not a stream. If it is
// a stream, we can't use the retry path. We would need to implement
// an outer retry.
let request = req.try_clone().ok_or_else(|| Error::Runtime {
let mut req_builder = req_builder.try_clone().ok_or_else(|| Error::Runtime {
message: "Attempted to retry a request that cannot be cloned".to_string(),
})?;
let response = self
.sender
.send(&client, request)
.await
.map(|r| (r.status(), r));
// set the streaming body on the request builder after clone
if let Some(body_gen) = make_body.as_mut() {
let body = body_gen()?;
req_builder = req_builder.body(body);
}
let (c, request) = req_builder.build_split();
let mut request = request.unwrap();
self.set_request_id(&mut request, &request_id.clone());
self.log_request(&request, &request_id);
let response = self.sender.send(&c, request).await.map(|r| (r.status(), r));
match response {
Ok((status, response)) if status.is_success() => {
debug!(
@@ -451,7 +419,10 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
);
return Ok((retry_counter.request_id, response));
}
Ok((status, response)) if self.retry_config.statuses.contains(&status) => {
Ok((status, response))
if (retry_5xx && retry_config.statuses.contains(&status))
|| non_5xx_statuses.contains(&status) =>
{
let source = self
.check_response(&retry_counter.request_id, response)
.await
@@ -480,6 +451,47 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
}
}
fn log_request(&self, request: &Request, request_id: &String) {
if log::log_enabled!(log::Level::Debug) {
let content_type = request
.headers()
.get("content-type")
.map(|v| v.to_str().unwrap());
if content_type == Some("application/json") {
let body = request.body().as_ref().unwrap().as_bytes().unwrap();
let body = String::from_utf8_lossy(body);
debug!(
"Sending request_id={}: {:?} with body {}",
request_id, request, body
);
} else {
debug!("Sending request_id={}: {:?}", request_id, request);
}
}
}
/// Extract the request ID from the request headers.
/// If the request ID header is not set, this will generate a new one and set
/// it on the request headers
pub fn extract_request_id(&self, request: &mut Request) -> String {
// Set a request id.
// TODO: allow the user to supply this, through middleware?
let request_id = if let Some(request_id) = request.headers().get(REQUEST_ID_HEADER) {
request_id.to_str().unwrap().to_string()
} else {
let request_id = uuid::Uuid::new_v4().to_string();
self.set_request_id(request, &request_id);
request_id
};
request_id
}
/// Set the request ID header
pub fn set_request_id(&self, request: &mut Request, request_id: &str) {
let header = HeaderValue::from_str(request_id).unwrap();
request.headers_mut().insert(REQUEST_ID_HEADER, header);
}
pub async fn check_response(&self, request_id: &str, response: Response) -> Result<Response> {
// Try to get the response text, but if that fails, just return the status code
let status = response.status();
@@ -501,91 +513,6 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
}
}
struct RetryCounter<'a> {
request_failures: u8,
connect_failures: u8,
read_failures: u8,
config: &'a ResolvedRetryConfig,
request_id: String,
}
impl<'a> RetryCounter<'a> {
fn new(config: &'a ResolvedRetryConfig, request_id: String) -> Self {
Self {
request_failures: 0,
connect_failures: 0,
read_failures: 0,
config,
request_id,
}
}
fn check_out_of_retries(
&self,
source: Box<dyn std::error::Error + Send + Sync>,
status_code: Option<reqwest::StatusCode>,
) -> Result<()> {
if self.request_failures >= self.config.retries
|| self.connect_failures >= self.config.connect_retries
|| self.read_failures >= self.config.read_retries
{
Err(Error::Retry {
request_id: self.request_id.clone(),
request_failures: self.request_failures,
max_request_failures: self.config.retries,
connect_failures: self.connect_failures,
max_connect_failures: self.config.connect_retries,
read_failures: self.read_failures,
max_read_failures: self.config.read_retries,
source,
status_code,
})
} else {
Ok(())
}
}
fn increment_request_failures(&mut self, source: crate::Error) -> Result<()> {
self.request_failures += 1;
let status_code = if let crate::Error::Http { status_code, .. } = &source {
*status_code
} else {
None
};
self.check_out_of_retries(Box::new(source), status_code)
}
fn increment_connect_failures(&mut self, source: reqwest::Error) -> Result<()> {
self.connect_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
fn increment_read_failures(&mut self, source: reqwest::Error) -> Result<()> {
self.read_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
fn next_sleep_time(&self) -> Duration {
let backoff = self.config.backoff_factor * (2.0f32.powi(self.request_failures as i32));
let jitter = rand::random::<f32>() * self.config.backoff_jitter;
let sleep_time = Duration::from_secs_f32(backoff + jitter);
debug!(
"Retrying request {:?} ({}/{} connect, {}/{} read, {}/{} read) in {:?}",
self.request_id,
self.connect_failures,
self.config.connect_retries,
self.request_failures,
self.config.retries,
self.read_failures,
self.config.read_retries,
sleep_time
);
sleep_time
}
}
pub trait RequestResultExt {
type Output;
fn err_to_http(self, request_id: String) -> Result<Self::Output>;

View File

@@ -255,7 +255,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
if let Some(start_after) = request.start_after {
req = req.query(&[("page_token", start_after)]);
}
let (request_id, rsp) = self.client.send(req, true).await?;
let (request_id, rsp) = self.client.send_with_retry(req, None, true).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let version = parse_server_version(&request_id, &rsp)?;
let tables = rsp
@@ -302,7 +302,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
.body(data_buffer)
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
let (request_id, rsp) = self.client.send(req, false).await?;
let (request_id, rsp) = self.client.send(req).await?;
if rsp.status() == StatusCode::BAD_REQUEST {
let body = rsp.text().await.err_to_http(request_id.clone())?;
@@ -362,7 +362,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
let req = self
.client
.post(&format!("/v1/table/{}/describe/", request.name));
let (request_id, rsp) = self.client.send(req, true).await?;
let (request_id, rsp) = self.client.send_with_retry(req, None, true).await?;
if rsp.status() == StatusCode::NOT_FOUND {
return Err(crate::Error::TableNotFound { name: request.name });
}
@@ -383,7 +383,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
.client
.post(&format!("/v1/table/{}/rename/", current_name));
let req = req.json(&serde_json::json!({ "new_table_name": new_name }));
let (request_id, resp) = self.client.send(req, false).await?;
let (request_id, resp) = self.client.send(req).await?;
self.client.check_response(&request_id, resp).await?;
let table = self.table_cache.remove(current_name).await;
if let Some(table) = table {
@@ -394,7 +394,7 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
async fn drop_table(&self, name: &str) -> Result<()> {
let req = self.client.post(&format!("/v1/table/{}/drop/", name));
let (request_id, resp) = self.client.send(req, true).await?;
let (request_id, resp) = self.client.send(req).await?;
self.client.check_response(&request_id, resp).await?;
self.table_cache.remove(name).await;
Ok(())

View File

@@ -0,0 +1,122 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use crate::remote::RetryConfig;
use crate::Error;
use log::debug;
use std::time::Duration;
pub struct RetryCounter<'a> {
pub request_failures: u8,
pub connect_failures: u8,
pub read_failures: u8,
pub config: &'a ResolvedRetryConfig,
pub request_id: String,
}
impl<'a> RetryCounter<'a> {
pub(crate) fn new(config: &'a ResolvedRetryConfig, request_id: String) -> Self {
Self {
request_failures: 0,
connect_failures: 0,
read_failures: 0,
config,
request_id,
}
}
fn check_out_of_retries(
&self,
source: Box<dyn std::error::Error + Send + Sync>,
status_code: Option<reqwest::StatusCode>,
) -> crate::Result<()> {
if self.request_failures >= self.config.retries
|| self.connect_failures >= self.config.connect_retries
|| self.read_failures >= self.config.read_retries
{
Err(Error::Retry {
request_id: self.request_id.clone(),
request_failures: self.request_failures,
max_request_failures: self.config.retries,
connect_failures: self.connect_failures,
max_connect_failures: self.config.connect_retries,
read_failures: self.read_failures,
max_read_failures: self.config.read_retries,
source,
status_code,
})
} else {
Ok(())
}
}
pub fn increment_request_failures(&mut self, source: crate::Error) -> crate::Result<()> {
self.request_failures += 1;
let status_code = if let crate::Error::Http { status_code, .. } = &source {
*status_code
} else {
None
};
self.check_out_of_retries(Box::new(source), status_code)
}
pub fn increment_connect_failures(&mut self, source: reqwest::Error) -> crate::Result<()> {
self.connect_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
pub fn increment_read_failures(&mut self, source: reqwest::Error) -> crate::Result<()> {
self.read_failures += 1;
let status_code = source.status();
self.check_out_of_retries(Box::new(source), status_code)
}
pub fn next_sleep_time(&self) -> Duration {
let backoff = self.config.backoff_factor * (2.0f32.powi(self.request_failures as i32));
let jitter = rand::random::<f32>() * self.config.backoff_jitter;
let sleep_time = Duration::from_secs_f32(backoff + jitter);
debug!(
"Retrying request {:?} ({}/{} connect, {}/{} read, {}/{} read) in {:?}",
self.request_id,
self.connect_failures,
self.config.connect_retries,
self.request_failures,
self.config.retries,
self.read_failures,
self.config.read_retries,
sleep_time
);
sleep_time
}
}
#[derive(Debug, Clone)]
pub struct ResolvedRetryConfig {
pub retries: u8,
pub connect_retries: u8,
pub read_retries: u8,
pub backoff_factor: f32,
pub backoff_jitter: f32,
pub statuses: Vec<reqwest::StatusCode>,
}
impl TryFrom<RetryConfig> for ResolvedRetryConfig {
type Error = Error;
fn try_from(retry_config: RetryConfig) -> crate::Result<Self> {
Ok(Self {
retries: retry_config.retries.unwrap_or(3),
connect_retries: retry_config.connect_retries.unwrap_or(3),
read_retries: retry_config.read_retries.unwrap_or(3),
backoff_factor: retry_config.backoff_factor.unwrap_or(0.25),
backoff_jitter: retry_config.backoff_jitter.unwrap_or(0.25),
statuses: retry_config
.statuses
.unwrap_or_else(|| vec![409, 429, 500, 502, 503, 504])
.into_iter()
.map(|status| reqwest::StatusCode::from_u16(status).unwrap())
.collect(),
})
}
}

View File

@@ -1,17 +1,13 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::io::Cursor;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use crate::index::Index;
use crate::index::IndexStatistics;
use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest};
use crate::table::{AddDataMode, AnyQuery, Filter};
use crate::utils::{supported_btree_data_type, supported_vector_data_type};
use crate::{DistanceType, Error, Table};
use arrow_array::RecordBatchReader;
use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_ipc::reader::FileReader;
use arrow_schema::{DataType, SchemaRef};
use async_trait::async_trait;
@@ -25,9 +21,19 @@ use lance::arrow::json::{JsonDataType, JsonSchema};
use lance::dataset::scanner::DatasetRecordBatchStream;
use lance::dataset::{ColumnAlteration, NewColumnTransform, Version};
use lance_datafusion::exec::{execute_plan, OneShotExec};
use reqwest::{RequestBuilder, Response};
use serde::{Deserialize, Serialize};
use std::io::Cursor;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::RwLock;
use super::client::RequestResultExt;
use super::client::{HttpSend, RestfulLanceDbClient, Sender};
use super::db::ServerVersion;
use super::ARROW_STREAM_CONTENT_TYPE;
use crate::index::waiter::wait_for_index;
use crate::{
connection::NoData,
error::Result,
@@ -39,11 +45,6 @@ use crate::{
},
};
use super::client::RequestResultExt;
use super::client::{HttpSend, RestfulLanceDbClient, Sender};
use super::db::ServerVersion;
use super::ARROW_STREAM_CONTENT_TYPE;
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
#[derive(Debug)]
@@ -83,7 +84,7 @@ impl<S: HttpSend> RemoteTable<S> {
let body = serde_json::json!({ "version": version });
request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
@@ -127,6 +128,61 @@ impl<S: HttpSend> RemoteTable<S> {
Ok(reqwest::Body::wrap_stream(body_stream))
}
/// Buffer the reader into memory
async fn buffer_reader<R: RecordBatchReader + ?Sized>(
reader: &mut R,
) -> Result<(SchemaRef, Vec<RecordBatch>)> {
let schema = reader.schema();
let mut batches = Vec::new();
for batch in reader {
batches.push(batch?);
}
Ok((schema, batches))
}
/// Create a new RecordBatchReader from buffered data
fn make_reader(schema: SchemaRef, batches: Vec<RecordBatch>) -> impl RecordBatchReader {
let iter = batches.into_iter().map(Ok);
RecordBatchIterator::new(iter, schema)
}
async fn send(&self, req: RequestBuilder, with_retry: bool) -> Result<(String, Response)> {
let res = if with_retry {
self.client.send_with_retry(req, None, true).await?
} else {
self.client.send(req).await?
};
Ok(res)
}
/// Send the request with streaming body.
/// This will use retries if with_retry is set and the number of configured retries is > 0.
/// If retries are enabled, the stream will be buffered into memory.
async fn send_streaming(
&self,
req: RequestBuilder,
mut data: Box<dyn RecordBatchReader + Send>,
with_retry: bool,
) -> Result<(String, Response)> {
if !with_retry || self.client.retry_config.retries == 0 {
let body = Self::reader_as_body(data)?;
return self.client.send(req.body(body)).await;
}
// to support retries, buffer into memory and clone the batches on each retry
let (schema, batches) = Self::buffer_reader(&mut *data).await?;
let make_body = Box::new(move || {
let reader = Self::make_reader(schema.clone(), batches.clone());
Self::reader_as_body(Box::new(reader))
});
let res = self
.client
.send_with_retry(req, Some(make_body), false)
.await?;
Ok(res)
}
async fn check_table_response(
&self,
request_id: &str,
@@ -168,7 +224,8 @@ impl<S: HttpSend> RemoteTable<S> {
}
// Server requires k.
let limit = params.limit.unwrap_or(usize::MAX);
// use isize::MAX as usize to avoid overflow: https://github.com/lancedb/lancedb/issues/2211
let limit = params.limit.unwrap_or(isize::MAX as usize);
body["k"] = serde_json::Value::Number(serde_json::Number::from(limit));
if let Some(filter) = &params.filter {
@@ -339,8 +396,6 @@ impl<S: HttpSend> RemoteTable<S> {
let mut request = self.client.post(&format!("/v1/table/{}/query/", self.name));
if let Some(timeout) = options.timeout {
// Client side timeout
request = request.timeout(timeout);
// Also send to server, so it can abort the query if it takes too long.
// (If it doesn't fit into u64, it's not worth sending anyways.)
if let Ok(timeout_ms) = u64::try_from(timeout.as_millis()) {
@@ -355,11 +410,29 @@ impl<S: HttpSend> RemoteTable<S> {
.collect();
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?;
let (request_id, response) = self.send(req, true).await?;
self.read_arrow_stream(&request_id, response).await
});
let streams = futures::future::try_join_all(futures).await?;
Ok(streams)
let streams = futures::future::try_join_all(futures);
if let Some(timeout) = options.timeout {
let timeout_future = tokio::time::sleep(timeout);
tokio::pin!(timeout_future);
tokio::pin!(streams);
tokio::select! {
_ = &mut timeout_future => {
Err(Error::Other {
message: format!("Query timeout after {} ms", timeout.as_millis()),
source: None,
})
}
result = &mut streams => {
Ok(result?)
}
}
} else {
Ok(streams.await?)
}
}
async fn prepare_query_bodies(&self, query: &AnyQuery) -> Result<Vec<serde_json::Value>> {
@@ -455,7 +528,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = serde_json::json!({ "version": version });
request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
self.checkout_latest().await?;
Ok(())
@@ -465,7 +538,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let request = self
.client
.post(&format!("/v1/table/{}/version/list/", self.name));
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
#[derive(Deserialize)]
@@ -511,7 +584,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
request = request.json(&body);
}
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
@@ -529,12 +602,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
data: Box<dyn RecordBatchReader + Send>,
) -> Result<()> {
self.check_mutable().await?;
let body = Self::reader_as_body(data)?;
let mut request = self
.client
.post(&format!("/v1/table/{}/insert/", self.name))
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE)
.body(body);
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
match add.mode {
AddDataMode::Append => {}
@@ -543,8 +614,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
}
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send_streaming(request, data, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
@@ -612,7 +682,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.collect::<Vec<_>>();
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?;
let (request_id, response) = self.send(req, true).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?;
@@ -654,7 +724,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.collect();
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?;
let (request_id, response) = self.send(req, true).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?;
@@ -696,7 +766,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
"predicate": update.filter,
}));
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
@@ -710,7 +780,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.client
.post(&format!("/v1/table/{}/delete/", self.name))
.json(&body);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -796,34 +866,45 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let request = request.json(&body);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
if let Some(wait_timeout) = index.wait_timeout {
let name = format!("{}_idx", column);
self.wait_for_index(&[&name], wait_timeout).await?;
}
Ok(())
}
/// Poll until the columns are fully indexed. Will return Error::Timeout if the columns
/// are not fully indexed within the timeout.
async fn wait_for_index(&self, index_names: &[&str], timeout: Duration) -> Result<()> {
wait_for_index(self, index_names, timeout).await
}
async fn merge_insert(
&self,
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<()> {
self.check_mutable().await?;
let query = MergeInsertRequest::try_from(params)?;
let body = Self::reader_as_body(new_data)?;
let request = self
.client
.post(&format!("/v1/table/{}/merge_insert/", self.name))
.query(&query)
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE)
.body(body);
.header(CONTENT_TYPE, ARROW_STREAM_CONTENT_TYPE);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send_streaming(request, new_data, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
async fn optimize(&self, _action: OptimizeAction) -> Result<OptimizeStats> {
self.check_mutable().await?;
Err(Error::NotSupported {
@@ -852,7 +933,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.client
.post(&format!("/v1/table/{}/add_columns/", self.name))
.json(&body);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?; // todo:
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -891,7 +972,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.client
.post(&format!("/v1/table/{}/alter_columns/", self.name))
.json(&body);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -903,7 +984,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.client
.post(&format!("/v1/table/{}/drop_columns/", self.name))
.json(&body);
let (request_id, response) = self.client.send(request, false).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
@@ -917,7 +998,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = serde_json::json!({ "version": version });
request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
#[derive(Deserialize)]
@@ -974,7 +1055,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = serde_json::json!({ "version": version });
request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
if response.status() == StatusCode::NOT_FOUND {
return Ok(None);
@@ -998,11 +1079,17 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
"/v1/table/{}/index/{}/drop/",
self.name, index_name
));
let (request_id, response) = self.client.send(request, true).await?;
let (request_id, response) = self.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
Ok(())
}
async fn prewarm_index(&self, _index_name: &str) -> Result<()> {
Err(Error::NotSupported {
message: "prewarm_index is not yet supported on LanceDB cloud.".into(),
})
}
async fn table_definition(&self) -> Result<TableDefinition> {
Err(Error::NotSupported {
message: "table_definition is not supported on LanceDB cloud.".into(),
@@ -1453,6 +1540,42 @@ mod tests {
assert_eq!(&body, &expected_body);
}
#[tokio::test]
async fn test_merge_insert_retries_on_409() {
let batch = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let data = Box::new(RecordBatchIterator::new(
[Ok(batch.clone())],
batch.schema(),
));
// Default parameters
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/merge_insert/");
let params = request.url().query_pairs().collect::<HashMap<_, _>>();
assert_eq!(params["on"], "some_col");
assert_eq!(params["when_matched_update_all"], "false");
assert_eq!(params["when_not_matched_insert_all"], "false");
assert_eq!(params["when_not_matched_by_source_delete"], "false");
assert!(!params.contains_key("when_matched_update_all_filt"));
assert!(!params.contains_key("when_not_matched_by_source_delete_filt"));
http::Response::builder().status(409).body("").unwrap()
});
let e = table
.merge_insert(&["some_col"])
.execute(data)
.await
.unwrap_err();
assert!(e.to_string().contains("Hit retry limit"));
}
#[tokio::test]
async fn test_delete() {
let table = Table::new_with_handler("my_table", |request| {
@@ -1494,7 +1617,7 @@ mod tests {
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
let expected_body = serde_json::json!({
"k": usize::MAX,
"k": isize::MAX as usize,
"prefilter": true,
"vector": [], // Empty vector means no vector query.
"version": null,
@@ -1769,6 +1892,7 @@ mod tests {
"boost": 1.0,
"fuzziness": 0,
"max_expansions": 50,
"operator": "Or",
},
}
},
@@ -2409,4 +2533,88 @@ mod tests {
});
table.drop_index("my_index").await.unwrap();
}
#[tokio::test]
async fn test_wait_for_index() {
let table = _make_table_with_indices(0);
table
.wait_for_index(&["vector_idx", "my_idx"], Duration::from_secs(1))
.await
.unwrap();
}
#[tokio::test]
async fn test_wait_for_index_timeout() {
let table = _make_table_with_indices(100);
let e = table
.wait_for_index(&["vector_idx", "my_idx"], Duration::from_secs(1))
.await
.unwrap_err();
assert_eq!(
e.to_string(),
"Timeout error: timed out waiting for indices: [\"vector_idx\", \"my_idx\"] after 1s"
);
}
#[tokio::test]
async fn test_wait_for_index_timeout_never_created() {
let table = _make_table_with_indices(0);
let e = table
.wait_for_index(&["doesnt_exist_idx"], Duration::from_secs(1))
.await
.unwrap_err();
assert_eq!(
e.to_string(),
"Timeout error: timed out waiting for indices: [\"doesnt_exist_idx\"] after 1s"
);
}
fn _make_table_with_indices(unindexed_rows: usize) -> Table {
let table = Table::new_with_handler("my_table", move |request| {
assert_eq!(request.method(), "POST");
let response_body = match request.url().path() {
"/v1/table/my_table/index/list/" => {
serde_json::json!({
"indexes": [
{
"index_name": "vector_idx",
"index_uuid": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
"columns": ["vector"],
"index_status": "done",
},
{
"index_name": "my_idx",
"index_uuid": "34255f64-5717-4562-b3fc-2c963f66afa6",
"columns": ["my_column"],
"index_status": "done",
},
]
})
}
"/v1/table/my_table/index/vector_idx/stats/" => {
serde_json::json!({
"num_indexed_rows": 100000,
"num_unindexed_rows": unindexed_rows,
"index_type": "IVF_PQ",
"distance_type": "l2"
})
}
"/v1/table/my_table/index/my_idx/stats/" => {
serde_json::json!({
"num_indexed_rows": 100000,
"num_unindexed_rows": unindexed_rows,
"index_type": "LABEL_LIST"
})
}
_path => {
serde_json::json!(None::<String>)
}
};
let body = serde_json::to_string(&response_body).unwrap();
let status = if body == "null" { 404 } else { 200 };
http::Response::builder().status(status).body(body).unwrap()
});
table
}
}

View File

@@ -29,8 +29,8 @@ impl FromStr for NormalizeMethod {
fn from_str(s: &str) -> Result<Self> {
match s.to_lowercase().as_str() {
"score" => Ok(NormalizeMethod::Score),
"rank" => Ok(NormalizeMethod::Rank),
"score" => Ok(Self::Score),
"rank" => Ok(Self::Rank),
_ => Err(Error::InvalidInput {
message: format!("invalid normalize method: {}", s),
}),
@@ -41,8 +41,8 @@ impl FromStr for NormalizeMethod {
impl std::fmt::Display for NormalizeMethod {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
NormalizeMethod::Score => write!(f, "score"),
NormalizeMethod::Rank => write!(f, "rank"),
Self::Score => write!(f, "score"),
Self::Rank => write!(f, "rank"),
}
}
}

View File

@@ -3,10 +3,6 @@
//! LanceDB Table APIs
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use arrow::array::{AsArray, FixedSizeListBuilder, Float32Builder};
use arrow::datatypes::{Float32Type, UInt8Type};
use arrow_array::{RecordBatchIterator, RecordBatchReader};
@@ -45,6 +41,10 @@ use lance_table::format::Manifest;
use lance_table::io::commit::ManifestNamingScheme;
use log::info;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::format;
use std::path::Path;
use std::sync::Arc;
use crate::arrow::IntoArrow;
use crate::connection::NoData;
@@ -78,6 +78,7 @@ pub mod datafusion;
pub(crate) mod dataset;
pub mod merge;
use crate::index::waiter::wait_for_index;
pub use chrono::Duration;
pub use lance::dataset::optimize::CompactionOptions;
pub use lance::dataset::scanner::DatasetRecordBatchStream;
@@ -455,6 +456,8 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
async fn list_indices(&self) -> Result<Vec<IndexConfig>>;
/// Drop an index from the table.
async fn drop_index(&self, name: &str) -> Result<()>;
/// Prewarm an index in the table
async fn prewarm_index(&self, name: &str) -> Result<()>;
/// Get statistics about the index.
async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>>;
/// Merge insert new records into the table.
@@ -489,6 +492,13 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
async fn table_definition(&self) -> Result<TableDefinition>;
/// Get the table URI
fn dataset_uri(&self) -> &str;
/// Poll until the columns are fully indexed. Will return Error::Timeout if the columns
/// are not fully indexed within the timeout.
async fn wait_for_index(
&self,
index_names: &[&str],
timeout: std::time::Duration,
) -> Result<()>;
}
/// A Table is a collection of strong typed Rows.
@@ -767,6 +777,28 @@ impl Table {
)
}
/// See [Table::create_index]
/// For remote tables, this allows an optional wait_timeout to poll until asynchronous indexing is complete
pub fn create_index_with_timeout(
&self,
columns: &[impl AsRef<str>],
index: Index,
wait_timeout: Option<std::time::Duration>,
) -> IndexBuilder {
let mut builder = IndexBuilder::new(
self.inner.clone(),
columns
.iter()
.map(|val| val.as_ref().to_string())
.collect::<Vec<_>>(),
index,
);
if let Some(timeout) = wait_timeout {
builder = builder.wait_timeout(timeout);
}
builder
}
/// Create a builder for a merge insert operation
///
/// This operation can add rows, update rows, and remove rows all in a single
@@ -794,8 +826,8 @@ impl Table {
/// # Arguments
///
/// * `on` One or more columns to join on. This is how records from the
/// source table and target table are matched. Typically this is some
/// kind of key or id column.
/// source table and target table are matched. Typically this is some
/// kind of key or id column.
///
/// # Examples
///
@@ -1086,6 +1118,32 @@ impl Table {
self.inner.drop_index(name).await
}
/// Prewarm an index in the table
///
/// This is a hint to fully load the index into memory. It can be used to
/// avoid cold starts
///
/// It is generally wasteful to call this if the index does not fit into the
/// available cache.
///
/// Note: This function is not yet supported on all indices, in which case it
/// may do nothing.
///
/// Use [`Self::list_indices()`] to find the names of the indices.
pub async fn prewarm_index(&self, name: &str) -> Result<()> {
self.inner.prewarm_index(name).await
}
/// Poll until the columns are fully indexed. Will return Error::Timeout if the columns
/// are not fully indexed within the timeout.
pub async fn wait_for_index(
&self,
index_names: &[&str],
timeout: std::time::Duration,
) -> Result<()> {
self.inner.wait_for_index(index_names, timeout).await
}
// Take many execution plans and map them into a single plan that adds
// a query_index column and unions them.
pub(crate) fn multi_vector_plan(
@@ -2006,6 +2064,11 @@ impl BaseTable for NativeTable {
Ok(())
}
async fn prewarm_index(&self, index_name: &str) -> Result<()> {
let dataset = self.dataset.get().await?;
Ok(dataset.prewarm_index(index_name).await?)
}
async fn update(&self, update: UpdateBuilder) -> Result<u64> {
let dataset = self.dataset.get().await?.clone();
let mut builder = LanceUpdateBuilder::new(Arc::new(dataset));
@@ -2407,6 +2470,16 @@ impl BaseTable for NativeTable {
loss,
}))
}
/// Poll until the columns are fully indexed. Will return Error::Timeout if the columns
/// are not fully indexed within the timeout.
async fn wait_for_index(
&self,
index_names: &[&str],
timeout: std::time::Duration,
) -> Result<()> {
wait_for_index(self, index_names, timeout).await
}
}
#[cfg(test)]
@@ -2629,7 +2702,7 @@ mod tests {
let dataset_path = tmp_dir.path().join("test.lance");
let uri = dataset_path.to_str().unwrap();
let conn = connect(uri)
.read_consistency_interval(Some(Duration::from_secs(0)))
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
@@ -2712,7 +2785,7 @@ mod tests {
let dataset_path = tmp_dir.path().join("test.lance");
let uri = dataset_path.to_str().unwrap();
let conn = connect(uri)
.read_consistency_interval(Some(Duration::from_secs(0)))
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
@@ -2909,7 +2982,7 @@ mod tests {
let dataset_path = tmp_dir.path().join("test.lance");
let uri = dataset_path.to_str().unwrap();
let conn = connect(uri)
.read_consistency_interval(Some(Duration::from_secs(0)))
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
@@ -3190,7 +3263,10 @@ mod tests {
.execute()
.await
.unwrap();
table
.wait_for_index(&["embeddings_idx"], Duration::from_millis(10))
.await
.unwrap();
let index_configs = table.list_indices().await.unwrap();
assert_eq!(index_configs.len(), 1);
let index = index_configs.into_iter().next().unwrap();
@@ -3258,7 +3334,10 @@ mod tests {
.execute()
.await
.unwrap();
table
.wait_for_index(&["i_idx"], Duration::from_millis(10))
.await
.unwrap();
let index_configs = table.list_indices().await.unwrap();
assert_eq!(index_configs.len(), 1);
let index = index_configs.into_iter().next().unwrap();
@@ -3455,6 +3534,9 @@ mod tests {
assert_eq!(stats.num_unindexed_rows, 0);
assert_eq!(stats.index_type, crate::index::IndexType::FTS);
assert_eq!(stats.distance_type, None);
// Make sure we can call prewarm without error
table.prewarm_index("text_idx").await.unwrap();
}
#[tokio::test]
@@ -3480,8 +3562,7 @@ mod tests {
let mut conn2 = ConnectBuilder::new(uri);
if let Some(interval) = interval {
conn2 = conn2
.read_consistency_interval(Some(std::time::Duration::from_millis(interval)));
conn2 = conn2.read_consistency_interval(std::time::Duration::from_millis(interval));
}
let conn2 = conn2.execute().await.unwrap();
let table2 = conn2.open_table("my_table").execute().await.unwrap();
@@ -3517,7 +3598,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Some(Duration::from_secs(0)))
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
@@ -3538,7 +3619,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Some(Duration::from_secs(0)))
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
@@ -3551,7 +3632,7 @@ mod tests {
let native_tbl = table.as_native().unwrap();
let manifest = native_tbl.manifest().await.unwrap();
assert_eq!(manifest.config.len(), 0);
let base_config_len = manifest.config.len();
native_tbl
.update_config(vec![("test_key1".to_string(), "test_val1".to_string())])
@@ -3559,7 +3640,7 @@ mod tests {
.unwrap();
let manifest = native_tbl.manifest().await.unwrap();
assert_eq!(manifest.config.len(), 1);
assert_eq!(manifest.config.len(), 1 + base_config_len);
assert_eq!(
manifest.config.get("test_key1"),
Some(&"test_val1".to_string())
@@ -3570,7 +3651,7 @@ mod tests {
.await
.unwrap();
let manifest = native_tbl.manifest().await.unwrap();
assert_eq!(manifest.config.len(), 2);
assert_eq!(manifest.config.len(), 2 + base_config_len);
assert_eq!(
manifest.config.get("test_key1"),
Some(&"test_val1".to_string())
@@ -3588,7 +3669,7 @@ mod tests {
.await
.unwrap();
let manifest = native_tbl.manifest().await.unwrap();
assert_eq!(manifest.config.len(), 2);
assert_eq!(manifest.config.len(), 2 + base_config_len);
assert_eq!(
manifest.config.get("test_key1"),
Some(&"test_val1".to_string())
@@ -3600,7 +3681,7 @@ mod tests {
native_tbl.delete_config_keys(&["test_key1"]).await.unwrap();
let manifest = native_tbl.manifest().await.unwrap();
assert_eq!(manifest.config.len(), 1);
assert_eq!(manifest.config.len(), 1 + base_config_len);
assert_eq!(
manifest.config.get("test_key2"),
Some(&"test_val2_update".to_string())
@@ -3613,7 +3694,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Some(Duration::from_secs(0)))
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
@@ -3675,7 +3756,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Some(Duration::from_secs(0)))
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();

View File

@@ -7,7 +7,6 @@ use std::{
time::{self, Duration, Instant},
};
use futures::FutureExt;
use lance::Dataset;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
@@ -23,16 +22,13 @@ pub struct DatasetConsistencyWrapper(Arc<RwLock<DatasetRef>>);
///
/// The dataset is lazily loaded, and starts off as None. On the first access,
/// the dataset is loaded.
#[derive(Debug)]
#[derive(Debug, Clone)]
enum DatasetRef {
/// In this mode, the dataset is always the latest version.
Latest {
dataset: Dataset,
read_consistency_interval: Option<Duration>,
last_consistency_check: Option<time::Instant>,
/// A background task loading the next version of the dataset. This happens
/// in the background so as not to block the current thread.
refresh_task: Option<tokio::task::JoinHandle<Result<Dataset>>>,
},
/// In this mode, the dataset is a specific version. It cannot be mutated.
TimeTravel { dataset: Dataset, version: u64 },
@@ -45,18 +41,9 @@ impl DatasetRef {
Self::Latest {
dataset,
last_consistency_check,
refresh_task,
..
} => {
// Replace the refresh task
if let Some(refresh_task) = refresh_task {
refresh_task.abort();
}
let mut new_dataset = dataset.clone();
refresh_task.replace(tokio::spawn(async move {
new_dataset.checkout_latest().await?;
Ok(new_dataset)
}));
dataset.checkout_latest().await?;
last_consistency_check.replace(Instant::now());
}
Self::TimeTravel { dataset, version } => {
@@ -70,24 +57,26 @@ impl DatasetRef {
matches!(self, Self::Latest { .. })
}
fn strong_consistency(&self) -> bool {
matches!(
self,
Self::Latest { read_consistency_interval: Some(interval), .. }
if interval.as_nanos() == 0
)
async fn need_reload(&self) -> Result<bool> {
Ok(match self {
Self::Latest { dataset, .. } => {
dataset.latest_version_id().await? != dataset.version().version
}
Self::TimeTravel { dataset, version } => dataset.version().version != *version,
})
}
async fn as_latest(&mut self, read_consistency_interval: Option<Duration>) -> Result<()> {
match self {
Self::Latest { .. } => Ok(()),
Self::TimeTravel { dataset, .. } => {
dataset.checkout_latest().await?;
dataset
.checkout_version(dataset.latest_version_id().await?)
.await?;
*self = Self::Latest {
dataset: dataset.clone(),
read_consistency_interval,
last_consistency_check: Some(Instant::now()),
refresh_task: None,
};
Ok(())
}
@@ -125,74 +114,13 @@ impl DatasetRef {
match self {
Self::Latest {
dataset: ref mut ds,
refresh_task,
last_consistency_check,
..
} => {
*ds = dataset;
if let Some(refresh_task) = refresh_task {
refresh_task.abort();
}
*refresh_task = None;
*last_consistency_check = Some(Instant::now());
}
_ => unreachable!("Dataset should be in latest mode at this point"),
}
}
/// Wait for the background refresh task to complete.
async fn await_refresh(&mut self) -> Result<()> {
if let Self::Latest {
refresh_task: Some(refresh_task),
read_consistency_interval,
..
} = self
{
let dataset = refresh_task.await.expect("Refresh task panicked")?;
*self = Self::Latest {
dataset,
read_consistency_interval: *read_consistency_interval,
last_consistency_check: Some(Instant::now()),
refresh_task: None,
};
}
Ok(())
}
/// Check if background refresh task is done, and if so, update the dataset.
fn check_refresh(&mut self) -> Result<()> {
if let Self::Latest {
refresh_task: Some(refresh_task),
read_consistency_interval,
..
} = self
{
if refresh_task.is_finished() {
let dataset = refresh_task
.now_or_never()
.unwrap()
.expect("Refresh task panicked")?;
*self = Self::Latest {
dataset,
read_consistency_interval: *read_consistency_interval,
last_consistency_check: Some(Instant::now()),
refresh_task: None,
};
}
}
Ok(())
}
fn refresh_is_ready(&self) -> bool {
matches!(
self,
Self::Latest {
refresh_task: Some(refresh_task),
..
}
if refresh_task.is_finished()
)
}
}
impl DatasetConsistencyWrapper {
@@ -202,7 +130,6 @@ impl DatasetConsistencyWrapper {
dataset,
read_consistency_interval,
last_consistency_check: Some(Instant::now()),
refresh_task: None,
})))
}
@@ -261,9 +188,18 @@ impl DatasetConsistencyWrapper {
}
pub async fn reload(&self) -> Result<()> {
if !self.0.read().await.need_reload().await? {
return Ok(());
}
let mut write_guard = self.0.write().await;
write_guard.reload().await?;
write_guard.await_refresh().await
// on lock escalation -- check if someone else has already reloaded
if !write_guard.need_reload().await? {
return Ok(());
}
// actually need reloading
write_guard.reload().await
}
/// Returns the version, if in time travel mode, or None otherwise
@@ -309,26 +245,9 @@ impl DatasetConsistencyWrapper {
/// Ensures that the dataset is loaded and up-to-date with consistency and
/// version parameters.
async fn ensure_up_to_date(&self) -> Result<()> {
// We may have previously created a background task to fetch the new
// version of the dataset. If that task is done, we should update the
// dataset.
{
let read_guard = self.0.read().await;
if read_guard.refresh_is_ready() {
drop(read_guard);
self.0.write().await.check_refresh()?;
}
}
if !self.is_up_to_date().await? {
self.reload().await?;
}
// If we are in strong consistency mode, we should await the refresh task.
if self.0.read().await.strong_consistency() {
self.0.write().await.await_refresh().await?;
}
Ok(())
}
}
@@ -384,7 +303,7 @@ mod tests {
#[tokio::test]
async fn test_iops_open_strong_consistency() {
let db = connect("memory://")
.read_consistency_interval(Some(Duration::ZERO))
.read_consistency_interval(Duration::ZERO)
.execute()
.await
.expect("Failed to connect to database");

Some files were not shown because too many files have changed in this diff Show More