Compare commits

..

20 Commits

Author SHA1 Message Date
lancedb automation
1ade8846fd chore: update lance dependency to v4.0.0-beta.6 2026-03-03 18:54:14 +00:00
Xuanwo
52ce2c995c fix(ci): only run npm publish on release tags (#3093)
This PR fixes the npm publish dry-run failure for prerelease versions
without changing the existing workflow trigger behavior. The publish
step now detects prerelease versions from `nodejs/package.json` and
always appends `--tag preview` when needed.

Context:
- On `main` pushes, the workflow still runs `npm publish --dry-run` by
design.
- Recent failures were caused by prerelease versions (for example
`0.27.0-beta.3`) running without `--tag`, which npm rejects.
- The previous `refs/tags/v...-beta...` check did not apply on branch
pushes, so dry-run could fail even though release tags worked.
2026-03-04 01:35:10 +08:00
Sean Mackrory
e71a00998c ci: add regression test for fastSearch in FTS queries in TypeScript (#3090)
We recently added support for this for the Python bindings, and wanted
to confirm this already worked as expected in the TS bindings.
2026-03-03 07:09:09 -08:00
Sean Mackrory
39a2ac0a1c feat: add parity between fast_search keyword argument between vector and FTS searches (#3091)
We don't necessarily need to do this, but one user was confused having
used `fast_search=True` as a keyword argument for vector searches, but
being unable to do so for FTS, even after the most recent changes. I
think this is the only discrepancy in where that is possible.
2026-03-03 05:21:36 -08:00
Wyatt Alt
bc7b344fa4 feat: add support for remote index params (#3087)
Prior to this commit the remote SDK did not support the full set of
index parameters. This extends the SDK to support them.
2026-03-02 11:14:28 -08:00
Will Jones
f91d2f5fec ci(python): pin maturin to work around bug (#3088)
Work around for https://github.com/PyO3/maturin/issues/3059
2026-03-02 09:38:54 -08:00
Wyatt Alt
cf81b6419f feat: add num_deleted_rows to delete result (#3077) 2026-03-02 08:37:14 -08:00
Lance Release
0498ac1f2f Bump version: 0.27.0-beta.2 → 0.27.0-beta.3 2026-02-28 01:31:51 +00:00
Lance Release
aeb1c3ee6a Bump version: 0.30.0-beta.2 → 0.30.0-beta.3 2026-02-28 01:29:53 +00:00
Weston Pace
f9ae46c0e7 feat: upgrade lance to 3.0.0-rc.2 and add bindings for fast_search (#3083) 2026-02-27 17:27:01 -08:00
Will Jones
84bf022fb1 fix(python): pin pylance to make datafusion table provider match version (#3080) 2026-02-27 13:34:05 -08:00
Will Jones
310967eceb ci(rust): fix linux job (#3076) 2026-02-26 19:25:46 -08:00
Jack Ye
154dbeee2a chore: fix clippy for PreprocessingOutput without remote feature (#3070)
Fix clippy:

```
error: fields `overwrite` and `rescannable` are never read
Error:    --> /home/runner/work/xxxx/xxxx/src/lancedb/rust/lancedb/src/table/add_data.rs:158:9
    |
156 | pub struct PreprocessingOutput {
    |            ------------------- fields in this struct
157 |     pub plan: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
158 |     pub overwrite: bool,
    |         ^^^^^^^^^
159 |     pub rescannable: bool,
    |         ^^^^^^^^^^^
    |
    = note: `-D dead-code` implied by `-D warnings`
    = help: to override `-D warnings` add `#[allow(dead_code)]`
```
2026-02-25 14:59:32 -08:00
Lance Release
c9c08ac8b9 Bump version: 0.27.0-beta.1 → 0.27.0-beta.2 2026-02-25 07:47:54 +00:00
Lance Release
e253f5d9b6 Bump version: 0.30.0-beta.1 → 0.30.0-beta.2 2026-02-25 07:46:06 +00:00
LanceDB Robot
05b4fb0990 chore: update lance dependency to v3.1.0-beta.2 (#3068)
## Summary
- Bump Lance Rust workspace dependencies to `v3.1.0-beta.2` via
`ci/set_lance_version.py`.
- Update Java `lance-core.version` to `3.1.0-beta.2`.

## Verification
- `cargo clippy --workspace --tests --all-features -- -D warnings`
- `cargo fmt --all`

## Release Reference
- refs/tags/v3.1.0-beta.2
2026-02-24 23:02:22 -08:00
Mesut-Doner
613b9c1099 feat(rust): add expression builder API for type-safe query filters (#3032)
## Summary

Adds a Rust expression builder API as a type-safe alternative to SQL
strings for query filters.

## Motivation

Filtering with raw SQL strings can be awkward when using variables and
special types:


Closes  #3038

---------

Co-authored-by: Will Jones <willjones127@gmail.com>
2026-02-24 18:44:03 -08:00
Will Jones
d5948576b9 feat: parallel inserts for local tables (#3062)
When input data is sufficiently large, we automatically split up into
parallel writes using a round-robin exchange operator. We sample the
first batch to determine data width, and target size of 1 million rows
or 2GB, whichever is smaller.
2026-02-24 12:26:51 -08:00
Will Jones
0d3fc7860a ci: fix python DataFusion test (#3060) 2026-02-24 07:59:12 -08:00
Weston Pace
531cec075c fix: don't expect all offsets to fit in one batch in permutation reader (#3065)
This would cause takes against large permutations to fail
2026-02-24 06:32:54 -08:00
50 changed files with 1640 additions and 394 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.27.0-beta.1"
current_version = "0.27.0-beta.3"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -29,6 +29,7 @@ runs:
if: ${{ inputs.arm-build == 'false' }}
uses: PyO3/maturin-action@v1
with:
maturin-version: "1.12.4"
command: build
working-directory: python
docker-options: "-e PIP_EXTRA_INDEX_URL='https://pypi.fury.io/lance-format/ https://pypi.fury.io/lancedb/'"
@@ -44,6 +45,7 @@ runs:
if: ${{ inputs.arm-build == 'true' }}
uses: PyO3/maturin-action@v1
with:
maturin-version: "1.12.4"
command: build
working-directory: python
docker-options: "-e PIP_EXTRA_INDEX_URL='https://pypi.fury.io/lance-format/ https://pypi.fury.io/lancedb/'"

View File

@@ -20,6 +20,7 @@ runs:
uses: PyO3/maturin-action@v1
with:
command: build
maturin-version: "1.12.4"
# TODO: pass through interpreter
args: ${{ inputs.args }}
docker-options: "-e PIP_EXTRA_INDEX_URL='https://pypi.fury.io/lance-format/ https://pypi.fury.io/lancedb/'"

View File

@@ -25,6 +25,7 @@ runs:
uses: PyO3/maturin-action@v1
with:
command: build
maturin-version: "1.12.4"
args: ${{ inputs.args }}
docker-options: "-e PIP_EXTRA_INDEX_URL='https://pypi.fury.io/lance-format/ https://pypi.fury.io/lancedb/'"
working-directory: python

View File

@@ -356,7 +356,8 @@ jobs:
if [[ $DRY_RUN == "true" ]]; then
ARGS="$ARGS --dry-run"
fi
if [[ $GITHUB_REF =~ refs/tags/v(.*)-beta.* ]]; then
VERSION=$(node -p "require('./package.json').version")
if [[ $VERSION == *-* ]]; then
ARGS="$ARGS --tag preview"
fi
npm publish $ARGS

View File

@@ -10,6 +10,10 @@ on:
- python/**
- rust/**
- .github/workflows/python.yml
- .github/workflows/build_linux_wheel/**
- .github/workflows/build_mac_wheel/**
- .github/workflows/build_windows_wheel/**
- .github/workflows/run_tests/**
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}

View File

@@ -100,7 +100,9 @@ jobs:
lfs: true
- uses: Swatinem/rust-cache@v2
- name: Install dependencies
run: sudo apt install -y protobuf-compiler libssl-dev
run: |
sudo apt update
sudo apt install -y protobuf-compiler libssl-dev
- uses: rui314/setup-mold@v1
- name: Make Swap
run: |

522
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -15,20 +15,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=3.1.0-beta.1", default-features = false, "tag" = "v3.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=3.1.0-beta.1", "tag" = "v3.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=3.1.0-beta.1", "tag" = "v3.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=3.1.0-beta.1", "tag" = "v3.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=3.1.0-beta.1", default-features = false, "tag" = "v3.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=3.1.0-beta.1", "tag" = "v3.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=3.1.0-beta.1", "tag" = "v3.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=3.1.0-beta.1", "tag" = "v3.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=3.1.0-beta.1", default-features = false, "tag" = "v3.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=3.1.0-beta.1", "tag" = "v3.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=3.1.0-beta.1", "tag" = "v3.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=3.1.0-beta.1", "tag" = "v3.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=3.1.0-beta.1", "tag" = "v3.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=3.1.0-beta.1", "tag" = "v3.1.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=4.0.0-beta.6", default-features = false, "tag" = "v4.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=4.0.0-beta.6", "tag" = "v4.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=4.0.0-beta.6", "tag" = "v4.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=4.0.0-beta.6", "tag" = "v4.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=4.0.0-beta.6", default-features = false, "tag" = "v4.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=4.0.0-beta.6", "tag" = "v4.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=4.0.0-beta.6", "tag" = "v4.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=4.0.0-beta.6", "tag" = "v4.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=4.0.0-beta.6", default-features = false, "tag" = "v4.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=4.0.0-beta.6", "tag" = "v4.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=4.0.0-beta.6", "tag" = "v4.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=4.0.0-beta.6", "tag" = "v4.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=4.0.0-beta.6", "tag" = "v4.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=4.0.0-beta.6", "tag" = "v4.0.0-beta.6", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "57.2", optional = false }
@@ -40,13 +40,15 @@ arrow-schema = "57.2"
arrow-select = "57.2"
arrow-cast = "57.2"
async-trait = "0"
datafusion = { version = "51.0", default-features = false }
datafusion-catalog = "51.0"
datafusion-common = { version = "51.0", default-features = false }
datafusion-execution = "51.0"
datafusion-expr = "51.0"
datafusion-physical-plan = "51.0"
datafusion-physical-expr = "51.0"
datafusion = { version = "52.1", default-features = false }
datafusion-catalog = "52.1"
datafusion-common = { version = "52.1", default-features = false }
datafusion-execution = "52.1"
datafusion-expr = "52.1"
datafusion-functions = "52.1"
datafusion-physical-plan = "52.1"
datafusion-physical-expr = "52.1"
datafusion-sql = "52.1"
env_logger = "0.11"
half = { "version" = "2.7.1", default-features = false, features = [
"num-traits",

View File

@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
<dependency>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-core</artifactId>
<version>0.27.0-beta.1</version>
<version>0.27.0-beta.3</version>
</dependency>
```

View File

@@ -8,6 +8,14 @@
## Properties
### numDeletedRows
```ts
numDeletedRows: number;
```
***
### version
```ts

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.27.0-beta.1</version>
<version>0.27.0-beta.3</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.27.0-beta.1</version>
<version>0.27.0-beta.3</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>LanceDB Java SDK Parent POM</description>
@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>3.1.0-beta.1</lance-core.version>
<lance-core.version>4.0.0-beta.6</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.27.0-beta.1"
version = "0.27.0-beta.3"
license.workspace = true
description.workspace = true
repository.workspace = true

View File

@@ -1697,6 +1697,65 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
expect(results2[0].text).toBe(data[1].text);
});
test("full text search fast search", async () => {
const db = await connect(tmpDir.name);
const data = [{ text: "hello world", vector: [0.1, 0.2, 0.3], id: 1 }];
const table = await db.createTable("test", data);
await table.createIndex("text", {
config: Index.fts(),
});
// Insert unindexed data after creating the index.
await table.add([{ text: "xyz", vector: [0.4, 0.5, 0.6], id: 2 }]);
const withFlatSearch = await table
.search("xyz", "fts")
.limit(10)
.toArray();
expect(withFlatSearch.length).toBeGreaterThan(0);
const fastSearchResults = await table
.search("xyz", "fts")
.fastSearch()
.limit(10)
.toArray();
expect(fastSearchResults.length).toBe(0);
const nearestToTextFastSearch = await table
.query()
.nearestToText("xyz")
.fastSearch()
.limit(10)
.toArray();
expect(nearestToTextFastSearch.length).toBe(0);
// fastSearch should be chainable with other methods.
const chainedFastSearch = await table
.search("xyz", "fts")
.fastSearch()
.select(["text"])
.limit(5)
.toArray();
expect(chainedFastSearch.length).toBe(0);
await table.optimize();
const indexedFastSearch = await table
.search("xyz", "fts")
.fastSearch()
.limit(10)
.toArray();
expect(indexedFastSearch.length).toBeGreaterThan(0);
const indexedNearestToTextFastSearch = await table
.query()
.nearestToText("xyz")
.fastSearch()
.limit(10)
.toArray();
expect(indexedNearestToTextFastSearch.length).toBeGreaterThan(0);
});
test("prewarm full text search index", async () => {
const db = await connect(tmpDir.name);
const data = [

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -753,12 +753,14 @@ impl From<lancedb::table::AddResult> for AddResult {
#[napi(object)]
pub struct DeleteResult {
pub num_deleted_rows: i64,
pub version: i64,
}
impl From<lancedb::table::DeleteResult> for DeleteResult {
fn from(value: lancedb::table::DeleteResult) -> Self {
Self {
num_deleted_rows: value.num_deleted_rows as i64,
version: value.version as i64,
}
}

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.30.0-beta.1"
current_version = "0.30.0-beta.3"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.30.0-beta.1"
version = "0.30.0-beta.3"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true

View File

@@ -45,7 +45,7 @@ repository = "https://github.com/lancedb/lancedb"
[project.optional-dependencies]
pylance = [
"pylance>=3.1.0b1",
"pylance>=1.0.0b14",
]
tests = [
"aiohttp",
@@ -59,9 +59,9 @@ tests = [
"polars>=0.19, <=1.3.0",
"tantivy",
"pyarrow-stubs",
"pylance>=3.1.0b1",
"pylance>=1.0.0b14,<3.0.0",
"requests",
"datafusion>=51,<52", # Must match pylance's DataFusion version
"datafusion<52",
]
dev = [
"ruff",

View File

@@ -606,6 +606,7 @@ class LanceQueryBuilder(ABC):
query,
ordering_field_name=ordering_field_name,
fts_columns=fts_columns,
fast_search=fast_search,
)
if isinstance(query, list):
@@ -1456,12 +1457,14 @@ class LanceFtsQueryBuilder(LanceQueryBuilder):
query: str | FullTextQuery,
ordering_field_name: Optional[str] = None,
fts_columns: Optional[Union[str, List[str]]] = None,
fast_search: bool = None,
):
super().__init__(table)
self._query = query
self._phrase_query = False
self.ordering_field_name = ordering_field_name
self._reranker = None
self._fast_search = fast_search
if isinstance(fts_columns, str):
fts_columns = [fts_columns]
self._fts_columns = fts_columns
@@ -1483,6 +1486,19 @@ class LanceFtsQueryBuilder(LanceQueryBuilder):
self._phrase_query = phrase_query
return self
def fast_search(self) -> LanceFtsQueryBuilder:
"""
Skip a flat search of unindexed data. This will improve
search performance but search results will not include unindexed data.
Returns
-------
LanceFtsQueryBuilder
The LanceFtsQueryBuilder object.
"""
self._fast_search = True
return self
def to_query_object(self) -> Query:
return Query(
columns=self._columns,
@@ -1494,6 +1510,7 @@ class LanceFtsQueryBuilder(LanceQueryBuilder):
query=self._query, columns=self._fts_columns
),
offset=self._offset,
fast_search=self._fast_search,
)
def output_schema(self) -> pa.Schema:

View File

@@ -218,8 +218,6 @@ class RemoteTable(Table):
train: bool = True,
):
"""Create an index on the table.
Currently, the only parameters that matter are
the metric and the vector column name.
Parameters
----------
@@ -250,11 +248,6 @@ class RemoteTable(Table):
>>> table.create_index("l2", "vector") # doctest: +SKIP
"""
if num_sub_vectors is not None:
logging.warning(
"num_sub_vectors is not supported on LanceDB cloud."
"This parameter will be tuned automatically."
)
if accelerator is not None:
logging.warning(
"GPU accelerator is not yet supported on LanceDB cloud."

View File

@@ -1331,7 +1331,7 @@ class Table(ABC):
1 2 [3.0, 4.0]
2 3 [5.0, 6.0]
>>> table.delete("x = 2")
DeleteResult(version=2)
DeleteResult(num_deleted_rows=1, version=2)
>>> table.to_pandas()
x vector
0 1 [1.0, 2.0]
@@ -1345,7 +1345,7 @@ class Table(ABC):
>>> to_remove
'1, 5'
>>> table.delete(f"x IN ({to_remove})")
DeleteResult(version=3)
DeleteResult(num_deleted_rows=1, version=3)
>>> table.to_pandas()
x vector
0 3 [5.0, 6.0]
@@ -4215,7 +4215,7 @@ class AsyncTable:
1 2 [3.0, 4.0]
2 3 [5.0, 6.0]
>>> table.delete("x = 2")
DeleteResult(version=2)
DeleteResult(num_deleted_rows=1, version=2)
>>> table.to_pandas()
x vector
0 1 [1.0, 2.0]
@@ -4229,7 +4229,7 @@ class AsyncTable:
>>> to_remove
'1, 5'
>>> table.delete(f"x IN ({to_remove})")
DeleteResult(version=3)
DeleteResult(num_deleted_rows=1, version=3)
>>> table.to_pandas()
x vector
0 3 [5.0, 6.0]

View File

@@ -27,6 +27,7 @@ from lancedb.query import (
PhraseQuery,
BooleanQuery,
Occur,
LanceFtsQueryBuilder,
)
import numpy as np
import pyarrow as pa
@@ -882,3 +883,109 @@ def test_fts_query_to_json():
'"must_not":[]}}'
)
assert json_str == expected
def test_fts_fast_search(table):
table.create_fts_index("text", use_tantivy=False)
# Insert some unindexed data
table.add(
[
{
"text": "xyz",
"vector": [0 for _ in range(128)],
"id": 101,
"text2": "xyz",
"nested": {"text": "xyz"},
"count": 10,
}
]
)
# Without fast_search, the query object should not have fast_search set
builder = table.search("xyz", query_type="fts").limit(10)
query = builder.to_query_object()
assert query.fast_search is None
# With fast_search, the query object should have fast_search=True
builder = table.search("xyz", query_type="fts").fast_search().limit(10)
query = builder.to_query_object()
assert query.fast_search is True
# fast_search should be chainable with other methods
builder = (
table.search("xyz", query_type="fts").fast_search().select(["text"]).limit(5)
)
query = builder.to_query_object()
assert query.fast_search is True
assert query.limit == 5
assert query.columns == ["text"]
# fast_search should be enabled by keyword argument too
query = LanceFtsQueryBuilder(table, "xyz", fast_search=True).to_query_object()
assert query.fast_search is True
# Verify it executes without error and skips unindexed data
results = table.search("xyz", query_type="fts").fast_search().limit(5).to_list()
assert len(results) == 0
# Update index and verify it returns results
table.optimize()
results = table.search("xyz", query_type="fts").fast_search().limit(5).to_list()
assert len(results) > 0
@pytest.mark.asyncio
async def test_fts_fast_search_async(async_table):
await async_table.create_index("text", config=FTS())
# Insert some unindexed data
await async_table.add(
[
{
"text": "xyz",
"vector": [0 for _ in range(128)],
"id": 101,
"text2": "xyz",
"nested": {"text": "xyz"},
"count": 10,
}
]
)
# Without fast_search, should return results
results = await async_table.query().nearest_to_text("xyz").limit(5).to_list()
assert len(results) > 0
# With fast_search, should return no results data unindexed
fast_results = (
await async_table.query()
.nearest_to_text("xyz")
.fast_search()
.limit(5)
.to_list()
)
assert len(fast_results) == 0
# Update index and verify it returns results
await async_table.optimize()
fast_results = (
await async_table.query()
.nearest_to_text("xyz")
.fast_search()
.limit(5)
.to_list()
)
assert len(fast_results) > 0
# fast_search should be chainable with other methods
results = (
await async_table.query()
.nearest_to_text("xyz")
.fast_search()
.select(["text"])
.limit(5)
.to_list()
)
assert len(results) > 0

View File

@@ -71,7 +71,7 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
"Failed to call fetch_storage_options: {}",
e
))),
location: snafu::location!(),
location: std::panic::Location::caller(),
})?;
// If result is None, return None
@@ -83,7 +83,7 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
let result_dict = result.downcast::<PyDict>().map_err(|_| {
lance_core::Error::InvalidInput {
source: "fetch_storage_options() must return None or a dict of string key-value pairs".into(),
location: snafu::location!(),
location: std::panic::Location::caller(),
}
})?;
@@ -93,13 +93,13 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
let key_str: String = key.extract().map_err(|e| {
lance_core::Error::InvalidInput {
source: format!("Storage option key must be a string: {}", e).into(),
location: snafu::location!(),
location: std::panic::Location::caller(),
}
})?;
let value_str: String = value.extract().map_err(|e| {
lance_core::Error::InvalidInput {
source: format!("Storage option value must be a string: {}", e).into(),
location: snafu::location!(),
location: std::panic::Location::caller(),
}
})?;
storage_options.insert(key_str, value_str);
@@ -114,7 +114,7 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
"Task join error: {}",
e
))),
location: snafu::location!(),
location: std::panic::Location::caller(),
})?
}

View File

@@ -112,19 +112,24 @@ impl From<lancedb::table::AddResult> for AddResult {
#[pyclass(get_all)]
#[derive(Clone, Debug)]
pub struct DeleteResult {
pub num_deleted_rows: u64,
pub version: u64,
}
#[pymethods]
impl DeleteResult {
pub fn __repr__(&self) -> String {
format!("DeleteResult(version={})", self.version)
format!(
"DeleteResult(num_deleted_rows={}, version={})",
self.num_deleted_rows, self.version
)
}
}
impl From<lancedb::table::DeleteResult> for DeleteResult {
fn from(result: lancedb::table::DeleteResult) -> Self {
Self {
num_deleted_rows: result.num_deleted_rows,
version: result.version,
}
}

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.27.0-beta.1"
version = "0.27.0-beta.3"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true
@@ -25,7 +25,9 @@ datafusion-catalog.workspace = true
datafusion-common.workspace = true
datafusion-execution.workspace = true
datafusion-expr.workspace = true
datafusion-functions.workspace = true
datafusion-physical-expr.workspace = true
datafusion-sql.workspace = true
datafusion-physical-plan.workspace = true
datafusion.workspace = true
object_store = { workspace = true }

View File

@@ -9,13 +9,6 @@
use std::sync::Arc;
use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{ArrowError, SchemaRef};
use async_trait::async_trait;
use futures::stream::once;
use futures::StreamExt;
use lance_datafusion::utils::StreamingWriteSource;
use crate::arrow::{
SendableRecordBatchStream, SendableRecordBatchStreamExt, SimpleRecordBatchStream,
};
@@ -25,6 +18,12 @@ use crate::embeddings::{
};
use crate::table::{ColumnDefinition, ColumnKind, TableDefinition};
use crate::{Error, Result};
use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{ArrowError, SchemaRef};
use async_trait::async_trait;
use futures::stream::once;
use futures::StreamExt;
use lance_datafusion::utils::StreamingWriteSource;
pub trait Scannable: Send {
/// Returns the schema of the data.
@@ -349,6 +348,133 @@ pub fn scannable_with_embeddings(
Ok(inner)
}
/// A wrapper that buffers the first RecordBatch from a Scannable so we can
/// inspect it (e.g. to estimate data size) without losing it.
pub(crate) struct PeekedScannable {
inner: Box<dyn Scannable>,
peeked: Option<RecordBatch>,
/// The first item from the stream, if it was an error. Stored so we can
/// re-emit it from `scan_as_stream` instead of silently dropping it.
first_error: Option<crate::Error>,
stream: Option<SendableRecordBatchStream>,
}
impl PeekedScannable {
pub fn new(inner: Box<dyn Scannable>) -> Self {
Self {
inner,
peeked: None,
first_error: None,
stream: None,
}
}
/// Reads and buffers the first batch from the inner scannable.
/// Returns a clone of it. Subsequent calls return the same batch.
///
/// Returns `None` if the stream is empty or the first item is an error.
/// Errors are preserved and re-emitted by `scan_as_stream`.
pub async fn peek(&mut self) -> Option<RecordBatch> {
if self.peeked.is_some() {
return self.peeked.clone();
}
// Already peeked and got an error or empty stream.
if self.stream.is_some() || self.first_error.is_some() {
return None;
}
let mut stream = self.inner.scan_as_stream();
match stream.next().await {
Some(Ok(batch)) => {
self.peeked = Some(batch.clone());
self.stream = Some(stream);
Some(batch)
}
Some(Err(e)) => {
self.first_error = Some(e);
self.stream = Some(stream);
None
}
None => {
self.stream = Some(stream);
None
}
}
}
}
impl Scannable for PeekedScannable {
fn schema(&self) -> SchemaRef {
self.inner.schema()
}
fn num_rows(&self) -> Option<usize> {
self.inner.num_rows()
}
fn rescannable(&self) -> bool {
self.inner.rescannable()
}
fn scan_as_stream(&mut self) -> SendableRecordBatchStream {
let schema = self.inner.schema();
// If peek() hit an error, prepend it so downstream sees the error.
let error_item = self.first_error.take().map(Err);
match (self.peeked.take(), self.stream.take()) {
(Some(batch), Some(rest)) => {
let prepend = futures::stream::once(std::future::ready(Ok(batch)));
Box::pin(SimpleRecordBatchStream {
schema,
stream: prepend.chain(rest),
})
}
(Some(batch), None) => Box::pin(SimpleRecordBatchStream {
schema,
stream: futures::stream::once(std::future::ready(Ok(batch))),
}),
(None, Some(rest)) => {
if let Some(err) = error_item {
let stream = futures::stream::once(std::future::ready(err));
Box::pin(SimpleRecordBatchStream { schema, stream })
} else {
rest
}
}
(None, None) => {
// peek() was never called — just delegate
self.inner.scan_as_stream()
}
}
}
}
/// Compute the number of write partitions based on data size estimates.
///
/// `sample_bytes` and `sample_rows` come from a representative batch and are
/// used to estimate per-row size. `total_rows_hint` is the total row count
/// when known; otherwise `sample_rows` row count is used as a lower bound
/// estimate.
///
/// Targets roughly 1 million rows or 2 GB per partition, capped at
/// `max_partitions` (typically the number of available CPU cores).
pub(crate) fn estimate_write_partitions(
sample_bytes: usize,
sample_rows: usize,
total_rows_hint: Option<usize>,
max_partitions: usize,
) -> usize {
if sample_rows == 0 {
return 1;
}
let bytes_per_row = sample_bytes / sample_rows;
let total_rows = total_rows_hint.unwrap_or(sample_rows);
let total_bytes = total_rows * bytes_per_row;
let by_rows = total_rows.div_ceil(1_000_000);
let by_bytes = total_bytes.div_ceil(2 * 1024 * 1024 * 1024);
by_rows.max(by_bytes).max(1).min(max_partitions)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -445,6 +571,231 @@ mod tests {
assert!(result2.unwrap().is_err());
}
mod peeked_scannable_tests {
use crate::test_utils::TestCustomError;
use super::*;
#[tokio::test]
async fn test_peek_returns_first_batch() {
let batch = record_batch!(("id", Int64, [1, 2, 3])).unwrap();
let mut peeked = PeekedScannable::new(Box::new(batch.clone()));
let first = peeked.peek().await.unwrap();
assert_eq!(first, batch);
}
#[tokio::test]
async fn test_peek_is_idempotent() {
let batch = record_batch!(("id", Int64, [1, 2, 3])).unwrap();
let mut peeked = PeekedScannable::new(Box::new(batch.clone()));
let first = peeked.peek().await.unwrap();
let second = peeked.peek().await.unwrap();
assert_eq!(first, second);
}
#[tokio::test]
async fn test_scan_after_peek_returns_all_data() {
let batches = vec![
record_batch!(("id", Int64, [1, 2])).unwrap(),
record_batch!(("id", Int64, [3, 4, 5])).unwrap(),
];
let mut peeked = PeekedScannable::new(Box::new(batches.clone()));
let first = peeked.peek().await.unwrap();
assert_eq!(first, batches[0]);
let result: Vec<RecordBatch> = peeked.scan_as_stream().try_collect().await.unwrap();
assert_eq!(result.len(), 2);
assert_eq!(result[0], batches[0]);
assert_eq!(result[1], batches[1]);
}
#[tokio::test]
async fn test_scan_without_peek_passes_through() {
let batch = record_batch!(("id", Int64, [1, 2, 3])).unwrap();
let mut peeked = PeekedScannable::new(Box::new(batch.clone()));
let result: Vec<RecordBatch> = peeked.scan_as_stream().try_collect().await.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0], batch);
}
#[tokio::test]
async fn test_delegates_num_rows() {
let batches = vec![
record_batch!(("id", Int64, [1, 2])).unwrap(),
record_batch!(("id", Int64, [3])).unwrap(),
];
let peeked = PeekedScannable::new(Box::new(batches));
assert_eq!(peeked.num_rows(), Some(3));
}
#[tokio::test]
async fn test_non_rescannable_stream_data_preserved() {
let batches = vec![
record_batch!(("id", Int64, [1, 2])).unwrap(),
record_batch!(("id", Int64, [3])).unwrap(),
];
let schema = batches[0].schema();
let inner = futures::stream::iter(batches.clone().into_iter().map(Ok));
let stream: SendableRecordBatchStream = Box::pin(SimpleRecordBatchStream {
schema,
stream: inner,
});
let mut peeked = PeekedScannable::new(Box::new(stream));
assert!(!peeked.rescannable());
assert_eq!(peeked.num_rows(), None);
let first = peeked.peek().await.unwrap();
assert_eq!(first, batches[0]);
// All data is still available via scan_as_stream
let result: Vec<RecordBatch> = peeked.scan_as_stream().try_collect().await.unwrap();
assert_eq!(result.len(), 2);
assert_eq!(result[0], batches[0]);
assert_eq!(result[1], batches[1]);
}
#[tokio::test]
async fn test_error_in_first_batch_propagates() {
let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
"id",
arrow_schema::DataType::Int64,
false,
)]));
let inner = futures::stream::iter(vec![Err(Error::External {
source: Box::new(TestCustomError),
})]);
let stream: SendableRecordBatchStream = Box::pin(SimpleRecordBatchStream {
schema,
stream: inner,
});
let mut peeked = PeekedScannable::new(Box::new(stream));
// peek returns None for errors
assert!(peeked.peek().await.is_none());
// But the error should come through when scanning
let mut stream = peeked.scan_as_stream();
let first = stream.next().await.unwrap();
assert!(first.is_err());
let err = first.unwrap_err();
assert!(
matches!(&err, Error::External { source } if source.downcast_ref::<TestCustomError>().is_some()),
"Expected TestCustomError to be preserved, got: {err}"
);
}
#[tokio::test]
async fn test_error_in_later_batch_propagates() {
let good_batch = record_batch!(("id", Int64, [1, 2])).unwrap();
let schema = good_batch.schema();
let inner = futures::stream::iter(vec![
Ok(good_batch.clone()),
Err(Error::External {
source: Box::new(TestCustomError),
}),
]);
let stream: SendableRecordBatchStream = Box::pin(SimpleRecordBatchStream {
schema,
stream: inner,
});
let mut peeked = PeekedScannable::new(Box::new(stream));
// peek succeeds with the first batch
let first = peeked.peek().await.unwrap();
assert_eq!(first, good_batch);
// scan_as_stream should yield the first batch, then the error
let mut stream = peeked.scan_as_stream();
let batch1 = stream.next().await.unwrap().unwrap();
assert_eq!(batch1, good_batch);
let batch2 = stream.next().await.unwrap();
assert!(batch2.is_err());
let err = batch2.unwrap_err();
assert!(
matches!(&err, Error::External { source } if source.downcast_ref::<TestCustomError>().is_some()),
"Expected TestCustomError to be preserved, got: {err}"
);
}
#[tokio::test]
async fn test_empty_stream_returns_none() {
let schema = Arc::new(arrow_schema::Schema::new(vec![arrow_schema::Field::new(
"id",
arrow_schema::DataType::Int64,
false,
)]));
let inner = futures::stream::empty();
let stream: SendableRecordBatchStream = Box::pin(SimpleRecordBatchStream {
schema,
stream: inner,
});
let mut peeked = PeekedScannable::new(Box::new(stream));
assert!(peeked.peek().await.is_none());
// Scanning an empty (post-peek) stream should yield nothing
let result: Vec<RecordBatch> = peeked.scan_as_stream().try_collect().await.unwrap();
assert!(result.is_empty());
}
}
mod estimate_write_partitions_tests {
use super::*;
#[test]
fn test_small_data_single_partition() {
// 100 rows * 24 bytes/row = 2400 bytes — well under both thresholds
assert_eq!(estimate_write_partitions(2400, 100, Some(100), 8), 1);
}
#[test]
fn test_scales_by_row_count() {
// 2.5M rows at 24 bytes/row — row threshold dominates
// ceil(2_500_000 / 1_000_000) = 3
assert_eq!(estimate_write_partitions(72, 3, Some(2_500_000), 8), 3);
}
#[test]
fn test_scales_by_byte_size() {
// 100k rows at 40KB/row = ~4GB total → ceil(4GB / 2GB) = 2
let sample_bytes = 40_000 * 10;
assert_eq!(
estimate_write_partitions(sample_bytes, 10, Some(100_000), 8),
2
);
}
#[test]
fn test_capped_at_max_partitions() {
// 10M rows would want 10 partitions, but capped at 4
assert_eq!(estimate_write_partitions(72, 3, Some(10_000_000), 4), 4);
}
#[test]
fn test_zero_sample_rows_returns_one() {
assert_eq!(estimate_write_partitions(0, 0, Some(1_000_000), 8), 1);
}
#[test]
fn test_no_row_hint_uses_sample_size() {
// Without a hint, uses sample_rows (3), which is small
assert_eq!(estimate_write_partitions(72, 3, None, 8), 1);
}
#[test]
fn test_always_at_least_one() {
assert_eq!(estimate_write_partitions(24, 1, Some(1), 8), 1);
}
}
mod embedding_tests {
use super::*;
use crate::embeddings::MemoryRegistry;

View File

@@ -426,6 +426,7 @@ impl PermutationReader {
row_ids_query = row_ids_query.limit(limit as usize);
}
let mut row_ids = row_ids_query.execute().await?;
let mut idx_offset = 0;
while let Some(batch) = row_ids.try_next().await? {
let row_ids = batch
.column(0)
@@ -433,8 +434,9 @@ impl PermutationReader {
.values()
.to_vec();
for (i, row_id) in row_ids.iter().enumerate() {
offset_map.insert(i as u64, *row_id);
offset_map.insert(i as u64 + idx_offset, *row_id);
}
idx_offset += batch.num_rows() as u64;
}
let offset_map = Arc::new(offset_map);
*offset_map_ref = Some(offset_map.clone());
@@ -845,4 +847,106 @@ mod tests {
.to_vec();
assert_eq!(idx_values, vec![row_ids[2] as i32]);
}
#[tokio::test]
async fn test_filtered_permutation_full_iteration() {
use crate::dataloader::permutation::builder::PermutationBuilder;
// Create a base table with 10000 rows where idx goes 0..10000.
// Filter to even values only, giving 5000 rows in the permutation.
let base_table = lance_datagen::gen_batch()
.col("idx", lance_datagen::array::step::<Int32Type>())
.into_mem_table("tbl", RowCount::from(10000), BatchCount::from(1))
.await;
let permutation_table = PermutationBuilder::new(base_table.clone())
.with_filter("idx % 2 = 0".to_string())
.build()
.await
.unwrap();
assert_eq!(permutation_table.count_rows(None).await.unwrap(), 5000);
let reader = PermutationReader::try_from_tables(
base_table.base_table().clone(),
permutation_table.base_table().clone(),
0,
)
.await
.unwrap();
assert_eq!(reader.count_rows(), 5000);
// Iterate through all batches using a batch size that doesn't evenly divide
// the row count (5000 / 128 = 39 full batches + 1 batch of 8 rows).
let batch_size = 128;
let mut stream = reader
.read(
Select::All,
QueryExecutionOptions {
max_batch_length: batch_size,
..Default::default()
},
)
.await
.unwrap();
let mut total_rows = 0u64;
let mut all_idx_values = Vec::new();
while let Some(batch) = stream.try_next().await.unwrap() {
assert!(batch.num_rows() <= batch_size as usize);
total_rows += batch.num_rows() as u64;
let idx_col = batch.column(0).as_primitive::<Int32Type>().values();
all_idx_values.extend(idx_col.iter().copied());
}
assert_eq!(total_rows, 5000);
assert_eq!(all_idx_values.len(), 5000);
// Every value should be even (from the filter)
assert!(all_idx_values.iter().all(|v| v % 2 == 0));
// Should have 5000 unique values
let unique: std::collections::HashSet<i32> = all_idx_values.iter().copied().collect();
assert_eq!(unique.len(), 5000);
// Use take_offsets to fetch rows from the beginning, middle, and end
// of the permutation. The values should match what we saw during iteration.
// Beginning
let batch = reader.take_offsets(&[0, 1, 2], Select::All).await.unwrap();
assert_eq!(batch.num_rows(), 3);
let idx_values = batch
.column(0)
.as_primitive::<Int32Type>()
.values()
.to_vec();
assert_eq!(idx_values, &all_idx_values[0..3]);
// Middle
let batch = reader
.take_offsets(&[2499, 2500, 2501], Select::All)
.await
.unwrap();
assert_eq!(batch.num_rows(), 3);
let idx_values = batch
.column(0)
.as_primitive::<Int32Type>()
.values()
.to_vec();
assert_eq!(idx_values, &all_idx_values[2499..2502]);
// End (last 3 rows)
let batch = reader
.take_offsets(&[4997, 4998, 4999], Select::All)
.await
.unwrap();
assert_eq!(batch.num_rows(), 3);
let idx_values = batch
.column(0)
.as_primitive::<Int32Type>()
.values()
.to_vec();
assert_eq!(idx_values, &all_idx_values[4997..5000]);
}
}

View File

@@ -97,10 +97,7 @@ pub type Result<T> = std::result::Result<T, Error>;
impl From<ArrowError> for Error {
fn from(source: ArrowError) -> Self {
match source {
ArrowError::ExternalError(source) => match source.downcast::<Self>() {
Ok(e) => *e,
Err(source) => Self::External { source },
},
ArrowError::ExternalError(source) => Self::from_box_error(source),
_ => Self::Arrow { source },
}
}
@@ -110,15 +107,7 @@ impl From<DataFusionError> for Error {
fn from(source: DataFusionError) -> Self {
match source {
DataFusionError::ArrowError(source, _) => (*source).into(),
DataFusionError::External(source) => match source.downcast::<Self>() {
Ok(e) => *e,
Err(source) => match source.downcast::<ArrowError>() {
Ok(arrow_error) => Self::Arrow {
source: *arrow_error,
},
Err(source) => Self::External { source },
},
},
DataFusionError::External(source) => Self::from_box_error(source),
other => Self::External {
source: Box::new(other),
},
@@ -130,15 +119,52 @@ impl From<lance::Error> for Error {
fn from(source: lance::Error) -> Self {
// Try to unwrap external errors that were wrapped by lance
match source {
lance::Error::Wrapped { error, .. } => match error.downcast::<Self>() {
Ok(e) => *e,
Err(source) => Self::External { source },
},
lance::Error::Wrapped { error, .. } => Self::from_box_error(error),
lance::Error::External { source } => Self::from_box_error(source),
_ => Self::Lance { source },
}
}
}
impl Error {
fn from_box_error(mut source: Box<dyn std::error::Error + Send + Sync>) -> Self {
source = match source.downcast::<Self>() {
Ok(e) => match *e {
Self::External { source } => return Self::from_box_error(source),
other => return other,
},
Err(source) => source,
};
source = match source.downcast::<lance::Error>() {
Ok(e) => match *e {
lance::Error::Wrapped { error, .. } => return Self::from_box_error(error),
other => return other.into(),
},
Err(source) => source,
};
source = match source.downcast::<ArrowError>() {
Ok(e) => match *e {
ArrowError::ExternalError(source) => return Self::from_box_error(source),
other => return other.into(),
},
Err(source) => source,
};
source = match source.downcast::<DataFusionError>() {
Ok(e) => match *e {
DataFusionError::ArrowError(source, _) => return (*source).into(),
DataFusionError::External(source) => return Self::from_box_error(source),
other => return other.into(),
},
Err(source) => source,
};
Self::External { source }
}
}
impl From<object_store::Error> for Error {
fn from(source: object_store::Error) -> Self {
Self::ObjectStore { source }

131
rust/lancedb/src/expr.rs Normal file
View File

@@ -0,0 +1,131 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Expression builder API for type-safe query construction
//!
//! This module provides a fluent API for building expressions that can be used
//! in filters and projections. It wraps DataFusion's expression system.
//!
//! # Examples
//!
//! ```rust
//! use std::ops::Mul;
//! use lancedb::expr::{col, lit};
//!
//! let expr = col("age").gt(lit(18));
//! let expr = col("age").gt(lit(18)).and(col("status").eq(lit("active")));
//! let expr = col("price") * lit(1.1);
//! ```
mod sql;
pub use sql::expr_to_sql_string;
use std::sync::Arc;
use arrow_schema::DataType;
use datafusion_expr::{expr_fn::cast, Expr, ScalarUDF};
use datafusion_functions::string::expr_fn as string_expr_fn;
pub use datafusion_expr::{col, lit};
pub use datafusion_expr::Expr as DfExpr;
pub fn lower(expr: Expr) -> Expr {
string_expr_fn::lower(expr)
}
pub fn upper(expr: Expr) -> Expr {
string_expr_fn::upper(expr)
}
pub fn contains(expr: Expr, search: Expr) -> Expr {
string_expr_fn::contains(expr, search)
}
pub fn expr_cast(expr: Expr, data_type: DataType) -> Expr {
cast(expr, data_type)
}
lazy_static::lazy_static! {
static ref FUNC_REGISTRY: std::sync::RwLock<std::collections::HashMap<String, Arc<ScalarUDF>>> = {
let mut m = std::collections::HashMap::new();
m.insert("lower".to_string(), datafusion_functions::string::lower());
m.insert("upper".to_string(), datafusion_functions::string::upper());
m.insert("contains".to_string(), datafusion_functions::string::contains());
m.insert("btrim".to_string(), datafusion_functions::string::btrim());
m.insert("ltrim".to_string(), datafusion_functions::string::ltrim());
m.insert("rtrim".to_string(), datafusion_functions::string::rtrim());
m.insert("concat".to_string(), datafusion_functions::string::concat());
m.insert("octet_length".to_string(), datafusion_functions::string::octet_length());
std::sync::RwLock::new(m)
};
}
pub fn func(name: impl AsRef<str>, args: Vec<Expr>) -> crate::Result<Expr> {
let name = name.as_ref();
let registry = FUNC_REGISTRY
.read()
.map_err(|e| crate::Error::InvalidInput {
message: format!("lock poisoned: {}", e),
})?;
let udf = registry
.get(name)
.ok_or_else(|| crate::Error::InvalidInput {
message: format!("unknown function: {}", name),
})?;
Ok(Expr::ScalarFunction(
datafusion_expr::expr::ScalarFunction::new_udf(udf.clone(), args),
))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_col_lit_comparisons() {
let expr = col("age").gt(lit(18));
let sql = expr_to_sql_string(&expr).unwrap();
assert!(sql.contains("age") && sql.contains("18"));
let expr = col("name").eq(lit("Alice"));
let sql = expr_to_sql_string(&expr).unwrap();
assert!(sql.contains("name") && sql.contains("Alice"));
}
#[test]
fn test_compound_expression() {
let expr = col("age").gt(lit(18)).and(col("status").eq(lit("active")));
let sql = expr_to_sql_string(&expr).unwrap();
assert!(sql.contains("age") && sql.contains("status"));
}
#[test]
fn test_string_functions() {
let expr = lower(col("name"));
let sql = expr_to_sql_string(&expr).unwrap();
assert!(sql.to_lowercase().contains("lower"));
let expr = contains(col("text"), lit("search"));
let sql = expr_to_sql_string(&expr).unwrap();
assert!(sql.to_lowercase().contains("contains"));
}
#[test]
fn test_func() {
let expr = func("lower", vec![col("x")]).unwrap();
let sql = expr_to_sql_string(&expr).unwrap();
assert!(sql.to_lowercase().contains("lower"));
let result = func("unknown_func", vec![col("x")]);
assert!(result.is_err());
}
#[test]
fn test_arithmetic() {
let expr = col("price") * lit(1.1);
let sql = expr_to_sql_string(&expr).unwrap();
assert!(sql.contains("price"));
}
}

View File

@@ -0,0 +1,12 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use datafusion_expr::Expr;
use datafusion_sql::unparser;
pub fn expr_to_sql_string(expr: &Expr) -> crate::Result<String> {
let ast = unparser::expr_to_sql(expr).map_err(|e| crate::Error::InvalidInput {
message: format!("failed to serialize expression to SQL: {}", e),
})?;
Ok(ast.to_string())
}

View File

@@ -27,7 +27,7 @@
///
/// The btree index does not currently have any parameters though parameters such as the
/// block size may be added in the future.
#[derive(Default, Debug, Clone)]
#[derive(Default, Debug, Clone, serde::Serialize)]
pub struct BTreeIndexBuilder {}
impl BTreeIndexBuilder {}
@@ -39,7 +39,7 @@ impl BTreeIndexBuilder {}
/// This index works best for low-cardinality (i.e., less than 1000 unique values) columns,
/// where the number of unique values is small.
/// The bitmap stores a list of row ids where the value is present.
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct BitmapIndexBuilder {}
/// Builder for LabelList index.
@@ -48,7 +48,7 @@ pub struct BitmapIndexBuilder {}
/// support queries with `array_contains_all` and `array_contains_any`
/// using an underlying bitmap index.
///
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct LabelListIndexBuilder {}
pub use lance_index::scalar::inverted::query::*;

View File

@@ -7,6 +7,7 @@
//! Vector indices are only supported on fixed-size-list (tensor) columns of floating point
//! values
use lance::table::format::{IndexMetadata, Manifest};
use serde::Serialize;
use crate::DistanceType;
@@ -181,14 +182,17 @@ macro_rules! impl_hnsw_params_setter {
/// The partitioning process is called IVF and the `num_partitions` parameter controls how many groups to create.
///
/// Note that training an IVF Flat index on a large dataset is a slow operation and currently is also a memory intensive operation.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct IvfFlatIndexBuilder {
#[serde(rename = "metric_type")]
pub(crate) distance_type: DistanceType,
// IVF
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_partitions: Option<u32>,
pub(crate) sample_rate: u32,
pub(crate) max_iterations: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) target_partition_size: Option<u32>,
}
@@ -213,14 +217,17 @@ impl IvfFlatIndexBuilder {
///
/// This index compresses vectors using scalar quantization and groups them into IVF partitions.
/// It offers a balance between search performance and storage footprint.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct IvfSqIndexBuilder {
#[serde(rename = "metric_type")]
pub(crate) distance_type: DistanceType,
// IVF
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_partitions: Option<u32>,
pub(crate) sample_rate: u32,
pub(crate) max_iterations: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) target_partition_size: Option<u32>,
}
@@ -261,18 +268,23 @@ impl IvfSqIndexBuilder {
///
/// Note that training an IVF PQ index on a large dataset is a slow operation and
/// currently is also a memory intensive operation.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct IvfPqIndexBuilder {
#[serde(rename = "metric_type")]
pub(crate) distance_type: DistanceType,
// IVF
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_partitions: Option<u32>,
pub(crate) sample_rate: u32,
pub(crate) max_iterations: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) target_partition_size: Option<u32>,
// PQ
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_sub_vectors: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_bits: Option<u32>,
}
@@ -323,14 +335,18 @@ pub(crate) fn suggested_num_sub_vectors(dim: u32) -> u32 {
///
/// Note that training an IVF RQ index on a large dataset is a slow operation and
/// currently is also a memory intensive operation.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct IvfRqIndexBuilder {
// IVF
#[serde(rename = "metric_type")]
pub(crate) distance_type: DistanceType,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_partitions: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_bits: Option<u32>,
pub(crate) sample_rate: u32,
pub(crate) max_iterations: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) target_partition_size: Option<u32>,
}
@@ -365,13 +381,16 @@ impl IvfRqIndexBuilder {
/// quickly find the closest vectors to a query vector.
///
/// The PQ (product quantizer) is used to compress the vectors as the same as IVF PQ.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct IvfHnswPqIndexBuilder {
// IVF
#[serde(rename = "metric_type")]
pub(crate) distance_type: DistanceType,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_partitions: Option<u32>,
pub(crate) sample_rate: u32,
pub(crate) max_iterations: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) target_partition_size: Option<u32>,
// HNSW
@@ -379,7 +398,9 @@ pub struct IvfHnswPqIndexBuilder {
pub(crate) ef_construction: u32,
// PQ
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_sub_vectors: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_bits: Option<u32>,
}
@@ -415,13 +436,16 @@ impl IvfHnswPqIndexBuilder {
///
/// The SQ (scalar quantizer) is used to compress the vectors,
/// each vector is mapped to a 8-bit integer vector, 4x compression ratio for float32 vector.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct IvfHnswSqIndexBuilder {
// IVF
#[serde(rename = "metric_type")]
pub(crate) distance_type: DistanceType,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_partitions: Option<u32>,
pub(crate) sample_rate: u32,
pub(crate) max_iterations: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) target_partition_size: Option<u32>,
// HNSW

View File

@@ -169,6 +169,7 @@ pub mod database;
pub mod dataloader;
pub mod embeddings;
pub mod error;
pub mod expr;
pub mod index;
pub mod io;
pub mod ipc;

View File

@@ -359,6 +359,28 @@ pub trait QueryBase {
/// on the filter column(s).
fn only_if(self, filter: impl AsRef<str>) -> Self;
/// Only return rows which match the filter, using an expression builder.
///
/// Use [`crate::expr`] for building type-safe expressions:
///
/// ```
/// use lancedb::expr::{col, lit};
/// use lancedb::query::{QueryBase, ExecutableQuery};
///
/// # use lancedb::Table;
/// # async fn query(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
/// let results = table.query()
/// .only_if_expr(col("age").gt(lit(18)).and(col("status").eq(lit("active"))))
/// .execute()
/// .await?;
/// # Ok(())
/// # }
/// ```
///
/// Note: Expression filters are not supported for remote/server-side queries.
/// Use [`QueryBase::only_if`] with SQL strings for remote tables.
fn only_if_expr(self, filter: datafusion_expr::Expr) -> Self;
/// Perform a full text search on the table.
///
/// The results will be returned in order of BM25 scores.
@@ -468,6 +490,11 @@ impl<T: HasQuery> QueryBase for T {
self
}
fn only_if_expr(mut self, filter: datafusion_expr::Expr) -> Self {
self.mut_query().filter = Some(QueryFilter::Datafusion(filter));
self
}
fn full_text_search(mut self, query: FullTextSearchQuery) -> Self {
if self.mut_query().limit.is_none() {
self.mut_query().limit = Some(DEFAULT_TOP_K);

View File

@@ -4,6 +4,7 @@
pub mod insert;
use self::insert::RemoteInsertExec;
use crate::expr::expr_to_sql_string;
use super::client::RequestResultExt;
use super::client::{HttpSend, RestfulLanceDbClient, Sender};
@@ -201,7 +202,6 @@ impl<S: HttpSend + 'static> Tags for RemoteTags<'_, S> {
}
pub struct RemoteTable<S: HttpSend = Sender> {
#[allow(dead_code)]
client: RestfulLanceDbClient<S>,
name: String,
namespace: Vec<String>,
@@ -447,13 +447,17 @@ impl<S: HttpSend> RemoteTable<S> {
body["k"] = serde_json::Value::Number(serde_json::Number::from(limit));
if let Some(filter) = &params.filter {
if let QueryFilter::Sql(filter) = filter {
body["filter"] = serde_json::Value::String(filter.clone());
} else {
return Err(Error::NotSupported {
message: "querying a remote table with a non-sql filter".to_string(),
});
}
let filter_sql = match filter {
QueryFilter::Sql(sql) => sql.clone(),
QueryFilter::Datafusion(expr) => expr_to_sql_string(expr)?,
QueryFilter::Substrait(_) => {
return Err(Error::NotSupported {
message: "Substrait filters are not supported for remote queries"
.to_string(),
});
}
};
body["filter"] = serde_json::Value::String(filter_sql);
}
match &params.select {
@@ -941,12 +945,12 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let version = self.current_version().await;
if let Some(filter) = filter {
let Filter::Sql(filter) = filter else {
return Err(Error::NotSupported {
message: "querying a remote table with a datafusion filter".to_string(),
});
let filter_sql = match filter {
Filter::Sql(sql) => sql.clone(),
Filter::Datafusion(expr) => expr_to_sql_string(&expr)?,
};
request = request.json(&serde_json::json!({ "predicate": filter, "version": version }));
request =
request.json(&serde_json::json!({ "predicate": filter_sql, "version": version }));
} else {
let body = serde_json::json!({ "version": version });
request = request.json(&body);
@@ -1223,7 +1227,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = response.text().await.err_to_http(request_id.clone())?;
if body.trim().is_empty() {
// Backward compatible with old servers
return Ok(DeleteResult { version: 0 });
return Ok(DeleteResult {
num_deleted_rows: 0,
version: 0,
});
}
let delete_response: DeleteResult =
serde_json::from_str(&body).map_err(|e| Error::Http {
@@ -1269,73 +1276,24 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
);
}
match index.index {
// TODO: Should we pass the actual index parameters? SaaS does not
// yet support them.
Index::IvfFlat(index) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("IVF_FLAT".to_string());
body[METRIC_TYPE_KEY] =
serde_json::Value::String(index.distance_type.to_string().to_lowercase());
if let Some(num_partitions) = index.num_partitions {
body["num_partitions"] = serde_json::Value::Number(num_partitions.into());
}
}
Index::IvfPq(index) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("IVF_PQ".to_string());
body[METRIC_TYPE_KEY] =
serde_json::Value::String(index.distance_type.to_string().to_lowercase());
if let Some(num_partitions) = index.num_partitions {
body["num_partitions"] = serde_json::Value::Number(num_partitions.into());
}
if let Some(num_bits) = index.num_bits {
body["num_bits"] = serde_json::Value::Number(num_bits.into());
}
}
Index::IvfSq(index) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("IVF_SQ".to_string());
body[METRIC_TYPE_KEY] =
serde_json::Value::String(index.distance_type.to_string().to_lowercase());
if let Some(num_partitions) = index.num_partitions {
body["num_partitions"] = serde_json::Value::Number(num_partitions.into());
}
}
Index::IvfHnswSq(index) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("IVF_HNSW_SQ".to_string());
body[METRIC_TYPE_KEY] =
serde_json::Value::String(index.distance_type.to_string().to_lowercase());
if let Some(num_partitions) = index.num_partitions {
body["num_partitions"] = serde_json::Value::Number(num_partitions.into());
}
}
Index::IvfRq(index) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("IVF_RQ".to_string());
body[METRIC_TYPE_KEY] =
serde_json::Value::String(index.distance_type.to_string().to_lowercase());
if let Some(num_partitions) = index.num_partitions {
body["num_partitions"] = serde_json::Value::Number(num_partitions.into());
}
if let Some(num_bits) = index.num_bits {
body["num_bits"] = serde_json::Value::Number(num_bits.into());
}
}
Index::BTree(_) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("BTREE".to_string());
}
Index::Bitmap(_) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("BITMAP".to_string());
}
Index::LabelList(_) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("LABEL_LIST".to_string());
}
Index::FTS(fts) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("FTS".to_string());
let params = serde_json::to_value(&fts).map_err(|e| Error::InvalidInput {
message: format!("failed to serialize FTS index params {:?}", e),
})?;
for (key, value) in params.as_object().unwrap() {
body[key] = value.clone();
}
}
fn to_json(params: &impl serde::Serialize) -> crate::Result<serde_json::Value> {
serde_json::to_value(params).map_err(|e| Error::InvalidInput {
message: format!("failed to serialize index params {:?}", e),
})
}
// Map each Index variant to its wire type name and serializable params.
// Auto is special-cased since it needs schema inspection.
let (index_type_str, params) = match &index.index {
Index::IvfFlat(p) => ("IVF_FLAT", Some(to_json(p)?)),
Index::IvfPq(p) => ("IVF_PQ", Some(to_json(p)?)),
Index::IvfSq(p) => ("IVF_SQ", Some(to_json(p)?)),
Index::IvfHnswSq(p) => ("IVF_HNSW_SQ", Some(to_json(p)?)),
Index::IvfRq(p) => ("IVF_RQ", Some(to_json(p)?)),
Index::BTree(p) => ("BTREE", Some(to_json(p)?)),
Index::Bitmap(p) => ("BITMAP", Some(to_json(p)?)),
Index::LabelList(p) => ("LABEL_LIST", Some(to_json(p)?)),
Index::FTS(p) => ("FTS", Some(to_json(p)?)),
Index::Auto => {
let schema = self.schema().await?;
let field = schema
@@ -1344,11 +1302,11 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
message: format!("Column {} not found in schema", column),
})?;
if supported_vector_data_type(field.data_type()) {
body[INDEX_TYPE_KEY] = serde_json::Value::String("IVF_PQ".to_string());
body[METRIC_TYPE_KEY] =
serde_json::Value::String(DistanceType::L2.to_string().to_lowercase());
("IVF_PQ", None)
} else if supported_btree_data_type(field.data_type()) {
body[INDEX_TYPE_KEY] = serde_json::Value::String("BTREE".to_string());
("BTREE", None)
} else {
return Err(Error::NotSupported {
message: format!(
@@ -1366,6 +1324,13 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
};
body[INDEX_TYPE_KEY] = index_type_str.into();
if let Some(params) = params {
for (key, value) in params.as_object().expect("params should be a JSON object") {
body[key] = value.clone();
}
}
let request = request.json(&body);
let (request_id, response) = self.send(request, true).await?;
@@ -1826,7 +1791,9 @@ mod tests {
use rstest::rstest;
use serde_json::json;
use crate::index::vector::{IvfFlatIndexBuilder, IvfHnswSqIndexBuilder};
use crate::index::vector::{
IvfFlatIndexBuilder, IvfHnswSqIndexBuilder, IvfRqIndexBuilder, IvfSqIndexBuilder,
};
use crate::remote::db::DEFAULT_SERVER_VERSION;
use crate::remote::JSON_CONTENT_TYPE;
use crate::utils::background_cache::clock;
@@ -2988,6 +2955,8 @@ mod tests {
"IVF_FLAT",
json!({
"metric_type": "hamming",
"sample_rate": 256,
"max_iterations": 50,
}),
Index::IvfFlat(IvfFlatIndexBuilder::default().distance_type(DistanceType::Hamming)),
),
@@ -2996,6 +2965,8 @@ mod tests {
json!({
"metric_type": "hamming",
"num_partitions": 128,
"sample_rate": 256,
"max_iterations": 50,
}),
Index::IvfFlat(
IvfFlatIndexBuilder::default()
@@ -3007,6 +2978,8 @@ mod tests {
"IVF_PQ",
json!({
"metric_type": "l2",
"sample_rate": 256,
"max_iterations": 50,
}),
Index::IvfPq(Default::default()),
),
@@ -3016,6 +2989,8 @@ mod tests {
"metric_type": "cosine",
"num_partitions": 128,
"num_bits": 4,
"sample_rate": 256,
"max_iterations": 50,
}),
Index::IvfPq(
IvfPqIndexBuilder::default()
@@ -3024,10 +2999,29 @@ mod tests {
.num_bits(4),
),
),
(
"IVF_PQ",
json!({
"metric_type": "l2",
"num_sub_vectors": 16,
"sample_rate": 512,
"max_iterations": 100,
}),
Index::IvfPq(
IvfPqIndexBuilder::default()
.num_sub_vectors(16)
.sample_rate(512)
.max_iterations(100),
),
),
(
"IVF_HNSW_SQ",
json!({
"metric_type": "l2",
"sample_rate": 256,
"max_iterations": 50,
"m": 20,
"ef_construction": 300,
}),
Index::IvfHnswSq(Default::default()),
),
@@ -3036,11 +3030,65 @@ mod tests {
json!({
"metric_type": "l2",
"num_partitions": 128,
"sample_rate": 256,
"max_iterations": 50,
"m": 40,
"ef_construction": 500,
}),
Index::IvfHnswSq(
IvfHnswSqIndexBuilder::default()
.distance_type(DistanceType::L2)
.num_partitions(128),
.num_partitions(128)
.num_edges(40)
.ef_construction(500),
),
),
(
"IVF_SQ",
json!({
"metric_type": "l2",
"sample_rate": 256,
"max_iterations": 50,
}),
Index::IvfSq(Default::default()),
),
(
"IVF_SQ",
json!({
"metric_type": "cosine",
"num_partitions": 64,
"sample_rate": 256,
"max_iterations": 50,
}),
Index::IvfSq(
IvfSqIndexBuilder::default()
.distance_type(DistanceType::Cosine)
.num_partitions(64),
),
),
(
"IVF_RQ",
json!({
"metric_type": "l2",
"sample_rate": 256,
"max_iterations": 50,
}),
Index::IvfRq(Default::default()),
),
(
"IVF_RQ",
json!({
"metric_type": "cosine",
"num_partitions": 64,
"num_bits": 8,
"sample_rate": 256,
"max_iterations": 50,
}),
Index::IvfRq(
IvfRqIndexBuilder::default()
.distance_type(DistanceType::Cosine)
.num_partitions(64)
.num_bits(8),
),
),
// HNSW_PQ isn't yet supported on SaaS
@@ -4635,4 +4683,60 @@ mod tests {
assert_eq!(result.version, 3);
assert_eq!(attempt.load(Ordering::SeqCst), 3);
}
#[tokio::test]
async fn test_query_with_datafusion_filter() {
use datafusion_expr::{col, lit};
let expected_data = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let expected_data_ref = expected_data.clone();
let table = Table::new_with_handler("my_table", move |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/query/");
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
// The Datafusion expression should be serialized to SQL
let filter = body.get("filter").expect("filter should be present");
let filter_str = filter.as_str().expect("filter should be a string");
// col("x") > lit(10) AND col("status") = lit("active")
assert!(
filter_str.contains("x") && filter_str.contains("10"),
"Filter should contain 'x' and '10', got: {}",
filter_str
);
assert!(
filter_str.contains("status") && filter_str.contains("active"),
"Filter should contain 'status' and 'active', got: {}",
filter_str
);
let response_body = write_ipc_file(&expected_data_ref);
http::Response::builder()
.status(200)
.header(CONTENT_TYPE, ARROW_FILE_CONTENT_TYPE)
.body(response_body)
.unwrap()
});
// Use only_if_expr with a Datafusion expression
let expr = col("x").gt(lit(10)).and(col("status").eq(lit("active")));
let data = table
.query()
.only_if_expr(expr)
.execute()
.await
.unwrap()
.collect::<Vec<_>>()
.await;
assert_eq!(data.len(), 1);
assert_eq!(data[0].as_ref().unwrap(), &expected_data);
}
}

View File

@@ -6,11 +6,12 @@
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_execution::TaskContext;
use datafusion_expr::Expr;
use datafusion_physical_plan::display::DisplayableExecutionPlan;
use datafusion_physical_plan::ExecutionPlan;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use futures::TryStreamExt;
use lance::dataset::builder::DatasetBuilder;
pub use lance::dataset::ColumnAlteration;
pub use lance::dataset::NewColumnTransform;
@@ -21,7 +22,6 @@ use lance::dataset::{InsertBuilder, WriteParams};
use lance::index::vector::utils::infer_vector_dim;
use lance::index::vector::VectorIndexParams;
use lance::io::{ObjectStoreParams, WrappingObjectStore};
use lance_datafusion::exec::execute_plan;
use lance_datafusion::utils::StreamingWriteSource;
use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
use lance_index::vector::bq::RQBuildParams;
@@ -43,7 +43,7 @@ use std::format;
use std::path::Path;
use std::sync::Arc;
use crate::data::scannable::Scannable;
use crate::data::scannable::{estimate_write_partitions, PeekedScannable, Scannable};
use crate::database::Database;
use crate::embeddings::{EmbeddingDefinition, EmbeddingRegistry, MemoryRegistry};
use crate::error::{Error, Result};
@@ -2113,7 +2113,7 @@ impl BaseTable for NativeTable {
}
}
async fn add(&self, add: AddDataBuilder) -> Result<AddResult> {
async fn add(&self, mut add: AddDataBuilder) -> Result<AddResult> {
let table_def = self.table_definition().await?;
self.dataset.ensure_mutable()?;
@@ -2122,6 +2122,22 @@ impl BaseTable for NativeTable {
let table_schema = Schema::from(&ds.schema().clone());
// Peek at the first batch to estimate a good partition count for
// write parallelism.
let mut peeked = PeekedScannable::new(add.data);
let num_partitions = if let Some(first_batch) = peeked.peek().await {
let max_partitions = lance_core::utils::tokio::get_num_compute_intensive_cpus();
estimate_write_partitions(
first_batch.get_array_memory_size(),
first_batch.num_rows(),
peeked.num_rows(),
max_partitions,
)
} else {
1
};
add.data = Box::new(peeked);
let output = add.into_plan(&table_schema, &table_def)?;
let lance_params = output
@@ -2135,18 +2151,41 @@ impl BaseTable for NativeTable {
..Default::default()
});
let plan = Arc::new(InsertExec::new(
ds_wrapper.clone(),
ds,
output.plan,
lance_params,
));
// Repartition for write parallelism if beneficial.
let plan = if num_partitions > 1 {
Arc::new(
datafusion_physical_plan::repartition::RepartitionExec::try_new(
output.plan,
datafusion_physical_plan::Partitioning::RoundRobinBatch(num_partitions),
)?,
) as Arc<dyn ExecutionPlan>
} else {
output.plan
};
let stream = execute_plan(plan, Default::default())?;
stream
.try_collect::<Vec<_>>()
.await
.map_err(crate::Error::from)?;
let insert_exec = Arc::new(InsertExec::new(ds_wrapper.clone(), ds, plan, lance_params));
// Execute all partitions in parallel.
let task_ctx = Arc::new(TaskContext::default());
let handles = FuturesUnordered::new();
for partition in 0..num_partitions {
let exec = insert_exec.clone();
let ctx = task_ctx.clone();
handles.push(tokio::spawn(async move {
let mut stream = exec
.execute(partition, ctx)
.map_err(|e| -> Error { e.into() })?;
while let Some(batch) = stream.next().await {
batch.map_err(|e| -> Error { e.into() })?;
}
Ok::<_, Error>(())
}));
}
for handle in handles {
handle.await.map_err(|e| Error::Runtime {
message: format!("Insert task panicked: {}", e),
})??;
}
let version = ds_wrapper.get().await?.manifest().version;
Ok(AddResult { version })

View File

@@ -155,7 +155,9 @@ impl AddDataBuilder {
pub struct PreprocessingOutput {
pub plan: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
#[cfg_attr(not(feature = "remote"), allow(dead_code))]
pub overwrite: bool,
#[cfg_attr(not(feature = "remote"), allow(dead_code))]
pub rescannable: bool,
pub write_options: WriteOptions,
pub mode: AddDataMode,
@@ -219,6 +221,7 @@ mod tests {
use crate::table::add_data::NaNVectorBehavior;
use crate::table::{ColumnDefinition, ColumnKind, Table, TableDefinition, WriteOptions};
use crate::test_utils::embeddings::MockEmbed;
use crate::test_utils::TestCustomError;
use crate::Error;
use super::AddDataMode;
@@ -283,17 +286,20 @@ mod tests {
test_add_with_data(stream).await;
}
#[derive(Debug)]
struct MyError;
impl std::fmt::Display for MyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "MyError occurred")
}
fn assert_preserves_external_error(err: &Error) {
assert!(
matches!(err, Error::External { source } if source.downcast_ref::<TestCustomError>().is_some()),
"Expected Error::External, got: {err:?}"
);
// The original TestCustomError message should be preserved through the
// error chain, even if the error gets wrapped multiple times by
// lance's insert pipeline.
assert!(
err.to_string().contains("TestCustomError occurred"),
"Expected original error message to be preserved, got: {err}"
);
}
impl std::error::Error for MyError {}
#[tokio::test]
async fn test_add_preserves_reader_error() {
let table = create_test_table().await;
@@ -301,7 +307,7 @@ mod tests {
let schema = first_batch.schema();
let iterator = vec![
Ok(first_batch),
Err(ArrowError::ExternalError(Box::new(MyError))),
Err(ArrowError::ExternalError(Box::new(TestCustomError))),
];
let reader: Box<dyn arrow_array::RecordBatchReader + Send> = Box::new(
RecordBatchIterator::new(iterator.into_iter(), schema.clone()),
@@ -309,7 +315,7 @@ mod tests {
let result = table.add(reader).execute().await;
assert!(result.is_err());
assert_preserves_external_error(&result.unwrap_err());
}
#[tokio::test]
@@ -320,7 +326,7 @@ mod tests {
let iterator = vec![
Ok(first_batch),
Err(Error::External {
source: Box::new(MyError),
source: Box::new(TestCustomError),
}),
];
let stream = futures::stream::iter(iterator);
@@ -331,7 +337,7 @@ mod tests {
let result = table.add(stream).execute().await;
assert!(result.is_err());
assert_preserves_external_error(&result.unwrap_err());
}
#[tokio::test]

View File

@@ -7,6 +7,9 @@ use crate::Result;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct DeleteResult {
/// The number of rows that were deleted.
#[serde(default)]
pub num_deleted_rows: u64,
// The commit version associated with the operation.
// A version of `0` indicates compatibility with legacy servers that do not return
/// a commit version.
@@ -20,10 +23,14 @@ pub struct DeleteResult {
pub(crate) async fn execute_delete(table: &NativeTable, predicate: &str) -> Result<DeleteResult> {
table.dataset.ensure_mutable()?;
let mut dataset = (*table.dataset.get().await?).clone();
dataset.delete(predicate).await?;
let delete_result = dataset.delete(predicate).await?;
let num_deleted_rows = delete_result.num_deleted_rows;
let version = dataset.version().version;
table.dataset.update(dataset);
Ok(DeleteResult { version })
Ok(DeleteResult {
num_deleted_rows,
version,
})
}
#[cfg(test)]
@@ -108,6 +115,32 @@ mod tests {
assert_eq!(current_schema, original_schema);
}
#[tokio::test]
async fn test_delete_returns_num_deleted_rows() {
let conn = connect("memory://").execute().await.unwrap();
let batch = record_batch!(("id", Int32, [1, 2, 3, 4, 5])).unwrap();
let table = conn
.create_table("test_num_deleted", batch)
.execute()
.await
.unwrap();
// Delete 2 rows (id > 3 means id=4 and id=5)
let result = table.delete("id > 3").await.unwrap();
assert_eq!(result.num_deleted_rows, 2);
assert_eq!(table.count_rows(None).await.unwrap(), 3);
// Delete 0 rows (no rows match)
let result = table.delete("id > 100").await.unwrap();
assert_eq!(result.num_deleted_rows, 0);
assert_eq!(table.count_rows(None).await.unwrap(), 3);
// Delete remaining rows
let result = table.delete("true").await.unwrap();
assert_eq!(result.num_deleted_rows, 3);
assert_eq!(table.count_rows(None).await.unwrap(), 0);
}
#[tokio::test]
async fn test_delete_false_increments_version() {
let conn = connect("memory://").execute().await.unwrap();

View File

@@ -5,6 +5,7 @@ use std::sync::Arc;
use super::NativeTable;
use crate::error::{Error, Result};
use crate::expr::expr_to_sql_string;
use crate::query::{
QueryExecutionOptions, QueryFilter, QueryRequest, Select, VectorQueryRequest, DEFAULT_TOP_K,
};
@@ -452,14 +453,12 @@ fn convert_to_namespace_query(query: &AnyQuery) -> Result<NsQueryTableRequest> {
fn filter_to_sql(filter: &QueryFilter) -> Result<String> {
match filter {
QueryFilter::Sql(sql) => Ok(sql.clone()),
QueryFilter::Substrait(_) => Err(Error::NotSupported {
message: "Substrait filters are not supported for server-side queries".to_string(),
}),
QueryFilter::Datafusion(_) => Err(Error::NotSupported {
message: "Datafusion expression filters are not supported for server-side queries. Use SQL filter instead.".to_string(),
}),
}
QueryFilter::Sql(sql) => Ok(sql.clone()),
QueryFilter::Substrait(_) => Err(Error::NotSupported {
message: "Substrait filters are not supported for server-side queries".to_string(),
}),
QueryFilter::Datafusion(expr) => expr_to_sql_string(expr),
}
}
/// Extract query vector(s) from Arrow arrays into the namespace format.

View File

@@ -4,3 +4,14 @@
pub mod connection;
pub mod datagen;
pub mod embeddings;
#[derive(Debug)]
pub struct TestCustomError;
impl std::fmt::Display for TestCustomError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "TestCustomError occurred")
}
}
impl std::error::Error for TestCustomError {}