Compare commits

...

19 Commits

Author SHA1 Message Date
Jack Ye
01c6b9dcb8 feat: add mem_wal flag to merge insert for MemWAL write path
Add support for enabling MemWAL (Memory Write-Ahead Log) mode on merge insert
operations. This allows streaming writes to route through memory nodes for
high-performance buffered writes.

Changes:
- Add `mem_wal` field to MergeInsertBuilder with validation
- Add `x-lancedb-mem-wal-enabled` header for remote requests
- Add Python `mem_wal()` method to LanceMergeInsertBuilder
- Add validation to ensure only upsert pattern is supported:
  - when_matched_update_all() without filter
  - when_not_matched_insert_all()
- Throw NotSupported error for native tables
- Add mem_wal_enabled to ClientConfig for Python/Node bindings

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2026-03-16 01:04:04 -07:00
Will Jones
33a13f0738 fixes for breaking changes 2026-03-04 15:27:32 -08:00
Will Jones
cabc75f167 feat: upgrade Lance to 3.0.0-rc.3 2026-03-04 14:57:17 -08:00
Wyatt Alt
97ca9bb943 feat: allow passing azure client/tenant ID through remote SDK (#3102)
Prior to this commit we supported passing the azure storage account name
to the lancedb remote SDK through headers. This adds support for client
ID and tenant ID as well.
2026-03-04 11:11:36 -08:00
Xuanwo
fa1b04f341 chore: migrate Rust crates to edition 2024 and fix clippy warnings (#3098)
This PR migrates all Rust crates in the workspace to Rust 2024 edition
and addresses the resulting compatibility updates. It also fixes all
clippy warnings surfaced by the workspace checks so the codebase remains
warning-free under the current lint configuration.

Context:
- Scope: workspace edition bump (`2021` -> `2024`) plus follow-up
refactors required by new edition and clippy rules.
- Validation: `cargo fmt --all` and `cargo clippy --quiet --features
remote --tests --examples -- -D warnings` both pass.
2026-03-03 16:23:29 -08:00
mrncstt
367abe99d2 feat(python): support dict to SQL struct conversion in table.update() (#3089)
## Summary

- Add `@value_to_sql.register(dict)` handler that converts Python dicts
to DataFusion's `named_struct()` SQL syntax
- Enables updating struct-typed columns via `table.update(values={"col":
{"field_a": 1, "field_b": "hello"}})`
- Recursively handles nested structs, lists, nulls, and all existing
scalar types

Closes #1363

## Details

The `named_struct` function was introduced in DataFusion 38 and is now
available (LanceDB uses DataFusion 52.1). The implementation follows the
existing `singledispatch` pattern in `util.py`.

**Example conversion:**
```python
value_to_sql({"field_a": 1, "field_b": "hello"})
# => "named_struct('field_a', 1, 'field_b', 'hello')"
```

## Test plan

- [x] Unit tests for flat struct, nested struct, list inside struct,
mixed types, null values, and empty dict
- [ ] CI integration tests with actual table.update() on struct columns

🔗 [DataFusion named_struct
docs](https://datafusion.apache.org/user-guide/sql/scalar_functions.html#named-struct)
2026-03-03 13:36:08 -08:00
Xuanwo
52ce2c995c fix(ci): only run npm publish on release tags (#3093)
This PR fixes the npm publish dry-run failure for prerelease versions
without changing the existing workflow trigger behavior. The publish
step now detects prerelease versions from `nodejs/package.json` and
always appends `--tag preview` when needed.

Context:
- On `main` pushes, the workflow still runs `npm publish --dry-run` by
design.
- Recent failures were caused by prerelease versions (for example
`0.27.0-beta.3`) running without `--tag`, which npm rejects.
- The previous `refs/tags/v...-beta...` check did not apply on branch
pushes, so dry-run could fail even though release tags worked.
2026-03-04 01:35:10 +08:00
Sean Mackrory
e71a00998c ci: add regression test for fastSearch in FTS queries in TypeScript (#3090)
We recently added support for this for the Python bindings, and wanted
to confirm this already worked as expected in the TS bindings.
2026-03-03 07:09:09 -08:00
Sean Mackrory
39a2ac0a1c feat: add parity between fast_search keyword argument between vector and FTS searches (#3091)
We don't necessarily need to do this, but one user was confused having
used `fast_search=True` as a keyword argument for vector searches, but
being unable to do so for FTS, even after the most recent changes. I
think this is the only discrepancy in where that is possible.
2026-03-03 05:21:36 -08:00
Wyatt Alt
bc7b344fa4 feat: add support for remote index params (#3087)
Prior to this commit the remote SDK did not support the full set of
index parameters. This extends the SDK to support them.
2026-03-02 11:14:28 -08:00
Will Jones
f91d2f5fec ci(python): pin maturin to work around bug (#3088)
Work around for https://github.com/PyO3/maturin/issues/3059
2026-03-02 09:38:54 -08:00
Wyatt Alt
cf81b6419f feat: add num_deleted_rows to delete result (#3077) 2026-03-02 08:37:14 -08:00
Lance Release
0498ac1f2f Bump version: 0.27.0-beta.2 → 0.27.0-beta.3 2026-02-28 01:31:51 +00:00
Lance Release
aeb1c3ee6a Bump version: 0.30.0-beta.2 → 0.30.0-beta.3 2026-02-28 01:29:53 +00:00
Weston Pace
f9ae46c0e7 feat: upgrade lance to 3.0.0-rc.2 and add bindings for fast_search (#3083) 2026-02-27 17:27:01 -08:00
Will Jones
84bf022fb1 fix(python): pin pylance to make datafusion table provider match version (#3080) 2026-02-27 13:34:05 -08:00
Will Jones
310967eceb ci(rust): fix linux job (#3076) 2026-02-26 19:25:46 -08:00
Jack Ye
154dbeee2a chore: fix clippy for PreprocessingOutput without remote feature (#3070)
Fix clippy:

```
error: fields `overwrite` and `rescannable` are never read
Error:    --> /home/runner/work/xxxx/xxxx/src/lancedb/rust/lancedb/src/table/add_data.rs:158:9
    |
156 | pub struct PreprocessingOutput {
    |            ------------------- fields in this struct
157 |     pub plan: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
158 |     pub overwrite: bool,
    |         ^^^^^^^^^
159 |     pub rescannable: bool,
    |         ^^^^^^^^^^^
    |
    = note: `-D dead-code` implied by `-D warnings`
    = help: to override `-D warnings` add `#[allow(dead_code)]`
```
2026-02-25 14:59:32 -08:00
Lance Release
c9c08ac8b9 Bump version: 0.27.0-beta.1 → 0.27.0-beta.2 2026-02-25 07:47:54 +00:00
116 changed files with 1626 additions and 802 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.27.0-beta.1"
current_version = "0.27.0-beta.3"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -29,6 +29,7 @@ runs:
if: ${{ inputs.arm-build == 'false' }}
uses: PyO3/maturin-action@v1
with:
maturin-version: "1.12.4"
command: build
working-directory: python
docker-options: "-e PIP_EXTRA_INDEX_URL='https://pypi.fury.io/lance-format/ https://pypi.fury.io/lancedb/'"
@@ -44,6 +45,7 @@ runs:
if: ${{ inputs.arm-build == 'true' }}
uses: PyO3/maturin-action@v1
with:
maturin-version: "1.12.4"
command: build
working-directory: python
docker-options: "-e PIP_EXTRA_INDEX_URL='https://pypi.fury.io/lance-format/ https://pypi.fury.io/lancedb/'"

View File

@@ -20,6 +20,7 @@ runs:
uses: PyO3/maturin-action@v1
with:
command: build
maturin-version: "1.12.4"
# TODO: pass through interpreter
args: ${{ inputs.args }}
docker-options: "-e PIP_EXTRA_INDEX_URL='https://pypi.fury.io/lance-format/ https://pypi.fury.io/lancedb/'"

View File

@@ -25,6 +25,7 @@ runs:
uses: PyO3/maturin-action@v1
with:
command: build
maturin-version: "1.12.4"
args: ${{ inputs.args }}
docker-options: "-e PIP_EXTRA_INDEX_URL='https://pypi.fury.io/lance-format/ https://pypi.fury.io/lancedb/'"
working-directory: python

View File

@@ -356,7 +356,8 @@ jobs:
if [[ $DRY_RUN == "true" ]]; then
ARGS="$ARGS --dry-run"
fi
if [[ $GITHUB_REF =~ refs/tags/v(.*)-beta.* ]]; then
VERSION=$(node -p "require('./package.json').version")
if [[ $VERSION == *-* ]]; then
ARGS="$ARGS --tag preview"
fi
npm publish $ARGS

View File

@@ -10,6 +10,10 @@ on:
- python/**
- rust/**
- .github/workflows/python.yml
- .github/workflows/build_linux_wheel/**
- .github/workflows/build_mac_wheel/**
- .github/workflows/build_windows_wheel/**
- .github/workflows/run_tests/**
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}

View File

@@ -100,7 +100,9 @@ jobs:
lfs: true
- uses: Swatinem/rust-cache@v2
- name: Install dependencies
run: sudo apt install -y protobuf-compiler libssl-dev
run: |
sudo apt update
sudo apt install -y protobuf-compiler libssl-dev
- uses: rui314/setup-mold@v1
- name: Make Swap
run: |

521
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -5,7 +5,7 @@ exclude = ["python"]
resolver = "2"
[workspace.package]
edition = "2021"
edition = "2024"
authors = ["LanceDB Devs <dev@lancedb.com>"]
license = "Apache-2.0"
repository = "https://github.com/lancedb/lancedb"
@@ -15,20 +15,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=3.1.0-beta.2", default-features = false, "tag" = "v3.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=3.1.0-beta.2", "tag" = "v3.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=3.1.0-beta.2", "tag" = "v3.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=3.1.0-beta.2", "tag" = "v3.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=3.1.0-beta.2", default-features = false, "tag" = "v3.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=3.1.0-beta.2", "tag" = "v3.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=3.1.0-beta.2", "tag" = "v3.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=3.1.0-beta.2", "tag" = "v3.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=3.1.0-beta.2", default-features = false, "tag" = "v3.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=3.1.0-beta.2", "tag" = "v3.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=3.1.0-beta.2", "tag" = "v3.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=3.1.0-beta.2", "tag" = "v3.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=3.1.0-beta.2", "tag" = "v3.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=3.1.0-beta.2", "tag" = "v3.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=3.0.0-rc.3", default-features = false, "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=3.0.0-rc.3", default-features = false, "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=3.0.0-rc.3", default-features = false, "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=3.0.0-rc.3", "tag" = "v3.0.0-rc.3", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "57.2", optional = false }
@@ -40,13 +40,15 @@ arrow-schema = "57.2"
arrow-select = "57.2"
arrow-cast = "57.2"
async-trait = "0"
datafusion = { version = "51.0", default-features = false }
datafusion-catalog = "51.0"
datafusion-common = { version = "51.0", default-features = false }
datafusion-execution = "51.0"
datafusion-expr = "51.0"
datafusion-physical-plan = "51.0"
datafusion-physical-expr = "51.0"
datafusion = { version = "52.1", default-features = false }
datafusion-catalog = "52.1"
datafusion-common = { version = "52.1", default-features = false }
datafusion-execution = "52.1"
datafusion-expr = "52.1"
datafusion-functions = "52.1"
datafusion-physical-plan = "52.1"
datafusion-physical-expr = "52.1"
datafusion-sql = "52.1"
env_logger = "0.11"
half = { "version" = "2.7.1", default-features = false, features = [
"num-traits",

View File

@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
<dependency>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-core</artifactId>
<version>0.27.0-beta.1</version>
<version>0.27.0-beta.3</version>
</dependency>
```

View File

@@ -8,6 +8,14 @@
## Properties
### numDeletedRows
```ts
numDeletedRows: number;
```
***
### version
```ts

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.27.0-beta.1</version>
<version>0.27.0-beta.3</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.27.0-beta.1</version>
<version>0.27.0-beta.3</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>LanceDB Java SDK Parent POM</description>

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.27.0-beta.1"
version = "0.27.0-beta.3"
license.workspace = true
description.workspace = true
repository.workspace = true

View File

@@ -1697,6 +1697,65 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
expect(results2[0].text).toBe(data[1].text);
});
test("full text search fast search", async () => {
const db = await connect(tmpDir.name);
const data = [{ text: "hello world", vector: [0.1, 0.2, 0.3], id: 1 }];
const table = await db.createTable("test", data);
await table.createIndex("text", {
config: Index.fts(),
});
// Insert unindexed data after creating the index.
await table.add([{ text: "xyz", vector: [0.4, 0.5, 0.6], id: 2 }]);
const withFlatSearch = await table
.search("xyz", "fts")
.limit(10)
.toArray();
expect(withFlatSearch.length).toBeGreaterThan(0);
const fastSearchResults = await table
.search("xyz", "fts")
.fastSearch()
.limit(10)
.toArray();
expect(fastSearchResults.length).toBe(0);
const nearestToTextFastSearch = await table
.query()
.nearestToText("xyz")
.fastSearch()
.limit(10)
.toArray();
expect(nearestToTextFastSearch.length).toBe(0);
// fastSearch should be chainable with other methods.
const chainedFastSearch = await table
.search("xyz", "fts")
.fastSearch()
.select(["text"])
.limit(5)
.toArray();
expect(chainedFastSearch.length).toBe(0);
await table.optimize();
const indexedFastSearch = await table
.search("xyz", "fts")
.fastSearch()
.limit(10)
.toArray();
expect(indexedFastSearch.length).toBeGreaterThan(0);
const indexedNearestToTextFastSearch = await table
.query()
.nearestToText("xyz")
.fastSearch()
.limit(10)
.toArray();
expect(indexedNearestToTextFastSearch.length).toBeGreaterThan(0);
});
test("prewarm full text search index", async () => {
const db = await connect(tmpDir.name);
const data = [

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.27.0-beta.1",
"version": "0.27.0-beta.3",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -8,10 +8,10 @@ use lancedb::database::{CreateTableMode, Database};
use napi::bindgen_prelude::*;
use napi_derive::*;
use crate::ConnectionOptions;
use crate::error::NapiErrorExt;
use crate::header::JsHeaderProvider;
use crate::table::Table;
use crate::ConnectionOptions;
use lancedb::connection::{ConnectBuilder, Connection as LanceDBConnection};
use lancedb::ipc::{ipc_file_to_batches, ipc_file_to_schema};

View File

@@ -3,12 +3,12 @@
use std::sync::Mutex;
use lancedb::index::Index as LanceDbIndex;
use lancedb::index::scalar::{BTreeIndexBuilder, FtsIndexBuilder};
use lancedb::index::vector::{
IvfFlatIndexBuilder, IvfHnswPqIndexBuilder, IvfHnswSqIndexBuilder, IvfPqIndexBuilder,
IvfRqIndexBuilder,
};
use lancedb::index::Index as LanceDbIndex;
use napi_derive::napi;
use crate::util::parse_distance_type;

View File

@@ -17,8 +17,8 @@ use lancedb::query::VectorQuery as LanceDbVectorQuery;
use napi::bindgen_prelude::*;
use napi_derive::napi;
use crate::error::convert_error;
use crate::error::NapiErrorExt;
use crate::error::convert_error;
use crate::iterator::RecordBatchIterator;
use crate::rerankers::RerankHybridCallbackArgs;
use crate::rerankers::Reranker;
@@ -551,15 +551,12 @@ fn parse_fts_query(query: Object) -> napi::Result<FullTextSearchQuery> {
}
};
let mut query = FullTextSearchQuery::new_query(query);
if let Some(cols) = columns {
if !cols.is_empty() {
query = query.with_columns(&cols).map_err(|e| {
napi::Error::from_reason(format!(
"Failed to set full text search columns: {}",
e
))
})?;
}
if let Some(cols) = columns
&& !cols.is_empty()
{
query = query.with_columns(&cols).map_err(|e| {
napi::Error::from_reason(format!("Failed to set full text search columns: {}", e))
})?;
}
Ok(query)
} else {

View File

@@ -145,6 +145,7 @@ impl From<ClientConfig> for lancedb::remote::ClientConfig {
id_delimiter: config.id_delimiter,
tls_config: config.tls_config.map(Into::into),
header_provider: None, // the header provider is set separately later
mem_wal_enabled: None, // mem_wal is set per-operation in merge_insert
}
}
}

View File

@@ -95,7 +95,7 @@ impl napi::bindgen_prelude::FromNapiValue for Session {
napi_val: napi::sys::napi_value,
) -> napi::Result<Self> {
let object: napi::bindgen_prelude::ClassInstance<Self> =
napi::bindgen_prelude::ClassInstance::from_napi_value(env, napi_val)?;
unsafe { napi::bindgen_prelude::ClassInstance::from_napi_value(env, napi_val)? };
Ok((*object).clone())
}
}

View File

@@ -753,12 +753,14 @@ impl From<lancedb::table::AddResult> for AddResult {
#[napi(object)]
pub struct DeleteResult {
pub num_deleted_rows: i64,
pub version: i64,
}
impl From<lancedb::table::DeleteResult> for DeleteResult {
fn from(value: lancedb::table::DeleteResult) -> Self {
Self {
num_deleted_rows: value.num_deleted_rows as i64,
version: value.version as i64,
}
}

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.30.0-beta.2"
current_version = "0.30.0-beta.3"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.30.0-beta.2"
version = "0.30.0-beta.3"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true

View File

@@ -59,7 +59,7 @@ tests = [
"polars>=0.19, <=1.3.0",
"tantivy",
"pyarrow-stubs",
"pylance>=1.0.0b14",
"pylance>=1.0.0b14,<3.0.0",
"requests",
"datafusion<52",
]

View File

@@ -34,6 +34,7 @@ class LanceMergeInsertBuilder(object):
self._when_not_matched_by_source_condition = None
self._timeout = None
self._use_index = True
self._mem_wal = False
def when_matched_update_all(
self, *, where: Optional[str] = None
@@ -96,6 +97,47 @@ class LanceMergeInsertBuilder(object):
self._use_index = use_index
return self
def mem_wal(self, enabled: bool = True) -> LanceMergeInsertBuilder:
"""
Enable MemWAL (Memory Write-Ahead Log) mode for this merge insert operation.
When enabled, the merge insert will route data through a memory node service
that buffers writes before flushing to storage. This is only supported for
remote (LanceDB Cloud) tables.
**Important:** MemWAL only supports the upsert pattern. You must use:
- `when_matched_update_all()` (without a filter condition)
- `when_not_matched_insert_all()`
MemWAL does NOT support:
- `when_matched_update_all(where=...)` with a filter condition
- `when_not_matched_by_source_delete()`
Parameters
----------
enabled: bool
Whether to enable MemWAL mode. Defaults to `True`.
Raises
------
NotImplementedError
If used on a native (local) table, as MemWAL is only supported for
remote tables.
ValueError
If the merge insert pattern is not supported by MemWAL.
Examples
--------
>>> # Correct usage with MemWAL
>>> table.merge_insert(["id"]) \\
... .when_matched_update_all() \\
... .when_not_matched_insert_all() \\
... .mem_wal() \\
... .execute(new_data)
"""
self._mem_wal = enabled
return self
def execute(
self,
new_data: DATA,

View File

@@ -606,6 +606,7 @@ class LanceQueryBuilder(ABC):
query,
ordering_field_name=ordering_field_name,
fts_columns=fts_columns,
fast_search=fast_search,
)
if isinstance(query, list):
@@ -1456,12 +1457,14 @@ class LanceFtsQueryBuilder(LanceQueryBuilder):
query: str | FullTextQuery,
ordering_field_name: Optional[str] = None,
fts_columns: Optional[Union[str, List[str]]] = None,
fast_search: bool = None,
):
super().__init__(table)
self._query = query
self._phrase_query = False
self.ordering_field_name = ordering_field_name
self._reranker = None
self._fast_search = fast_search
if isinstance(fts_columns, str):
fts_columns = [fts_columns]
self._fts_columns = fts_columns
@@ -1483,6 +1486,19 @@ class LanceFtsQueryBuilder(LanceQueryBuilder):
self._phrase_query = phrase_query
return self
def fast_search(self) -> LanceFtsQueryBuilder:
"""
Skip a flat search of unindexed data. This will improve
search performance but search results will not include unindexed data.
Returns
-------
LanceFtsQueryBuilder
The LanceFtsQueryBuilder object.
"""
self._fast_search = True
return self
def to_query_object(self) -> Query:
return Query(
columns=self._columns,
@@ -1494,6 +1510,7 @@ class LanceFtsQueryBuilder(LanceQueryBuilder):
query=self._query, columns=self._fts_columns
),
offset=self._offset,
fast_search=self._fast_search,
)
def output_schema(self) -> pa.Schema:

View File

@@ -218,8 +218,6 @@ class RemoteTable(Table):
train: bool = True,
):
"""Create an index on the table.
Currently, the only parameters that matter are
the metric and the vector column name.
Parameters
----------
@@ -250,11 +248,6 @@ class RemoteTable(Table):
>>> table.create_index("l2", "vector") # doctest: +SKIP
"""
if num_sub_vectors is not None:
logging.warning(
"num_sub_vectors is not supported on LanceDB cloud."
"This parameter will be tuned automatically."
)
if accelerator is not None:
logging.warning(
"GPU accelerator is not yet supported on LanceDB cloud."

View File

@@ -1331,7 +1331,7 @@ class Table(ABC):
1 2 [3.0, 4.0]
2 3 [5.0, 6.0]
>>> table.delete("x = 2")
DeleteResult(version=2)
DeleteResult(num_deleted_rows=1, version=2)
>>> table.to_pandas()
x vector
0 1 [1.0, 2.0]
@@ -1345,7 +1345,7 @@ class Table(ABC):
>>> to_remove
'1, 5'
>>> table.delete(f"x IN ({to_remove})")
DeleteResult(version=3)
DeleteResult(num_deleted_rows=1, version=3)
>>> table.to_pandas()
x vector
0 3 [5.0, 6.0]
@@ -4181,6 +4181,7 @@ class AsyncTable:
when_not_matched_by_source_condition=merge._when_not_matched_by_source_condition,
timeout=merge._timeout,
use_index=merge._use_index,
mem_wal=merge._mem_wal,
),
)
@@ -4215,7 +4216,7 @@ class AsyncTable:
1 2 [3.0, 4.0]
2 3 [5.0, 6.0]
>>> table.delete("x = 2")
DeleteResult(version=2)
DeleteResult(num_deleted_rows=1, version=2)
>>> table.to_pandas()
x vector
0 1 [1.0, 2.0]
@@ -4229,7 +4230,7 @@ class AsyncTable:
>>> to_remove
'1, 5'
>>> table.delete(f"x IN ({to_remove})")
DeleteResult(version=3)
DeleteResult(num_deleted_rows=1, version=3)
>>> table.to_pandas()
x vector
0 3 [5.0, 6.0]

View File

@@ -324,6 +324,16 @@ def _(value: list):
return "[" + ", ".join(map(value_to_sql, value)) + "]"
@value_to_sql.register(dict)
def _(value: dict):
# https://datafusion.apache.org/user-guide/sql/scalar_functions.html#named-struct
return (
"named_struct("
+ ", ".join(f"'{k}', {value_to_sql(v)}" for k, v in value.items())
+ ")"
)
@value_to_sql.register(np.ndarray)
def _(value: np.ndarray):
return value_to_sql(value.tolist())

View File

@@ -27,6 +27,7 @@ from lancedb.query import (
PhraseQuery,
BooleanQuery,
Occur,
LanceFtsQueryBuilder,
)
import numpy as np
import pyarrow as pa
@@ -882,3 +883,109 @@ def test_fts_query_to_json():
'"must_not":[]}}'
)
assert json_str == expected
def test_fts_fast_search(table):
table.create_fts_index("text", use_tantivy=False)
# Insert some unindexed data
table.add(
[
{
"text": "xyz",
"vector": [0 for _ in range(128)],
"id": 101,
"text2": "xyz",
"nested": {"text": "xyz"},
"count": 10,
}
]
)
# Without fast_search, the query object should not have fast_search set
builder = table.search("xyz", query_type="fts").limit(10)
query = builder.to_query_object()
assert query.fast_search is None
# With fast_search, the query object should have fast_search=True
builder = table.search("xyz", query_type="fts").fast_search().limit(10)
query = builder.to_query_object()
assert query.fast_search is True
# fast_search should be chainable with other methods
builder = (
table.search("xyz", query_type="fts").fast_search().select(["text"]).limit(5)
)
query = builder.to_query_object()
assert query.fast_search is True
assert query.limit == 5
assert query.columns == ["text"]
# fast_search should be enabled by keyword argument too
query = LanceFtsQueryBuilder(table, "xyz", fast_search=True).to_query_object()
assert query.fast_search is True
# Verify it executes without error and skips unindexed data
results = table.search("xyz", query_type="fts").fast_search().limit(5).to_list()
assert len(results) == 0
# Update index and verify it returns results
table.optimize()
results = table.search("xyz", query_type="fts").fast_search().limit(5).to_list()
assert len(results) > 0
@pytest.mark.asyncio
async def test_fts_fast_search_async(async_table):
await async_table.create_index("text", config=FTS())
# Insert some unindexed data
await async_table.add(
[
{
"text": "xyz",
"vector": [0 for _ in range(128)],
"id": 101,
"text2": "xyz",
"nested": {"text": "xyz"},
"count": 10,
}
]
)
# Without fast_search, should return results
results = await async_table.query().nearest_to_text("xyz").limit(5).to_list()
assert len(results) > 0
# With fast_search, should return no results data unindexed
fast_results = (
await async_table.query()
.nearest_to_text("xyz")
.fast_search()
.limit(5)
.to_list()
)
assert len(fast_results) == 0
# Update index and verify it returns results
await async_table.optimize()
fast_results = (
await async_table.query()
.nearest_to_text("xyz")
.fast_search()
.limit(5)
.to_list()
)
assert len(fast_results) > 0
# fast_search should be chainable with other methods
results = (
await async_table.query()
.nearest_to_text("xyz")
.fast_search()
.select(["text"])
.limit(5)
.to_list()
)
assert len(results) > 0

View File

@@ -121,6 +121,32 @@ def test_value_to_sql_string(tmp_path):
assert table.to_pandas().query("search == @value")["replace"].item() == value
def test_value_to_sql_dict():
# Simple flat struct
assert value_to_sql({"a": 1, "b": "hello"}) == "named_struct('a', 1, 'b', 'hello')"
# Nested struct
assert (
value_to_sql({"outer": {"inner": 1}})
== "named_struct('outer', named_struct('inner', 1))"
)
# List inside struct
assert value_to_sql({"a": [1, 2]}) == "named_struct('a', [1, 2])"
# Mixed types
assert (
value_to_sql({"name": "test", "count": 42, "rate": 3.14, "active": True})
== "named_struct('name', 'test', 'count', 42, 'rate', 3.14, 'active', TRUE)"
)
# Null value inside struct
assert value_to_sql({"a": None}) == "named_struct('a', NULL)"
# Empty dict
assert value_to_sql({}) == "named_struct()"
def test_append_vector_columns():
registry = EmbeddingFunctionRegistry.get_instance()
registry.register("test")(MockTextEmbeddingFunction)

View File

@@ -10,7 +10,7 @@ use arrow::{
use futures::stream::StreamExt;
use lancedb::arrow::SendableRecordBatchStream;
use pyo3::{
exceptions::PyStopAsyncIteration, pyclass, pymethods, Bound, Py, PyAny, PyRef, PyResult, Python,
Bound, Py, PyAny, PyRef, PyResult, Python, exceptions::PyStopAsyncIteration, pyclass, pymethods,
};
use pyo3_async_runtimes::tokio::future_into_py;

View File

@@ -9,10 +9,10 @@ use lancedb::{
database::{CreateTableMode, Database, ReadConsistency},
};
use pyo3::{
Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python,
exceptions::{PyRuntimeError, PyValueError},
pyclass, pyfunction, pymethods,
types::{PyDict, PyDictMethods},
Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python,
};
use pyo3_async_runtimes::tokio::future_into_py;
@@ -506,6 +506,7 @@ pub struct PyClientConfig {
id_delimiter: Option<String>,
tls_config: Option<PyClientTlsConfig>,
header_provider: Option<Py<PyAny>>,
mem_wal_enabled: Option<bool>,
}
#[derive(FromPyObject)]
@@ -590,6 +591,7 @@ impl From<PyClientConfig> for lancedb::remote::ClientConfig {
id_delimiter: value.id_delimiter,
tls_config: value.tls_config.map(Into::into),
header_provider,
mem_wal_enabled: value.mem_wal_enabled,
}
}
}

View File

@@ -2,10 +2,10 @@
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use pyo3::{
PyErr, PyResult, Python,
exceptions::{PyIOError, PyNotImplementedError, PyOSError, PyRuntimeError, PyValueError},
intern,
types::{PyAnyMethods, PyNone},
PyErr, PyResult, Python,
};
use lancedb::error::Error as LanceError;

View File

@@ -3,17 +3,17 @@
use lancedb::index::vector::{IvfFlatIndexBuilder, IvfRqIndexBuilder, IvfSqIndexBuilder};
use lancedb::index::{
Index as LanceDbIndex,
scalar::{BTreeIndexBuilder, FtsIndexBuilder},
vector::{IvfHnswPqIndexBuilder, IvfHnswSqIndexBuilder, IvfPqIndexBuilder},
Index as LanceDbIndex,
};
use pyo3::types::PyStringMethods;
use pyo3::IntoPyObject;
use pyo3::types::PyStringMethods;
use pyo3::{
Bound, FromPyObject, PyAny, PyResult, Python,
exceptions::{PyKeyError, PyValueError},
intern, pyclass, pymethods,
types::PyAnyMethods,
Bound, FromPyObject, PyAny, PyResult, Python,
};
use crate::util::parse_distance_type;
@@ -41,7 +41,12 @@ pub fn extract_index_params(source: &Option<Bound<'_, PyAny>>) -> PyResult<Lance
let inner_opts = FtsIndexBuilder::default()
.base_tokenizer(params.base_tokenizer)
.language(&params.language)
.map_err(|_| PyValueError::new_err(format!("LanceDB does not support the requested language: '{}'", params.language)))?
.map_err(|_| {
PyValueError::new_err(format!(
"LanceDB does not support the requested language: '{}'",
params.language
))
})?
.with_position(params.with_position)
.lower_case(params.lower_case)
.max_token_length(params.max_token_length)
@@ -52,7 +57,7 @@ pub fn extract_index_params(source: &Option<Bound<'_, PyAny>>) -> PyResult<Lance
.ngram_max_length(params.ngram_max_length)
.ngram_prefix_only(params.prefix_only);
Ok(LanceDbIndex::FTS(inner_opts))
},
}
"IvfFlat" => {
let params = source.extract::<IvfFlatParams>()?;
let distance_type = parse_distance_type(params.distance_type)?;
@@ -64,10 +69,11 @@ pub fn extract_index_params(source: &Option<Bound<'_, PyAny>>) -> PyResult<Lance
ivf_flat_builder = ivf_flat_builder.num_partitions(num_partitions);
}
if let Some(target_partition_size) = params.target_partition_size {
ivf_flat_builder = ivf_flat_builder.target_partition_size(target_partition_size);
ivf_flat_builder =
ivf_flat_builder.target_partition_size(target_partition_size);
}
Ok(LanceDbIndex::IvfFlat(ivf_flat_builder))
},
}
"IvfPq" => {
let params = source.extract::<IvfPqParams>()?;
let distance_type = parse_distance_type(params.distance_type)?;
@@ -86,7 +92,7 @@ pub fn extract_index_params(source: &Option<Bound<'_, PyAny>>) -> PyResult<Lance
ivf_pq_builder = ivf_pq_builder.num_sub_vectors(num_sub_vectors);
}
Ok(LanceDbIndex::IvfPq(ivf_pq_builder))
},
}
"IvfSq" => {
let params = source.extract::<IvfSqParams>()?;
let distance_type = parse_distance_type(params.distance_type)?;
@@ -101,7 +107,7 @@ pub fn extract_index_params(source: &Option<Bound<'_, PyAny>>) -> PyResult<Lance
ivf_sq_builder = ivf_sq_builder.target_partition_size(target_partition_size);
}
Ok(LanceDbIndex::IvfSq(ivf_sq_builder))
},
}
"IvfRq" => {
let params = source.extract::<IvfRqParams>()?;
let distance_type = parse_distance_type(params.distance_type)?;
@@ -117,7 +123,7 @@ pub fn extract_index_params(source: &Option<Bound<'_, PyAny>>) -> PyResult<Lance
ivf_rq_builder = ivf_rq_builder.target_partition_size(target_partition_size);
}
Ok(LanceDbIndex::IvfRq(ivf_rq_builder))
},
}
"HnswPq" => {
let params = source.extract::<IvfHnswPqParams>()?;
let distance_type = parse_distance_type(params.distance_type)?;
@@ -138,7 +144,7 @@ pub fn extract_index_params(source: &Option<Bound<'_, PyAny>>) -> PyResult<Lance
hnsw_pq_builder = hnsw_pq_builder.num_sub_vectors(num_sub_vectors);
}
Ok(LanceDbIndex::IvfHnswPq(hnsw_pq_builder))
},
}
"HnswSq" => {
let params = source.extract::<IvfHnswSqParams>()?;
let distance_type = parse_distance_type(params.distance_type)?;
@@ -155,7 +161,7 @@ pub fn extract_index_params(source: &Option<Bound<'_, PyAny>>) -> PyResult<Lance
hnsw_sq_builder = hnsw_sq_builder.target_partition_size(target_partition_size);
}
Ok(LanceDbIndex::IvfHnswSq(hnsw_sq_builder))
},
}
not_supported => Err(PyValueError::new_err(format!(
"Invalid index type '{}'. Must be one of BTree, Bitmap, LabelList, FTS, IvfPq, IvfSq, IvfHnswPq, or IvfHnswSq",
not_supported

View File

@@ -2,14 +2,14 @@
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use arrow::RecordBatchStream;
use connection::{connect, Connection};
use connection::{Connection, connect};
use env_logger::Env;
use index::IndexConfig;
use permutation::{PyAsyncPermutationBuilder, PyPermutationReader};
use pyo3::{
pymodule,
Bound, PyResult, Python, pymodule,
types::{PyModule, PyModuleMethods},
wrap_pyfunction, Bound, PyResult, Python,
wrap_pyfunction,
};
use query::{FTSQuery, HybridQuery, Query, VectorQuery};
use session::Session;

View File

@@ -16,10 +16,10 @@ use lancedb::{
query::Select,
};
use pyo3::{
Bound, PyAny, PyRef, PyRefMut, PyResult, Python,
exceptions::PyRuntimeError,
pyclass, pymethods,
types::{PyAnyMethods, PyDict, PyDictMethods, PyType},
Bound, PyAny, PyRef, PyRefMut, PyResult, Python,
};
use pyo3_async_runtimes::tokio::future_into_py;

View File

@@ -4,9 +4,9 @@
use std::sync::Arc;
use std::time::Duration;
use arrow::array::make_array;
use arrow::array::Array;
use arrow::array::ArrayData;
use arrow::array::make_array;
use arrow::pyarrow::FromPyArrow;
use arrow::pyarrow::IntoPyArrow;
use arrow::pyarrow::ToPyArrow;
@@ -22,23 +22,23 @@ use lancedb::query::{
VectorQuery as LanceDbVectorQuery,
};
use lancedb::table::AnyQuery;
use pyo3::prelude::{PyAnyMethods, PyDictMethods};
use pyo3::pyfunction;
use pyo3::pymethods;
use pyo3::types::PyList;
use pyo3::types::{PyDict, PyString};
use pyo3::Bound;
use pyo3::IntoPyObject;
use pyo3::PyAny;
use pyo3::PyRef;
use pyo3::PyResult;
use pyo3::Python;
use pyo3::{exceptions::PyRuntimeError, FromPyObject};
use pyo3::prelude::{PyAnyMethods, PyDictMethods};
use pyo3::pyfunction;
use pyo3::pymethods;
use pyo3::types::PyList;
use pyo3::types::{PyDict, PyString};
use pyo3::{FromPyObject, exceptions::PyRuntimeError};
use pyo3::{PyErr, pyclass};
use pyo3::{
exceptions::{PyNotImplementedError, PyValueError},
intern,
};
use pyo3::{pyclass, PyErr};
use pyo3_async_runtimes::tokio::future_into_py;
use crate::util::parse_distance_type;

View File

@@ -4,7 +4,7 @@
use std::sync::Arc;
use lancedb::{ObjectStoreRegistry, Session as LanceSession};
use pyo3::{pyclass, pymethods, PyResult};
use pyo3::{PyResult, pyclass, pymethods};
/// A session for managing caches and object stores across LanceDB operations.
///

View File

@@ -66,13 +66,10 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
.inner
.bind(py)
.call_method0("fetch_storage_options")
.map_err(|e| lance_core::Error::IO {
source: Box::new(std::io::Error::other(format!(
"Failed to call fetch_storage_options: {}",
e
))),
location: snafu::location!(),
})?;
.map_err(|e| lance_core::Error::io_source(Box::new(std::io::Error::other(format!(
"Failed to call fetch_storage_options: {}",
e
)))))?;
// If result is None, return None
if result.is_none() {
@@ -81,26 +78,19 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
// Extract the result dict - should be a flat Map<String, String>
let result_dict = result.downcast::<PyDict>().map_err(|_| {
lance_core::Error::InvalidInput {
source: "fetch_storage_options() must return None or a dict of string key-value pairs".into(),
location: snafu::location!(),
}
lance_core::Error::invalid_input(
"fetch_storage_options() must return a dict of string key-value pairs or None",
)
})?;
// Convert all entries to HashMap<String, String>
let mut storage_options = HashMap::new();
for (key, value) in result_dict.iter() {
let key_str: String = key.extract().map_err(|e| {
lance_core::Error::InvalidInput {
source: format!("Storage option key must be a string: {}", e).into(),
location: snafu::location!(),
}
lance_core::Error::invalid_input(format!("Storage option key must be a string: {}", e))
})?;
let value_str: String = value.extract().map_err(|e| {
lance_core::Error::InvalidInput {
source: format!("Storage option value must be a string: {}", e).into(),
location: snafu::location!(),
}
lance_core::Error::invalid_input(format!("Storage option value must be a string: {}", e))
})?;
storage_options.insert(key_str, value_str);
}
@@ -109,13 +99,10 @@ impl StorageOptionsProvider for PyStorageOptionsProviderWrapper {
})
})
.await
.map_err(|e| lance_core::Error::IO {
source: Box::new(std::io::Error::other(format!(
"Task join error: {}",
e
))),
location: snafu::location!(),
})?
.map_err(|e| lance_core::Error::io_source(Box::new(std::io::Error::other(format!(
"Task join error: {}",
e
)))))?
}
fn provider_id(&self) -> String {

View File

@@ -5,7 +5,7 @@ use std::{collections::HashMap, sync::Arc};
use crate::{
connection::Connection,
error::PythonErrorExt,
index::{extract_index_params, IndexConfig},
index::{IndexConfig, extract_index_params},
query::{Query, TakeQuery},
table::scannable::PyScannable,
};
@@ -19,10 +19,10 @@ use lancedb::table::{
Table as LanceDbTable,
};
use pyo3::{
Bound, FromPyObject, PyAny, PyRef, PyResult, Python,
exceptions::{PyKeyError, PyRuntimeError, PyValueError},
pyclass, pymethods,
types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods},
Bound, FromPyObject, PyAny, PyRef, PyResult, Python,
};
use pyo3_async_runtimes::tokio::future_into_py;
@@ -112,19 +112,24 @@ impl From<lancedb::table::AddResult> for AddResult {
#[pyclass(get_all)]
#[derive(Clone, Debug)]
pub struct DeleteResult {
pub num_deleted_rows: u64,
pub version: u64,
}
#[pymethods]
impl DeleteResult {
pub fn __repr__(&self) -> String {
format!("DeleteResult(version={})", self.version)
format!(
"DeleteResult(num_deleted_rows={}, version={})",
self.num_deleted_rows, self.version
)
}
}
impl From<lancedb::table::DeleteResult> for DeleteResult {
fn from(result: lancedb::table::DeleteResult) -> Self {
Self {
num_deleted_rows: result.num_deleted_rows,
version: result.version,
}
}
@@ -537,7 +542,7 @@ impl Table {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let versions = inner.list_versions().await.infer_error()?;
let versions_as_dict = Python::attach(|py| {
Python::attach(|py| {
versions
.iter()
.map(|v| {
@@ -554,9 +559,7 @@ impl Table {
Ok(dict.unbind())
})
.collect::<PyResult<Vec<_>>>()
});
versions_as_dict
})
})
}
@@ -707,6 +710,9 @@ impl Table {
if let Some(use_index) = parameters.use_index {
builder.use_index(use_index);
}
if let Some(mem_wal) = parameters.mem_wal {
builder.mem_wal(mem_wal);
}
future_into_py(self_.py(), async move {
let res = builder.execute(Box::new(batches)).await.infer_error()?;
@@ -867,6 +873,7 @@ pub struct MergeInsertParams {
when_not_matched_by_source_condition: Option<String>,
timeout: Option<std::time::Duration>,
use_index: Option<bool>,
mem_wal: Option<bool>,
}
#[pyclass]

View File

@@ -10,11 +10,11 @@ use arrow::{
};
use futures::StreamExt;
use lancedb::{
Error,
arrow::{SendableRecordBatchStream, SimpleRecordBatchStream},
data::scannable::Scannable,
Error,
};
use pyo3::{types::PyAnyMethods, FromPyObject, Py, PyAny, Python};
use pyo3::{FromPyObject, Py, PyAny, Python, types::PyAnyMethods};
/// Adapter that implements Scannable for a Python reader factory callable.
///
@@ -99,15 +99,15 @@ impl Scannable for PyScannable {
// Channel closed. Check if the task panicked — a panic
// drops the sender without sending an error, so without
// this check we'd silently return a truncated stream.
if let Some(handle) = join_handle {
if let Err(join_err) = handle.await {
return Some((
Err(Error::Runtime {
message: format!("Reader task panicked: {}", join_err),
}),
(rx, None),
));
}
if let Some(handle) = join_handle
&& let Err(join_err) = handle.await
{
return Some((
Err(Error::Runtime {
message: format!("Reader task panicked: {}", join_err),
}),
(rx, None),
));
}
None
}

View File

@@ -5,8 +5,9 @@ use std::sync::Mutex;
use lancedb::DistanceType;
use pyo3::{
PyResult,
exceptions::{PyRuntimeError, PyValueError},
pyfunction, PyResult,
pyfunction,
};
/// A wrapper around a rust builder

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.27.0-beta.1"
version = "0.27.0-beta.3"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true
@@ -25,9 +25,9 @@ datafusion-catalog.workspace = true
datafusion-common.workspace = true
datafusion-execution.workspace = true
datafusion-expr.workspace = true
datafusion-functions = "51.0"
datafusion-functions.workspace = true
datafusion-physical-expr.workspace = true
datafusion-sql = "51.0"
datafusion-sql.workspace = true
datafusion-physical-plan.workspace = true
datafusion.workspace = true
object_store = { workspace = true }

View File

@@ -9,10 +9,9 @@ use aws_config::Region;
use aws_sdk_bedrockruntime::Client;
use futures::StreamExt;
use lancedb::{
connect,
embeddings::{bedrock::BedrockEmbeddingFunction, EmbeddingDefinition, EmbeddingFunction},
Result, connect,
embeddings::{EmbeddingDefinition, EmbeddingFunction, bedrock::BedrockEmbeddingFunction},
query::{ExecutableQuery, QueryBase},
Result,
};
#[tokio::main]

View File

@@ -10,10 +10,10 @@ use futures::TryStreamExt;
use lance_index::scalar::FullTextSearchQuery;
use lancedb::connection::Connection;
use lancedb::index::scalar::FtsIndexBuilder;
use lancedb::index::Index;
use lancedb::index::scalar::FtsIndexBuilder;
use lancedb::query::{ExecutableQuery, QueryBase};
use lancedb::{connect, Result, Table};
use lancedb::{Result, Table, connect};
use rand::random;
#[tokio::main]
@@ -46,19 +46,21 @@ fn create_some_records() -> Result<Box<dyn arrow_array::RecordBatchReader + Send
.collect::<Vec<_>>();
let n_terms = 3;
let batches = RecordBatchIterator::new(
vec![RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from_iter_values(0..TOTAL as i32)),
Arc::new(StringArray::from_iter_values((0..TOTAL).map(|_| {
(0..n_terms)
.map(|_| words[random::<u32>() as usize % words.len()])
.collect::<Vec<_>>()
.join(" ")
}))),
],
)
.unwrap()]
vec![
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from_iter_values(0..TOTAL as i32)),
Arc::new(StringArray::from_iter_values((0..TOTAL).map(|_| {
(0..n_terms)
.map(|_| words[random::<u32>() as usize % words.len()])
.collect::<Vec<_>>()
.join(" ")
}))),
],
)
.unwrap(),
]
.into_iter()
.map(Ok),
schema.clone(),

View File

@@ -5,16 +5,15 @@ use arrow_array::{RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use lance_index::scalar::FullTextSearchQuery;
use lancedb::index::scalar::FtsIndexBuilder;
use lancedb::index::Index;
use lancedb::index::scalar::FtsIndexBuilder;
use lancedb::{
connect,
Result, Table, connect,
embeddings::{
sentence_transformers::SentenceTransformersEmbeddings, EmbeddingDefinition,
EmbeddingFunction,
EmbeddingDefinition, EmbeddingFunction,
sentence_transformers::SentenceTransformersEmbeddings,
},
query::{QueryBase, QueryExecutionOptions},
Result, Table,
};
use std::{iter::once, sync::Arc};

View File

@@ -14,10 +14,10 @@ use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use lancedb::connection::Connection;
use lancedb::index::vector::IvfPqIndexBuilder;
use lancedb::index::Index;
use lancedb::index::vector::IvfPqIndexBuilder;
use lancedb::query::{ExecutableQuery, QueryBase};
use lancedb::{connect, DistanceType, Result, Table};
use lancedb::{DistanceType, Result, Table, connect};
#[tokio::main]
async fn main() -> Result<()> {
@@ -51,19 +51,21 @@ fn create_some_records() -> Result<Box<dyn arrow_array::RecordBatchReader + Send
// Create a RecordBatch stream.
let batches = RecordBatchIterator::new(
vec![RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from_iter_values(0..TOTAL as i32)),
Arc::new(
FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
(0..TOTAL).map(|_| Some(vec![Some(1.0); DIM])),
DIM as i32,
vec![
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from_iter_values(0..TOTAL as i32)),
Arc::new(
FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
(0..TOTAL).map(|_| Some(vec![Some(1.0); DIM])),
DIM as i32,
),
),
),
],
)
.unwrap()]
],
)
.unwrap(),
]
.into_iter()
.map(Ok),
schema.clone(),

View File

@@ -8,10 +8,9 @@ use std::{iter::once, sync::Arc};
use arrow_array::{RecordBatch, StringArray};
use futures::StreamExt;
use lancedb::{
connect,
embeddings::{openai::OpenAIEmbeddingFunction, EmbeddingDefinition, EmbeddingFunction},
Result, connect,
embeddings::{EmbeddingDefinition, EmbeddingFunction, openai::OpenAIEmbeddingFunction},
query::{ExecutableQuery, QueryBase},
Result,
};
// --8<-- [end:imports]

View File

@@ -7,13 +7,12 @@ use arrow_array::{RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use futures::StreamExt;
use lancedb::{
connect,
Result, connect,
embeddings::{
sentence_transformers::SentenceTransformersEmbeddings, EmbeddingDefinition,
EmbeddingFunction,
EmbeddingDefinition, EmbeddingFunction,
sentence_transformers::SentenceTransformersEmbeddings,
},
query::{ExecutableQuery, QueryBase},
Result,
};
#[tokio::main]

View File

@@ -14,7 +14,7 @@ use futures::TryStreamExt;
use lancedb::connection::Connection;
use lancedb::index::Index;
use lancedb::query::{ExecutableQuery, QueryBase};
use lancedb::{connect, Result, Table as LanceDbTable};
use lancedb::{Result, Table as LanceDbTable, connect};
#[tokio::main]
async fn main() -> Result<()> {

View File

@@ -12,7 +12,7 @@ use lance_datagen::{BatchCount, BatchGeneratorBuilder, RowCount};
#[cfg(feature = "polars")]
use {crate::polars_arrow_convertors, polars::frame::ArrowChunk, polars::prelude::DataFrame};
use crate::{error::Result, Error};
use crate::{Error, error::Result};
/// An iterator of batches that also has a schema
pub trait RecordBatchReader: Iterator<Item = Result<arrow_array::RecordBatch>> {

View File

@@ -17,6 +17,7 @@ use lance_namespace::models::{
#[cfg(feature = "aws")]
use object_store::aws::AwsCredential;
use crate::Table;
use crate::connection::create_table::CreateTableBuilder;
use crate::data::scannable::Scannable;
use crate::database::listing::ListingDatabase;
@@ -31,7 +32,6 @@ use crate::remote::{
client::ClientConfig,
db::{OPT_REMOTE_API_KEY, OPT_REMOTE_HOST_OVERRIDE, OPT_REMOTE_REGION},
};
use crate::Table;
use lance::io::ObjectStoreParams;
pub use lance_encoding::version::LanceFileVersion;
#[cfg(feature = "remote")]
@@ -566,8 +566,11 @@ pub struct ConnectBuilder {
}
#[cfg(feature = "remote")]
const ENV_VARS_TO_STORAGE_OPTS: [(&str, &str); 1] =
[("AZURE_STORAGE_ACCOUNT_NAME", "azure_storage_account_name")];
const ENV_VARS_TO_STORAGE_OPTS: [(&str, &str); 3] = [
("AZURE_STORAGE_ACCOUNT_NAME", "azure_storage_account_name"),
("AZURE_CLIENT_ID", "azure_client_id"),
("AZURE_TENANT_ID", "azure_tenant_id"),
];
impl ConnectBuilder {
/// Create a new [`ConnectOptions`] with the given database URI.
@@ -758,10 +761,10 @@ impl ConnectBuilder {
options: &mut HashMap<String, String>,
) {
for (env_key, opt_key) in env_var_to_remote_storage_option {
if let Ok(env_value) = std::env::var(env_key) {
if !options.contains_key(*opt_key) {
options.insert((*opt_key).to_string(), env_value);
}
if let Ok(env_value) = std::env::var(env_key)
&& !options.contains_key(*opt_key)
{
options.insert((*opt_key).to_string(), env_value);
}
}
}
@@ -781,13 +784,19 @@ impl ConnectBuilder {
message: "An api_key is required when connecting to LanceDb Cloud".to_string(),
})?;
// Propagate mem_wal_enabled from options to client_config
let mut client_config = self.request.client_config;
if options.mem_wal_enabled.is_some() {
client_config.mem_wal_enabled = options.mem_wal_enabled;
}
let storage_options = StorageOptions(options.storage_options.clone());
let internal = Arc::new(crate::remote::db::RemoteDatabase::try_new(
&self.request.uri,
&api_key,
&region,
options.host_override,
self.request.client_config,
client_config,
storage_options.into(),
)?);
Ok(Connection {
@@ -1011,14 +1020,13 @@ mod tests {
#[cfg(feature = "remote")]
#[test]
fn test_apply_env_defaults() {
let env_key = "TEST_APPLY_ENV_DEFAULTS_ENVIRONMENT_VARIABLE_ENV_KEY";
let env_val = "TEST_APPLY_ENV_DEFAULTS_ENVIRONMENT_VARIABLE_ENV_VAL";
let env_key = "PATH";
let env_val = std::env::var(env_key).expect("PATH should be set in test environment");
let opts_key = "test_apply_env_defaults_environment_variable_opts_key";
std::env::set_var(env_key, env_val);
let mut options = HashMap::new();
ConnectBuilder::apply_env_defaults(&[(env_key, opts_key)], &mut options);
assert_eq!(Some(&env_val.to_string()), options.get(opts_key));
assert_eq!(Some(&env_val), options.get(opts_key));
options.insert(opts_key.to_string(), "EXPLICIT-VALUE".to_string());
ConnectBuilder::apply_env_defaults(&[(env_key, opts_key)], &mut options);

View File

@@ -6,12 +6,12 @@ use std::sync::Arc;
use lance_io::object_store::StorageOptionsProvider;
use crate::{
Error, Result, Table,
connection::{merge_storage_options, set_storage_options_provider},
data::scannable::{Scannable, WithEmbeddingsScannable},
database::{CreateTableMode, CreateTableRequest, Database},
embeddings::{EmbeddingDefinition, EmbeddingFunction, EmbeddingRegistry},
table::WriteOptions,
Error, Result, Table,
};
pub struct CreateTableBuilder {
@@ -167,7 +167,7 @@ impl CreateTableBuilder {
#[cfg(test)]
mod tests {
use arrow_array::{
record_batch, Array, FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator,
Array, FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator, record_batch,
};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use futures::TryStreamExt;
@@ -380,11 +380,12 @@ mod tests {
.await
.unwrap();
let other_schema = Arc::new(Schema::new(vec![Field::new("y", DataType::Int32, false)]));
assert!(db
.create_empty_table("test", other_schema.clone())
.execute()
.await
.is_err()); // TODO: assert what this error is
assert!(
db.create_empty_table("test", other_schema.clone())
.execute()
.await
.is_err()
); // TODO: assert what this error is
let overwritten = db
.create_empty_table("test", other_schema.clone())
.mode(CreateTableMode::Overwrite)

View File

@@ -5,9 +5,9 @@ use std::collections::HashMap;
use arrow::compute::kernels::{aggregate::bool_and, length::length};
use arrow_array::{
Array, GenericListArray, OffsetSizeTrait, PrimitiveArray, RecordBatchReader,
cast::AsArray,
types::{ArrowPrimitiveType, Int32Type, Int64Type},
Array, GenericListArray, OffsetSizeTrait, PrimitiveArray, RecordBatchReader,
};
use arrow_ord::cmp::eq;
use arrow_schema::DataType;
@@ -78,7 +78,7 @@ pub fn infer_vector_columns(
_ => {
return Err(Error::Schema {
message: format!("Column {} is not a list", col_name),
})
});
}
} {
if let Some(Some(prev_dim)) = columns_to_infer.get(&col_name) {
@@ -102,8 +102,8 @@ mod tests {
use super::*;
use arrow_array::{
types::{Float32Type, Float64Type},
FixedSizeListArray, Float32Array, ListArray, RecordBatch, RecordBatchIterator, StringArray,
types::{Float32Type, Float64Type},
};
use arrow_schema::{DataType, Field, Schema};
use std::{sync::Arc, vec};

View File

@@ -4,10 +4,10 @@
use std::{iter::repeat_with, sync::Arc};
use arrow_array::{
cast::AsArray,
types::{Float16Type, Float32Type, Float64Type, Int32Type, Int64Type},
Array, ArrowNumericType, FixedSizeListArray, PrimitiveArray, RecordBatch, RecordBatchIterator,
RecordBatchReader,
cast::AsArray,
types::{Float16Type, Float32Type, Float64Type, Int32Type, Int64Type},
};
use arrow_cast::{can_cast_types, cast};
use arrow_schema::{ArrowError, DataType, Field, Schema};
@@ -184,7 +184,7 @@ mod tests {
use std::sync::Arc;
use arrow_array::{
FixedSizeListArray, Float16Array, Float32Array, Float64Array, Int32Array, Int8Array,
FixedSizeListArray, Float16Array, Float32Array, Float64Array, Int8Array, Int32Array,
RecordBatch, RecordBatchIterator, StringArray,
};
use arrow_schema::Field;

View File

@@ -13,16 +13,16 @@ use crate::arrow::{
SendableRecordBatchStream, SendableRecordBatchStreamExt, SimpleRecordBatchStream,
};
use crate::embeddings::{
compute_embeddings_for_batch, compute_output_schema, EmbeddingDefinition, EmbeddingFunction,
EmbeddingRegistry,
EmbeddingDefinition, EmbeddingFunction, EmbeddingRegistry, compute_embeddings_for_batch,
compute_output_schema,
};
use crate::table::{ColumnDefinition, ColumnKind, TableDefinition};
use crate::{Error, Result};
use arrow_array::{ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{ArrowError, SchemaRef};
use async_trait::async_trait;
use futures::stream::once;
use futures::StreamExt;
use futures::stream::once;
use lance_datafusion::utils::StreamingWriteSource;
pub trait Scannable: Send {

View File

@@ -19,12 +19,12 @@ use std::sync::Arc;
use std::time::Duration;
use lance::dataset::ReadParams;
use lance_namespace::LanceNamespace;
use lance_namespace::models::{
CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest,
DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest,
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
};
use lance_namespace::LanceNamespace;
use crate::data::scannable::Scannable;
use crate::error::Result;

View File

@@ -8,7 +8,7 @@ use std::path::Path;
use std::{collections::HashMap, sync::Arc};
use lance::dataset::refs::Ref;
use lance::dataset::{builder::DatasetBuilder, ReadParams, WriteMode};
use lance::dataset::{ReadParams, WriteMode, builder::DatasetBuilder};
use lance::io::{ObjectStore, ObjectStoreParams, WrappingObjectStore};
use lance_datafusion::utils::StreamingWriteSource;
use lance_encoding::version::LanceFileVersion;
@@ -1097,11 +1097,11 @@ impl Database for ListingDatabase {
#[cfg(test)]
mod tests {
use super::*;
use crate::Table;
use crate::connection::ConnectRequest;
use crate::data::scannable::Scannable;
use crate::database::{CreateTableMode, CreateTableRequest};
use crate::table::WriteOptions;
use crate::Table;
use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use std::path::PathBuf;

View File

@@ -9,16 +9,15 @@ use std::sync::Arc;
use async_trait::async_trait;
use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor};
use lance_namespace::{
models::{
CreateEmptyTableRequest, CreateNamespaceRequest, CreateNamespaceResponse,
DeclareTableRequest, DescribeNamespaceRequest, DescribeNamespaceResponse,
DescribeTableRequest, DropNamespaceRequest, DropNamespaceResponse, DropTableRequest,
ListNamespacesRequest, ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
},
LanceNamespace,
models::{
CreateNamespaceRequest, CreateNamespaceResponse, DeclareTableRequest,
DescribeNamespaceRequest, DescribeNamespaceResponse, DescribeTableRequest,
DropNamespaceRequest, DropNamespaceResponse, DropTableRequest, ListNamespacesRequest,
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
},
};
use lance_namespace_impls::ConnectBuilder;
use log::warn;
use crate::database::ReadConsistency;
use crate::error::{Error, Result};
@@ -213,63 +212,18 @@ impl Database for LanceNamespaceDatabase {
..Default::default()
};
let (location, initial_storage_options) =
match self.namespace.declare_table(declare_request).await {
Ok(response) => {
let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response"
.to_string(),
})?;
// Use storage options from response, fall back to self.storage_options
let opts = response
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o| !o.is_empty());
(loc, opts)
}
Err(e) => {
// Check if the error is "not supported" and try create_empty_table as fallback
let err_str = e.to_string().to_lowercase();
if err_str.contains("not supported") || err_str.contains("not implemented") {
warn!(
"declare_table is not supported by the namespace client, \
falling back to deprecated create_empty_table. \
create_empty_table is deprecated and will be removed in Lance 3.0.0. \
Please upgrade your namespace client to support declare_table."
);
#[allow(deprecated)]
let create_empty_request = CreateEmptyTableRequest {
id: Some(table_id.clone()),
..Default::default()
};
#[allow(deprecated)]
let create_response = self
.namespace
.create_empty_table(create_empty_request)
.await
.map_err(|e| Error::Runtime {
message: format!("Failed to create empty table: {}", e),
})?;
let loc = create_response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from create_empty_table response"
.to_string(),
})?;
// For deprecated path, use self.storage_options
let opts = if self.storage_options.is_empty() {
None
} else {
Some(self.storage_options.clone())
};
(loc, opts)
} else {
return Err(Error::Runtime {
message: format!("Failed to declare table: {}", e),
});
}
}
};
let (location, initial_storage_options) = {
let response = self.namespace.declare_table(declare_request).await?;
let loc = response.location.ok_or_else(|| Error::Runtime {
message: "Table location is missing from declare_table response".to_string(),
})?;
// Use storage options from response, fall back to self.storage_options
let opts = response
.storage_options
.or_else(|| Some(self.storage_options.clone()))
.filter(|o| !o.is_empty());
(loc, opts)
};
let write_params = if let Some(storage_opts) = initial_storage_options {
let mut params = request.write_options.lance_write_params.unwrap_or_default();

View File

@@ -11,16 +11,16 @@ use lance_core::ROW_ID;
use lance_datafusion::exec::SessionContextExt;
use crate::{
Error, Result, Table,
arrow::{SendableRecordBatchStream, SendableRecordBatchStreamExt, SimpleRecordBatchStream},
connect,
database::{CreateTableRequest, Database},
dataloader::permutation::{
shuffle::{Shuffler, ShufflerConfig},
split::{SplitStrategy, Splitter, SPLIT_ID_COLUMN},
util::{rename_column, TemporaryDirectory},
split::{SPLIT_ID_COLUMN, SplitStrategy, Splitter},
util::{TemporaryDirectory, rename_column},
},
query::{ExecutableQuery, QueryBase, Select},
Error, Result, Table,
};
pub const SRC_ROW_ID_COL: &str = "row_id";

View File

@@ -25,8 +25,8 @@ use futures::{StreamExt, TryStreamExt};
use lance::dataset::scanner::DatasetRecordBatchStream;
use lance::io::RecordBatchStream;
use lance_arrow::RecordBatchExt;
use lance_core::error::LanceOptionExt;
use lance_core::ROW_ID;
use lance_core::error::LanceOptionExt;
use std::collections::HashMap;
use std::sync::Arc;
@@ -500,10 +500,10 @@ mod tests {
use rand::seq::SliceRandom;
use crate::{
Table,
arrow::SendableRecordBatchStream,
query::{ExecutableQuery, QueryBase},
test_utils::datagen::{virtual_table, LanceDbDatagenExt},
Table,
test_utils::datagen::{LanceDbDatagenExt, virtual_table},
};
use super::*;

View File

@@ -18,12 +18,12 @@ use lance_io::{
scheduler::{ScanScheduler, SchedulerConfig},
utils::CachedFileSize,
};
use rand::{seq::SliceRandom, Rng, RngCore};
use rand::{Rng, RngCore, seq::SliceRandom};
use crate::{
arrow::{SendableRecordBatchStream, SimpleRecordBatchStream},
dataloader::permutation::util::{non_crypto_rng, TemporaryDirectory},
Error, Result,
arrow::{SendableRecordBatchStream, SimpleRecordBatchStream},
dataloader::permutation::util::{TemporaryDirectory, non_crypto_rng},
};
#[derive(Debug, Clone)]
@@ -281,7 +281,7 @@ mod tests {
use datafusion_expr::col;
use futures::TryStreamExt;
use lance_datagen::{BatchCount, BatchGeneratorBuilder, ByteCount, RowCount, Seed};
use rand::{rngs::SmallRng, SeedableRng};
use rand::{SeedableRng, rngs::SmallRng};
fn test_gen() -> BatchGeneratorBuilder {
lance_datagen::gen_batch()

View File

@@ -2,8 +2,8 @@
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc,
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
};
use arrow_array::{Array, BooleanArray, RecordBatch, UInt64Array};
@@ -15,13 +15,13 @@ use lance_arrow::SchemaExt;
use lance_core::ROW_ID;
use crate::{
Error, Result,
arrow::{SendableRecordBatchStream, SimpleRecordBatchStream},
dataloader::{
permutation::shuffle::{Shuffler, ShufflerConfig},
permutation::util::TemporaryDirectory,
},
query::{Query, QueryBase, Select},
Error, Result,
};
pub const SPLIT_ID_COLUMN: &str = "split_id";

View File

@@ -7,12 +7,12 @@ use arrow_array::RecordBatch;
use arrow_schema::{Fields, Schema};
use datafusion_execution::disk_manager::DiskManagerMode;
use futures::TryStreamExt;
use rand::{rngs::SmallRng, RngCore, SeedableRng};
use rand::{RngCore, SeedableRng, rngs::SmallRng};
use tempfile::TempDir;
use crate::{
arrow::{SendableRecordBatchStream, SimpleRecordBatchStream},
Error, Result,
arrow::{SendableRecordBatchStream, SimpleRecordBatchStream},
};
/// Directory to use for temporary files

View File

@@ -23,9 +23,9 @@ use arrow_schema::{DataType, Field, SchemaBuilder, SchemaRef};
use serde::{Deserialize, Serialize};
use crate::{
Error,
error::Result,
table::{ColumnDefinition, ColumnKind, TableDefinition},
Error,
};
/// Trait for embedding functions

View File

@@ -8,7 +8,7 @@ use arrow::array::{AsArray, Float32Builder};
use arrow_array::{Array, ArrayRef, FixedSizeListArray, Float32Array};
use arrow_data::ArrayData;
use arrow_schema::DataType;
use serde_json::{json, Value};
use serde_json::{Value, json};
use super::EmbeddingFunction;
use crate::{Error, Result};

View File

@@ -8,9 +8,9 @@ use arrow_array::{Array, ArrayRef, FixedSizeListArray, Float32Array};
use arrow_data::ArrayData;
use arrow_schema::DataType;
use async_openai::{
Client,
config::OpenAIConfig,
types::{CreateEmbeddingRequest, Embedding, EmbeddingInput, EncodingFormat},
Client,
};
use tokio::{runtime::Handle, task};

View File

@@ -7,7 +7,7 @@ use super::EmbeddingFunction;
use arrow::{
array::{AsArray, PrimitiveBuilder},
datatypes::{
ArrowPrimitiveType, Float16Type, Float32Type, Float64Type, Int64Type, UInt32Type, UInt8Type,
ArrowPrimitiveType, Float16Type, Float32Type, Float64Type, Int64Type, UInt8Type, UInt32Type,
},
};
use arrow_array::{Array, FixedSizeListArray, PrimitiveArray};
@@ -16,8 +16,8 @@ use arrow_schema::DataType;
use candle_core::{CpuStorage, Device, Layout, Storage, Tensor};
use candle_nn::VarBuilder;
use candle_transformers::models::bert::{BertModel, DTYPE};
use hf_hub::{api::sync::Api, Repo, RepoType};
use tokenizers::{tokenizer::Tokenizer, PaddingParams};
use hf_hub::{Repo, RepoType, api::sync::Api};
use tokenizers::{PaddingParams, tokenizer::Tokenizer};
/// Compute embeddings using huggingface sentence-transformers.
pub struct SentenceTransformersEmbeddingsBuilder {
@@ -230,7 +230,7 @@ impl SentenceTransformersEmbeddings {
Storage::Cpu(CpuStorage::BF16(_)) => {
return Err(crate::Error::Runtime {
message: "unsupported data type".to_string(),
})
});
}
_ => unreachable!("we already moved the tensor to the CPU device"),
};
@@ -298,12 +298,12 @@ impl SentenceTransformersEmbeddings {
DataType::Utf8View => {
return Err(crate::Error::Runtime {
message: "Utf8View not yet implemented".to_string(),
})
});
}
_ => {
return Err(crate::Error::Runtime {
message: "invalid type".to_string(),
})
});
}
};

View File

@@ -24,7 +24,7 @@ pub use sql::expr_to_sql_string;
use std::sync::Arc;
use arrow_schema::DataType;
use datafusion_expr::{expr_fn::cast, Expr, ScalarUDF};
use datafusion_expr::{Expr, ScalarUDF, expr_fn::cast};
use datafusion_functions::string::expr_fn as string_expr_fn;
pub use datafusion_expr::{col, lit};

View File

@@ -9,7 +9,7 @@ use std::time::Duration;
use vector::IvfFlatIndexBuilder;
use crate::index::vector::IvfRqIndexBuilder;
use crate::{table::BaseTable, DistanceType, Error, Result};
use crate::{DistanceType, Error, Result, table::BaseTable};
use self::{
scalar::{BTreeIndexBuilder, BitmapIndexBuilder, LabelListIndexBuilder},

View File

@@ -27,7 +27,7 @@
///
/// The btree index does not currently have any parameters though parameters such as the
/// block size may be added in the future.
#[derive(Default, Debug, Clone)]
#[derive(Default, Debug, Clone, serde::Serialize)]
pub struct BTreeIndexBuilder {}
impl BTreeIndexBuilder {}
@@ -39,7 +39,7 @@ impl BTreeIndexBuilder {}
/// This index works best for low-cardinality (i.e., less than 1000 unique values) columns,
/// where the number of unique values is small.
/// The bitmap stores a list of row ids where the value is present.
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct BitmapIndexBuilder {}
/// Builder for LabelList index.
@@ -48,10 +48,10 @@ pub struct BitmapIndexBuilder {}
/// support queries with `array_contains_all` and `array_contains_any`
/// using an underlying bitmap index.
///
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct LabelListIndexBuilder {}
pub use lance_index::scalar::inverted::query::*;
pub use lance_index::scalar::FullTextSearchQuery;
pub use lance_index::scalar::InvertedIndexParams as FtsIndexBuilder;
pub use lance_index::scalar::InvertedIndexParams;
pub use lance_index::scalar::inverted::query::*;

View File

@@ -7,6 +7,7 @@
//! Vector indices are only supported on fixed-size-list (tensor) columns of floating point
//! values
use lance::table::format::{IndexMetadata, Manifest};
use serde::Serialize;
use crate::DistanceType;
@@ -181,14 +182,17 @@ macro_rules! impl_hnsw_params_setter {
/// The partitioning process is called IVF and the `num_partitions` parameter controls how many groups to create.
///
/// Note that training an IVF Flat index on a large dataset is a slow operation and currently is also a memory intensive operation.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct IvfFlatIndexBuilder {
#[serde(rename = "metric_type")]
pub(crate) distance_type: DistanceType,
// IVF
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_partitions: Option<u32>,
pub(crate) sample_rate: u32,
pub(crate) max_iterations: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) target_partition_size: Option<u32>,
}
@@ -213,14 +217,17 @@ impl IvfFlatIndexBuilder {
///
/// This index compresses vectors using scalar quantization and groups them into IVF partitions.
/// It offers a balance between search performance and storage footprint.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct IvfSqIndexBuilder {
#[serde(rename = "metric_type")]
pub(crate) distance_type: DistanceType,
// IVF
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_partitions: Option<u32>,
pub(crate) sample_rate: u32,
pub(crate) max_iterations: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) target_partition_size: Option<u32>,
}
@@ -261,18 +268,23 @@ impl IvfSqIndexBuilder {
///
/// Note that training an IVF PQ index on a large dataset is a slow operation and
/// currently is also a memory intensive operation.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct IvfPqIndexBuilder {
#[serde(rename = "metric_type")]
pub(crate) distance_type: DistanceType,
// IVF
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_partitions: Option<u32>,
pub(crate) sample_rate: u32,
pub(crate) max_iterations: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) target_partition_size: Option<u32>,
// PQ
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_sub_vectors: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_bits: Option<u32>,
}
@@ -323,14 +335,18 @@ pub(crate) fn suggested_num_sub_vectors(dim: u32) -> u32 {
///
/// Note that training an IVF RQ index on a large dataset is a slow operation and
/// currently is also a memory intensive operation.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct IvfRqIndexBuilder {
// IVF
#[serde(rename = "metric_type")]
pub(crate) distance_type: DistanceType,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_partitions: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_bits: Option<u32>,
pub(crate) sample_rate: u32,
pub(crate) max_iterations: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) target_partition_size: Option<u32>,
}
@@ -365,13 +381,16 @@ impl IvfRqIndexBuilder {
/// quickly find the closest vectors to a query vector.
///
/// The PQ (product quantizer) is used to compress the vectors as the same as IVF PQ.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct IvfHnswPqIndexBuilder {
// IVF
#[serde(rename = "metric_type")]
pub(crate) distance_type: DistanceType,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_partitions: Option<u32>,
pub(crate) sample_rate: u32,
pub(crate) max_iterations: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) target_partition_size: Option<u32>,
// HNSW
@@ -379,7 +398,9 @@ pub struct IvfHnswPqIndexBuilder {
pub(crate) ef_construction: u32,
// PQ
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_sub_vectors: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_bits: Option<u32>,
}
@@ -415,13 +436,16 @@ impl IvfHnswPqIndexBuilder {
///
/// The SQ (scalar quantizer) is used to compress the vectors,
/// each vector is mapped to a 8-bit integer vector, 4x compression ratio for float32 vector.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct IvfHnswSqIndexBuilder {
// IVF
#[serde(rename = "metric_type")]
pub(crate) distance_type: DistanceType,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) num_partitions: Option<u32>,
pub(crate) sample_rate: u32,
pub(crate) max_iterations: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) target_partition_size: Option<u32>,
// HNSW

View File

@@ -1,9 +1,9 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use crate::Error;
use crate::error::Result;
use crate::table::BaseTable;
use crate::Error;
use log::debug;
use std::time::{Duration, Instant};
use tokio::time::sleep;

View File

@@ -5,11 +5,11 @@
use std::{fmt::Formatter, sync::Arc};
use futures::{stream::BoxStream, TryFutureExt};
use futures::{TryFutureExt, stream::BoxStream};
use lance::io::WrappingObjectStore;
use object_store::{
path::Path, Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart,
Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, path::Path,
};
use async_trait::async_trait;

View File

@@ -10,8 +10,9 @@ use bytes::Bytes;
use futures::stream::BoxStream;
use lance::io::WrappingObjectStore;
use object_store::{
path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OSResult, UploadPart,
path::Path,
};
#[derive(Debug, Default)]

View File

@@ -5,26 +5,26 @@ use std::sync::Arc;
use std::{future::Future, time::Duration};
use arrow::compute::concat_batches;
use arrow_array::{make_array, Array, Float16Array, Float32Array, Float64Array};
use arrow_array::{Array, Float16Array, Float32Array, Float64Array, make_array};
use arrow_schema::{DataType, SchemaRef};
use datafusion_expr::Expr;
use datafusion_physical_plan::ExecutionPlan;
use futures::{stream, try_join, FutureExt, TryFutureExt, TryStreamExt};
use futures::{FutureExt, TryFutureExt, TryStreamExt, stream, try_join};
use half::f16;
use lance::dataset::{scanner::DatasetRecordBatchStream, ROW_ID};
use lance::dataset::{ROW_ID, scanner::DatasetRecordBatchStream};
use lance_arrow::RecordBatchExt;
use lance_datafusion::exec::execute_plan;
use lance_index::scalar::inverted::SCORE_COL;
use lance_index::scalar::FullTextSearchQuery;
use lance_index::scalar::inverted::SCORE_COL;
use lance_index::vector::DIST_COL;
use lance_io::stream::RecordBatchStreamAdapter;
use crate::DistanceType;
use crate::error::{Error, Result};
use crate::rerankers::rrf::RRFReranker;
use crate::rerankers::{check_reranker_result, NormalizeMethod, Reranker};
use crate::rerankers::{NormalizeMethod, Reranker, check_reranker_result};
use crate::table::BaseTable;
use crate::utils::TimeoutStream;
use crate::DistanceType;
use crate::{arrow::SendableRecordBatchStream, table::AnyQuery};
mod hybrid;
@@ -161,10 +161,11 @@ impl IntoQueryVector for &dyn Array {
if data_type != self.data_type() {
Err(Error::InvalidInput {
message: format!(
"failed to create query vector, the input data type was {:?} but the expected data type was {:?}",
self.data_type(),
data_type
)})
"failed to create query vector, the input data type was {:?} but the expected data type was {:?}",
self.data_type(),
data_type
),
})
} else {
let data = self.to_data();
Ok(make_array(data))
@@ -186,7 +187,7 @@ impl IntoQueryVector for &[f16] {
DataType::Float32 => {
let arr: Vec<f32> = self.iter().map(|x| f32::from(*x)).collect();
Ok(Arc::new(Float32Array::from(arr)))
},
}
DataType::Float64 => {
let arr: Vec<f64> = self.iter().map(|x| f64::from(*x)).collect();
Ok(Arc::new(Float64Array::from(arr)))
@@ -194,8 +195,7 @@ impl IntoQueryVector for &[f16] {
_ => Err(Error::InvalidInput {
message: format!(
"failed to create query vector, the input data type was &[f16] but the embedding model \"{}\" expected data type {:?}",
embedding_model_label,
data_type
embedding_model_label, data_type
),
}),
}
@@ -216,7 +216,7 @@ impl IntoQueryVector for &[f32] {
DataType::Float32 => {
let arr: Vec<f32> = self.to_vec();
Ok(Arc::new(Float32Array::from(arr)))
},
}
DataType::Float64 => {
let arr: Vec<f64> = self.iter().map(|x| *x as f64).collect();
Ok(Arc::new(Float64Array::from(arr)))
@@ -224,8 +224,7 @@ impl IntoQueryVector for &[f32] {
_ => Err(Error::InvalidInput {
message: format!(
"failed to create query vector, the input data type was &[f32] but the embedding model \"{}\" expected data type {:?}",
embedding_model_label,
data_type
embedding_model_label, data_type
),
}),
}
@@ -239,26 +238,25 @@ impl IntoQueryVector for &[f64] {
embedding_model_label: &str,
) -> Result<Arc<dyn Array>> {
match data_type {
DataType::Float16 => {
let arr: Vec<f16> = self.iter().map(|x| f16::from_f64(*x)).collect();
Ok(Arc::new(Float16Array::from(arr)))
}
DataType::Float32 => {
let arr: Vec<f32> = self.iter().map(|x| *x as f32).collect();
Ok(Arc::new(Float32Array::from(arr)))
},
DataType::Float64 => {
let arr: Vec<f64> = self.to_vec();
Ok(Arc::new(Float64Array::from(arr)))
}
_ => Err(Error::InvalidInput {
message: format!(
"failed to create query vector, the input data type was &[f64] but the embedding model \"{}\" expected data type {:?}",
embedding_model_label,
data_type
),
}),
DataType::Float16 => {
let arr: Vec<f16> = self.iter().map(|x| f16::from_f64(*x)).collect();
Ok(Arc::new(Float16Array::from(arr)))
}
DataType::Float32 => {
let arr: Vec<f32> = self.iter().map(|x| *x as f32).collect();
Ok(Arc::new(Float32Array::from(arr)))
}
DataType::Float64 => {
let arr: Vec<f64> = self.to_vec();
Ok(Arc::new(Float64Array::from(arr)))
}
_ => Err(Error::InvalidInput {
message: format!(
"failed to create query vector, the input data type was &[f64] but the embedding model \"{}\" expected data type {:?}",
embedding_model_label, data_type
),
}),
}
}
}
@@ -1011,13 +1009,13 @@ impl VectorQuery {
message: "minimum_nprobes must be greater than 0".to_string(),
});
}
if let Some(maximum_nprobes) = self.request.maximum_nprobes {
if minimum_nprobes > maximum_nprobes {
return Err(Error::InvalidInput {
message: "minimum_nprobes must be less than or equal to maximum_nprobes"
.to_string(),
});
}
if let Some(maximum_nprobes) = self.request.maximum_nprobes
&& minimum_nprobes > maximum_nprobes
{
return Err(Error::InvalidInput {
message: "minimum_nprobes must be less than or equal to maximum_nprobes"
.to_string(),
});
}
self.request.minimum_nprobes = minimum_nprobes;
Ok(self)
@@ -1407,8 +1405,8 @@ mod tests {
use super::*;
use arrow::{array::downcast_array, compute::concat_batches, datatypes::Int32Type};
use arrow_array::{
cast::AsArray, types::Float32Type, FixedSizeListArray, Float32Array, Int32Array,
RecordBatch, StringArray,
FixedSizeListArray, Float32Array, Int32Array, RecordBatch, StringArray, cast::AsArray,
types::Float32Type,
};
use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
use futures::{StreamExt, TryStreamExt};
@@ -1416,7 +1414,7 @@ mod tests {
use rand::seq::IndexedRandom;
use tempfile::tempdir;
use crate::{connect, database::CreateTableMode, index::Index, Table};
use crate::{Table, connect, database::CreateTableMode, index::Index};
#[tokio::test]
async fn test_setters_getters() {
@@ -1754,11 +1752,13 @@ mod tests {
.limit(1)
.execute()
.await;
assert!(error_result
.err()
.unwrap()
.to_string()
.contains("No vector column found to match with the query vector dimension: 3"));
assert!(
error_result
.err()
.unwrap()
.to_string()
.contains("No vector column found to match with the query vector dimension: 3")
);
}
#[tokio::test]
@@ -2010,7 +2010,7 @@ mod tests {
// Sample 1 - 3 tokens for each string value
let tokens = ["a", "b", "c", "d", "e"];
use rand::{rng, Rng};
use rand::{Rng, rng};
let mut rng = rng();
let text: StringArray = (0..nrows)

View File

@@ -5,7 +5,7 @@ use arrow::compute::{
kernels::numeric::{div, sub},
max, min,
};
use arrow_array::{cast::downcast_array, Float32Array, RecordBatch};
use arrow_array::{Float32Array, RecordBatch, cast::downcast_array};
use arrow_schema::{DataType, Field, Schema, SortOptions};
use lance::dataset::ROW_ID;
use lance_index::{scalar::inverted::SCORE_COL, vector::DIST_COL};
@@ -253,7 +253,10 @@ mod test {
let result = rank(batch.clone(), "bad_col", None);
match result {
Err(Error::InvalidInput { message }) => {
assert_eq!("expected column bad_col not found in rank. found columns [\"name\", \"score\"]", message);
assert_eq!(
"expected column bad_col not found in rank. found columns [\"name\", \"score\"]",
message
);
}
_ => {
panic!("expected invalid input error, received {:?}", result)

View File

@@ -4,8 +4,8 @@
use http::HeaderName;
use log::debug;
use reqwest::{
header::{HeaderMap, HeaderValue},
Body, Request, RequestBuilder, Response,
header::{HeaderMap, HeaderValue},
};
use std::{collections::HashMap, future::Future, str::FromStr, sync::Arc, time::Duration};
@@ -14,6 +14,7 @@ use crate::remote::db::RemoteOptions;
use crate::remote::retry::{ResolvedRetryConfig, RetryCounter};
const REQUEST_ID_HEADER: HeaderName = HeaderName::from_static("x-request-id");
const MEM_WAL_ENABLED_HEADER: HeaderName = HeaderName::from_static("x-lancedb-mem-wal-enabled");
/// Configuration for TLS/mTLS settings.
#[derive(Clone, Debug, Default)]
@@ -52,6 +53,10 @@ pub struct ClientConfig {
pub tls_config: Option<TlsConfig>,
/// Provider for custom headers to be added to each request
pub header_provider: Option<Arc<dyn HeaderProvider>>,
/// Enable MemWAL write path for streaming writes.
/// When true, write operations will use the MemWAL architecture
/// for high-performance streaming writes.
pub mem_wal_enabled: Option<bool>,
}
impl std::fmt::Debug for ClientConfig {
@@ -67,6 +72,7 @@ impl std::fmt::Debug for ClientConfig {
"header_provider",
&self.header_provider.as_ref().map(|_| "Some(...)"),
)
.field("mem_wal_enabled", &self.mem_wal_enabled)
.finish()
}
}
@@ -81,6 +87,7 @@ impl Default for ClientConfig {
id_delimiter: None,
tls_config: None,
header_provider: None,
mem_wal_enabled: None,
}
}
}
@@ -446,13 +453,23 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
})?,
);
}
if let Some(v) = options.0.get("azure_storage_account_name") {
headers.insert(
HeaderName::from_static("x-azure-storage-account-name"),
HeaderValue::from_str(v).map_err(|_| Error::InvalidInput {
message: format!("non-ascii storage account name '{}' provided", db_name),
})?,
);
// Map azure storage options to x-azure-* headers.
// The option key uses underscores (e.g. "azure_client_id") while the
// header uses hyphens (e.g. "x-azure-client-id").
let azure_opts: [(&str, &str); 3] = [
("azure_storage_account_name", "x-azure-storage-account-name"),
("azure_client_id", "x-azure-client-id"),
("azure_tenant_id", "x-azure-tenant-id"),
];
for (opt_key, header_name) in azure_opts {
if let Some(v) = options.0.get(opt_key) {
headers.insert(
HeaderName::from_static(header_name),
HeaderValue::from_str(v).map_err(|_| Error::InvalidInput {
message: format!("non-ascii value '{}' for option '{}'", v, opt_key),
})?,
);
}
}
for (key, value) in &config.extra_headers {
@@ -467,6 +484,11 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
);
}
// Add MemWAL header if enabled
if let Some(true) = config.mem_wal_enabled {
headers.insert(MEM_WAL_ENABLED_HEADER, HeaderValue::from_static("true"));
}
Ok(headers)
}
@@ -650,14 +672,13 @@ impl<S: HttpSend> RestfulLanceDbClient<S> {
pub fn extract_request_id(&self, request: &mut Request) -> String {
// Set a request id.
// TODO: allow the user to supply this, through middleware?
let request_id = if let Some(request_id) = request.headers().get(REQUEST_ID_HEADER) {
if let Some(request_id) = request.headers().get(REQUEST_ID_HEADER) {
request_id.to_str().unwrap().to_string()
} else {
let request_id = uuid::Uuid::new_v4().to_string();
self.set_request_id(request, &request_id);
request_id
};
request_id
}
}
/// Set the request ID header
@@ -1076,4 +1097,34 @@ mod tests {
_ => panic!("Expected Runtime error"),
}
}
#[test]
fn test_default_headers_azure_opts() {
let mut opts = HashMap::new();
opts.insert(
"azure_storage_account_name".to_string(),
"myaccount".to_string(),
);
opts.insert("azure_client_id".to_string(), "my-client-id".to_string());
opts.insert("azure_tenant_id".to_string(), "my-tenant-id".to_string());
let remote_opts = RemoteOptions::new(opts);
let headers = RestfulLanceDbClient::<Sender>::default_headers(
"test-key",
"us-east-1",
"testdb",
false,
&remote_opts,
None,
&ClientConfig::default(),
)
.unwrap();
assert_eq!(
headers.get("x-azure-storage-account-name").unwrap(),
"myaccount"
);
assert_eq!(headers.get("x-azure-client-id").unwrap(), "my-client-id");
assert_eq!(headers.get("x-azure-tenant-id").unwrap(), "my-tenant-id");
}
}

View File

@@ -16,6 +16,7 @@ use lance_namespace::models::{
ListNamespacesResponse, ListTablesRequest, ListTablesResponse,
};
use crate::Error;
use crate::database::{
CloneTableRequest, CreateTableMode, CreateTableRequest, Database, DatabaseOptions,
OpenTableRequest, ReadConsistency, TableNamesRequest,
@@ -23,12 +24,11 @@ use crate::database::{
use crate::error::Result;
use crate::remote::util::stream_as_body;
use crate::table::BaseTable;
use crate::Error;
use super::ARROW_STREAM_CONTENT_TYPE;
use super::client::{ClientConfig, HttpSend, RequestResultExt, RestfulLanceDbClient, Sender};
use super::table::RemoteTable;
use super::util::parse_server_version;
use super::ARROW_STREAM_CONTENT_TYPE;
// Request structure for the remote clone table API
#[derive(serde::Serialize)]
@@ -78,6 +78,7 @@ pub const OPT_REMOTE_PREFIX: &str = "remote_database_";
pub const OPT_REMOTE_API_KEY: &str = "remote_database_api_key";
pub const OPT_REMOTE_REGION: &str = "remote_database_region";
pub const OPT_REMOTE_HOST_OVERRIDE: &str = "remote_database_host_override";
pub const OPT_REMOTE_MEM_WAL_ENABLED: &str = "remote_database_mem_wal_enabled";
// TODO: add support for configuring client config via key/value options
#[derive(Clone, Debug, Default)]
@@ -98,6 +99,12 @@ pub struct RemoteDatabaseOptions {
/// These options are only used for LanceDB Enterprise and only a subset of options
/// are supported.
pub storage_options: HashMap<String, String>,
/// Enable MemWAL write path for high-performance streaming writes.
///
/// When enabled, write operations (insert, merge_insert, etc.) will use
/// the MemWAL architecture which buffers writes in memory and Write-Ahead Log
/// before asynchronously merging to the base table.
pub mem_wal_enabled: Option<bool>,
}
impl RemoteDatabaseOptions {
@@ -109,6 +116,9 @@ impl RemoteDatabaseOptions {
let api_key = map.get(OPT_REMOTE_API_KEY).cloned();
let region = map.get(OPT_REMOTE_REGION).cloned();
let host_override = map.get(OPT_REMOTE_HOST_OVERRIDE).cloned();
let mem_wal_enabled = map
.get(OPT_REMOTE_MEM_WAL_ENABLED)
.map(|v| v.to_lowercase() == "true");
let storage_options = map
.iter()
.filter(|(key, _)| !key.starts_with(OPT_REMOTE_PREFIX))
@@ -119,6 +129,7 @@ impl RemoteDatabaseOptions {
region,
host_override,
storage_options,
mem_wal_enabled,
})
}
}
@@ -137,6 +148,12 @@ impl DatabaseOptions for RemoteDatabaseOptions {
if let Some(host_override) = &self.host_override {
map.insert(OPT_REMOTE_HOST_OVERRIDE.to_string(), host_override.clone());
}
if let Some(mem_wal_enabled) = &self.mem_wal_enabled {
map.insert(
OPT_REMOTE_MEM_WAL_ENABLED.to_string(),
mem_wal_enabled.to_string(),
);
}
}
}
@@ -181,6 +198,20 @@ impl RemoteDatabaseOptionsBuilder {
self.options.host_override = Some(host_override);
self
}
/// Enable MemWAL write path for high-performance streaming writes.
///
/// When enabled, write operations will use the MemWAL architecture
/// which buffers writes in memory and Write-Ahead Log before
/// asynchronously merging to the base table.
///
/// # Arguments
///
/// * `enabled` - Whether to enable MemWAL writes
pub fn mem_wal_enabled(mut self, enabled: bool) -> Self {
self.options.mem_wal_enabled = Some(enabled);
self
}
}
#[derive(Debug)]
@@ -249,9 +280,9 @@ impl RemoteDatabase {
#[cfg(all(test, feature = "remote"))]
mod test_utils {
use super::*;
use crate::remote::ClientConfig;
use crate::remote::client::test_utils::MockSender;
use crate::remote::client::test_utils::{client_with_handler, client_with_handler_and_config};
use crate::remote::ClientConfig;
impl RemoteDatabase<MockSender> {
pub fn new_mock<F, T>(handler: F) -> Self
@@ -777,7 +808,12 @@ impl RemoteOptions {
impl From<StorageOptions> for RemoteOptions {
fn from(options: StorageOptions) -> Self {
let supported_opts = vec!["account_name", "azure_storage_account_name"];
let supported_opts = vec![
"account_name",
"azure_storage_account_name",
"azure_client_id",
"azure_tenant_id",
];
let mut filtered = HashMap::new();
for opt in supported_opts {
if let Some(v) = options.0.get(opt) {
@@ -799,9 +835,9 @@ mod tests {
use crate::connection::ConnectBuilder;
use crate::{
database::CreateTableMode,
remote::{ClientConfig, HeaderProvider, ARROW_STREAM_CONTENT_TYPE, JSON_CONTENT_TYPE},
Connection, Error,
database::CreateTableMode,
remote::{ARROW_STREAM_CONTENT_TYPE, ClientConfig, HeaderProvider, JSON_CONTENT_TYPE},
};
#[test]

View File

@@ -1,8 +1,8 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use crate::remote::RetryConfig;
use crate::Error;
use crate::remote::RetryConfig;
use log::debug;
use std::time::Duration;

View File

@@ -6,15 +6,14 @@ pub mod insert;
use self::insert::RemoteInsertExec;
use crate::expr::expr_to_sql_string;
use super::ARROW_STREAM_CONTENT_TYPE;
use super::client::RequestResultExt;
use super::client::{HttpSend, RestfulLanceDbClient, Sender};
use super::db::ServerVersion;
use super::ARROW_STREAM_CONTENT_TYPE;
use crate::index::waiter::wait_for_index;
use crate::index::Index;
use crate::index::IndexStatistics;
use crate::index::waiter::wait_for_index;
use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest};
use crate::table::query::create_multi_vector_plan;
use crate::table::AddColumnsResult;
use crate::table::AddResult;
use crate::table::AlterColumnsResult;
@@ -23,19 +22,20 @@ use crate::table::DropColumnsResult;
use crate::table::MergeResult;
use crate::table::Tags;
use crate::table::UpdateResult;
use crate::table::query::create_multi_vector_plan;
use crate::table::{AnyQuery, Filter, TableStatistics};
use crate::utils::background_cache::BackgroundCache;
use crate::utils::{supported_btree_data_type, supported_vector_data_type};
use crate::{DistanceType, Error};
use crate::{
error::Result,
index::{IndexBuilder, IndexConfig},
query::QueryExecutionOptions,
table::{
merge::MergeInsertBuilder, AddDataBuilder, BaseTable, OptimizeAction, OptimizeStats,
TableDefinition, UpdateBuilder,
AddDataBuilder, BaseTable, OptimizeAction, OptimizeStats, TableDefinition, UpdateBuilder,
merge::MergeInsertBuilder,
},
};
use crate::{DistanceType, Error};
use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_ipc::reader::FileReader;
use arrow_schema::{DataType, SchemaRef};
@@ -50,7 +50,7 @@ use lance::arrow::json::{JsonDataType, JsonSchema};
use lance::dataset::refs::TagContents;
use lance::dataset::scanner::DatasetRecordBatchStream;
use lance::dataset::{ColumnAlteration, NewColumnTransform, Version};
use lance_datafusion::exec::{execute_plan, OneShotExec};
use lance_datafusion::exec::{OneShotExec, execute_plan};
use reqwest::{RequestBuilder, Response};
use serde::{Deserialize, Serialize};
use serde_json::Number;
@@ -62,6 +62,7 @@ use std::time::Duration;
use tokio::sync::RwLock;
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
const MEM_WAL_ENABLED_HEADER: HeaderName = HeaderName::from_static("x-lancedb-mem-wal-enabled");
const METRIC_TYPE_KEY: &str = "metric_type";
const INDEX_TYPE_KEY: &str = "index_type";
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
@@ -612,8 +613,8 @@ impl<S: HttpSend> RemoteTable<S> {
message: format!(
"Cannot mutate table reference fixed at version {}. Call checkout_latest() to get a mutable table reference.",
version
)
})
),
}),
}
}
@@ -697,10 +698,10 @@ impl<S: HttpSend> RemoteTable<S> {
Error::Retry { status_code, .. } => *status_code,
_ => None,
};
if let Some(status_code) = status_code {
if Self::should_invalidate_cache_for_status(status_code) {
self.invalidate_schema_cache();
}
if let Some(status_code) = status_code
&& Self::should_invalidate_cache_for_status(status_code)
{
self.invalidate_schema_cache();
}
}
}
@@ -783,9 +784,9 @@ impl<S: HttpSend> std::fmt::Display for RemoteTable<S> {
#[cfg(all(test, feature = "remote"))]
mod test_utils {
use super::*;
use crate::remote::client::test_utils::client_with_handler;
use crate::remote::client::test_utils::{client_with_handler_and_config, MockSender};
use crate::remote::ClientConfig;
use crate::remote::client::test_utils::client_with_handler;
use crate::remote::client::test_utils::{MockSender, client_with_handler_and_config};
impl RemoteTable<MockSender> {
pub fn new_mock<F, T>(name: String, handler: F, version: Option<semver::Version>) -> Self
@@ -1227,7 +1228,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let body = response.text().await.err_to_http(request_id.clone())?;
if body.trim().is_empty() {
// Backward compatible with old servers
return Ok(DeleteResult { version: 0 });
return Ok(DeleteResult {
num_deleted_rows: 0,
version: 0,
});
}
let delete_response: DeleteResult =
serde_json::from_str(&body).map_err(|e| Error::Http {
@@ -1248,13 +1252,13 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
0 => {
return Err(Error::InvalidInput {
message: "No columns specified".into(),
})
});
}
1 => index.columns.pop().unwrap(),
_ => {
return Err(Error::NotSupported {
message: "Indices over multiple columns not yet supported".into(),
})
});
}
};
let mut body = serde_json::json!({
@@ -1273,73 +1277,24 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
);
}
match index.index {
// TODO: Should we pass the actual index parameters? SaaS does not
// yet support them.
Index::IvfFlat(index) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("IVF_FLAT".to_string());
body[METRIC_TYPE_KEY] =
serde_json::Value::String(index.distance_type.to_string().to_lowercase());
if let Some(num_partitions) = index.num_partitions {
body["num_partitions"] = serde_json::Value::Number(num_partitions.into());
}
}
Index::IvfPq(index) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("IVF_PQ".to_string());
body[METRIC_TYPE_KEY] =
serde_json::Value::String(index.distance_type.to_string().to_lowercase());
if let Some(num_partitions) = index.num_partitions {
body["num_partitions"] = serde_json::Value::Number(num_partitions.into());
}
if let Some(num_bits) = index.num_bits {
body["num_bits"] = serde_json::Value::Number(num_bits.into());
}
}
Index::IvfSq(index) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("IVF_SQ".to_string());
body[METRIC_TYPE_KEY] =
serde_json::Value::String(index.distance_type.to_string().to_lowercase());
if let Some(num_partitions) = index.num_partitions {
body["num_partitions"] = serde_json::Value::Number(num_partitions.into());
}
}
Index::IvfHnswSq(index) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("IVF_HNSW_SQ".to_string());
body[METRIC_TYPE_KEY] =
serde_json::Value::String(index.distance_type.to_string().to_lowercase());
if let Some(num_partitions) = index.num_partitions {
body["num_partitions"] = serde_json::Value::Number(num_partitions.into());
}
}
Index::IvfRq(index) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("IVF_RQ".to_string());
body[METRIC_TYPE_KEY] =
serde_json::Value::String(index.distance_type.to_string().to_lowercase());
if let Some(num_partitions) = index.num_partitions {
body["num_partitions"] = serde_json::Value::Number(num_partitions.into());
}
if let Some(num_bits) = index.num_bits {
body["num_bits"] = serde_json::Value::Number(num_bits.into());
}
}
Index::BTree(_) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("BTREE".to_string());
}
Index::Bitmap(_) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("BITMAP".to_string());
}
Index::LabelList(_) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("LABEL_LIST".to_string());
}
Index::FTS(fts) => {
body[INDEX_TYPE_KEY] = serde_json::Value::String("FTS".to_string());
let params = serde_json::to_value(&fts).map_err(|e| Error::InvalidInput {
message: format!("failed to serialize FTS index params {:?}", e),
})?;
for (key, value) in params.as_object().unwrap() {
body[key] = value.clone();
}
}
fn to_json(params: &impl serde::Serialize) -> crate::Result<serde_json::Value> {
serde_json::to_value(params).map_err(|e| Error::InvalidInput {
message: format!("failed to serialize index params {:?}", e),
})
}
// Map each Index variant to its wire type name and serializable params.
// Auto is special-cased since it needs schema inspection.
let (index_type_str, params) = match &index.index {
Index::IvfFlat(p) => ("IVF_FLAT", Some(to_json(p)?)),
Index::IvfPq(p) => ("IVF_PQ", Some(to_json(p)?)),
Index::IvfSq(p) => ("IVF_SQ", Some(to_json(p)?)),
Index::IvfHnswSq(p) => ("IVF_HNSW_SQ", Some(to_json(p)?)),
Index::IvfRq(p) => ("IVF_RQ", Some(to_json(p)?)),
Index::BTree(p) => ("BTREE", Some(to_json(p)?)),
Index::Bitmap(p) => ("BITMAP", Some(to_json(p)?)),
Index::LabelList(p) => ("LABEL_LIST", Some(to_json(p)?)),
Index::FTS(p) => ("FTS", Some(to_json(p)?)),
Index::Auto => {
let schema = self.schema().await?;
let field = schema
@@ -1348,11 +1303,11 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
message: format!("Column {} not found in schema", column),
})?;
if supported_vector_data_type(field.data_type()) {
body[INDEX_TYPE_KEY] = serde_json::Value::String("IVF_PQ".to_string());
body[METRIC_TYPE_KEY] =
serde_json::Value::String(DistanceType::L2.to_string().to_lowercase());
("IVF_PQ", None)
} else if supported_btree_data_type(field.data_type()) {
body[INDEX_TYPE_KEY] = serde_json::Value::String("BTREE".to_string());
("BTREE", None)
} else {
return Err(Error::NotSupported {
message: format!(
@@ -1366,10 +1321,17 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
_ => {
return Err(Error::NotSupported {
message: "Index type not supported".into(),
})
});
}
};
body[INDEX_TYPE_KEY] = index_type_str.into();
if let Some(params) = params {
for (key, value) in params.as_object().expect("params should be a JSON object") {
body[key] = value.clone();
}
}
let request = request.json(&body);
let (request_id, response) = self.send(request, true).await?;
@@ -1398,6 +1360,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
self.check_mutable().await?;
let timeout = params.timeout;
let mem_wal = params.mem_wal;
let query = MergeInsertRequest::try_from(params)?;
let mut request = self
@@ -1413,6 +1376,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
}
if mem_wal {
request = request.header(MEM_WAL_ENABLED_HEADER, "true");
}
let (request_id, response) = self.send_streaming(request, new_data, true).await?;
let response = self.check_table_response(&request_id, response).await?;
@@ -1810,8 +1777,8 @@ impl TryFrom<MergeInsertBuilder> for MergeInsertRequest {
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use std::{collections::HashMap, pin::Pin};
@@ -1820,25 +1787,27 @@ mod tests {
use crate::table::AddDataMode;
use arrow::{array::AsArray, compute::concat_batches, datatypes::Int32Type};
use arrow_array::{record_batch, Int32Array, RecordBatch, RecordBatchIterator};
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, record_batch};
use arrow_schema::{DataType, Field, Schema};
use chrono::{DateTime, Utc};
use futures::{future::BoxFuture, StreamExt, TryFutureExt};
use futures::{StreamExt, TryFutureExt, future::BoxFuture};
use lance_index::scalar::inverted::query::MatchQuery;
use lance_index::scalar::{FullTextSearchQuery, InvertedIndexParams};
use reqwest::Body;
use rstest::rstest;
use serde_json::json;
use crate::index::vector::{IvfFlatIndexBuilder, IvfHnswSqIndexBuilder};
use crate::remote::db::DEFAULT_SERVER_VERSION;
use crate::index::vector::{
IvfFlatIndexBuilder, IvfHnswSqIndexBuilder, IvfRqIndexBuilder, IvfSqIndexBuilder,
};
use crate::remote::JSON_CONTENT_TYPE;
use crate::remote::db::DEFAULT_SERVER_VERSION;
use crate::utils::background_cache::clock;
use crate::{
index::{vector::IvfPqIndexBuilder, Index, IndexStatistics, IndexType},
DistanceType, Error, Table,
index::{Index, IndexStatistics, IndexType, vector::IvfPqIndexBuilder},
query::{ExecutableQuery, QueryBase},
remote::ARROW_FILE_CONTENT_TYPE,
DistanceType, Error, Table,
};
#[tokio::test]
@@ -2067,11 +2036,13 @@ mod tests {
.unwrap(),
"/v1/table/my_table/insert/" => {
assert_eq!(request.method(), "POST");
assert!(request
.url()
.query_pairs()
.filter(|(k, _)| k == "mode")
.all(|(_, v)| v == "append"));
assert!(
request
.url()
.query_pairs()
.filter(|(k, _)| k == "mode")
.all(|(_, v)| v == "append")
);
assert_eq!(
request.headers().get("Content-Type").unwrap(),
ARROW_STREAM_CONTENT_TYPE
@@ -2992,6 +2963,8 @@ mod tests {
"IVF_FLAT",
json!({
"metric_type": "hamming",
"sample_rate": 256,
"max_iterations": 50,
}),
Index::IvfFlat(IvfFlatIndexBuilder::default().distance_type(DistanceType::Hamming)),
),
@@ -3000,6 +2973,8 @@ mod tests {
json!({
"metric_type": "hamming",
"num_partitions": 128,
"sample_rate": 256,
"max_iterations": 50,
}),
Index::IvfFlat(
IvfFlatIndexBuilder::default()
@@ -3011,6 +2986,8 @@ mod tests {
"IVF_PQ",
json!({
"metric_type": "l2",
"sample_rate": 256,
"max_iterations": 50,
}),
Index::IvfPq(Default::default()),
),
@@ -3020,6 +2997,8 @@ mod tests {
"metric_type": "cosine",
"num_partitions": 128,
"num_bits": 4,
"sample_rate": 256,
"max_iterations": 50,
}),
Index::IvfPq(
IvfPqIndexBuilder::default()
@@ -3028,10 +3007,29 @@ mod tests {
.num_bits(4),
),
),
(
"IVF_PQ",
json!({
"metric_type": "l2",
"num_sub_vectors": 16,
"sample_rate": 512,
"max_iterations": 100,
}),
Index::IvfPq(
IvfPqIndexBuilder::default()
.num_sub_vectors(16)
.sample_rate(512)
.max_iterations(100),
),
),
(
"IVF_HNSW_SQ",
json!({
"metric_type": "l2",
"sample_rate": 256,
"max_iterations": 50,
"m": 20,
"ef_construction": 300,
}),
Index::IvfHnswSq(Default::default()),
),
@@ -3040,11 +3038,65 @@ mod tests {
json!({
"metric_type": "l2",
"num_partitions": 128,
"sample_rate": 256,
"max_iterations": 50,
"m": 40,
"ef_construction": 500,
}),
Index::IvfHnswSq(
IvfHnswSqIndexBuilder::default()
.distance_type(DistanceType::L2)
.num_partitions(128),
.num_partitions(128)
.num_edges(40)
.ef_construction(500),
),
),
(
"IVF_SQ",
json!({
"metric_type": "l2",
"sample_rate": 256,
"max_iterations": 50,
}),
Index::IvfSq(Default::default()),
),
(
"IVF_SQ",
json!({
"metric_type": "cosine",
"num_partitions": 64,
"sample_rate": 256,
"max_iterations": 50,
}),
Index::IvfSq(
IvfSqIndexBuilder::default()
.distance_type(DistanceType::Cosine)
.num_partitions(64),
),
),
(
"IVF_RQ",
json!({
"metric_type": "l2",
"sample_rate": 256,
"max_iterations": 50,
}),
Index::IvfRq(Default::default()),
),
(
"IVF_RQ",
json!({
"metric_type": "cosine",
"num_partitions": 64,
"num_bits": 8,
"sample_rate": 256,
"max_iterations": 50,
}),
Index::IvfRq(
IvfRqIndexBuilder::default()
.distance_type(DistanceType::Cosine)
.num_partitions(64)
.num_bits(8),
),
),
// HNSW_PQ isn't yet supported on SaaS
@@ -3548,7 +3600,7 @@ mod tests {
}
fn _make_table_with_indices(unindexed_rows: usize) -> Table {
let table = Table::new_with_handler("my_table", move |request| {
Table::new_with_handler("my_table", move |request| {
assert_eq!(request.method(), "POST");
let response_body = match request.url().path() {
@@ -3592,8 +3644,7 @@ mod tests {
let body = serde_json::to_string(&response_body).unwrap();
let status = if body == "null" { 404 } else { 200 };
http::Response::builder().status(status).body(body).unwrap()
});
table
})
}
#[tokio::test]
@@ -3804,8 +3855,8 @@ mod tests {
#[tokio::test]
async fn test_uri_caching() {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
let call_count = Arc::new(AtomicUsize::new(0));
let call_count_clone = call_count.clone();

View File

@@ -16,12 +16,12 @@ use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, Plan
use futures::StreamExt;
use http::header::CONTENT_TYPE;
use crate::Error;
use crate::remote::ARROW_STREAM_CONTENT_TYPE;
use crate::remote::client::{HttpSend, RestfulLanceDbClient, Sender};
use crate::remote::table::RemoteTable;
use crate::remote::ARROW_STREAM_CONTENT_TYPE;
use crate::table::datafusion::insert::COUNT_SCHEMA;
use crate::table::AddResult;
use crate::Error;
use crate::table::datafusion::insert::COUNT_SCHEMA;
/// ExecutionPlan for inserting data into a remote LanceDB table.
///
@@ -309,12 +309,12 @@ mod tests {
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use datafusion::prelude::SessionContext;
use datafusion_catalog::MemTable;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::Table;
use crate::remote::ARROW_STREAM_CONTENT_TYPE;
use crate::table::datafusion::BaseTableAdapter;
use crate::Table;
fn schema_json() -> &'static str {
r#"{"fields": [{"name": "id", "type": {"type": "int32"}, "nullable": true}]}"#

View File

@@ -5,7 +5,7 @@ use arrow_ipc::CompressionType;
use futures::{Stream, StreamExt};
use reqwest::Response;
use crate::{arrow::SendableRecordBatchStream, Result};
use crate::{Result, arrow::SendableRecordBatchStream};
use super::db::ServerVersion;

View File

@@ -14,7 +14,7 @@ use async_trait::async_trait;
use lance::dataset::ROW_ID;
use crate::error::{Error, Result};
use crate::rerankers::{Reranker, RELEVANCE_SCORE};
use crate::rerankers::{RELEVANCE_SCORE, Reranker};
/// Reranks the results using Reciprocal Rank Fusion(RRF) algorithm based
/// on the scores of vector and FTS search.

View File

@@ -8,29 +8,29 @@ use arrow_schema::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_execution::TaskContext;
use datafusion_expr::Expr;
use datafusion_physical_plan::display::DisplayableExecutionPlan;
use datafusion_physical_plan::ExecutionPlan;
use futures::stream::FuturesUnordered;
use datafusion_physical_plan::display::DisplayableExecutionPlan;
use futures::StreamExt;
use lance::dataset::builder::DatasetBuilder;
use futures::stream::FuturesUnordered;
pub use lance::dataset::ColumnAlteration;
pub use lance::dataset::NewColumnTransform;
pub use lance::dataset::ReadParams;
pub use lance::dataset::Version;
use lance::dataset::WriteMode;
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::{InsertBuilder, WriteParams};
use lance::index::vector::utils::infer_vector_dim;
use lance::index::vector::VectorIndexParams;
use lance::index::vector::utils::infer_vector_dim;
use lance::io::{ObjectStoreParams, WrappingObjectStore};
use lance_datafusion::utils::StreamingWriteSource;
use lance_index::DatasetIndexExt;
use lance_index::IndexType;
use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams};
use lance_index::vector::bq::RQBuildParams;
use lance_index::vector::hnsw::builder::HnswBuildParams;
use lance_index::vector::ivf::IvfBuildParams;
use lance_index::vector::pq::PQBuildParams;
use lance_index::vector::sq::builder::SQBuildParams;
use lance_index::DatasetIndexExt;
use lance_index::IndexType;
use lance_io::object_store::{LanceNamespaceStorageOptionsProvider, StorageOptionsAccessor};
pub use query::AnyQuery;
@@ -43,19 +43,19 @@ use std::format;
use std::path::Path;
use std::sync::Arc;
use crate::data::scannable::{estimate_write_partitions, PeekedScannable, Scannable};
use crate::data::scannable::{PeekedScannable, Scannable, estimate_write_partitions};
use crate::database::Database;
use crate::embeddings::{EmbeddingDefinition, EmbeddingRegistry, MemoryRegistry};
use crate::error::{Error, Result};
use crate::index::vector::VectorIndex;
use crate::index::IndexStatistics;
use crate::index::{vector::suggested_num_sub_vectors, Index, IndexBuilder};
use crate::index::vector::VectorIndex;
use crate::index::{Index, IndexBuilder, vector::suggested_num_sub_vectors};
use crate::index::{IndexConfig, IndexStatisticsImpl};
use crate::query::{IntoQueryVector, Query, QueryExecutionOptions, TakeQuery, VectorQuery};
use crate::table::datafusion::insert::InsertExec;
use crate::utils::{
supported_bitmap_data_type, supported_btree_data_type, supported_fts_data_type,
supported_label_list_data_type, supported_vector_data_type, PatchReadParam, PatchWriteParam,
PatchReadParam, PatchWriteParam, supported_bitmap_data_type, supported_btree_data_type,
supported_fts_data_type, supported_label_list_data_type, supported_vector_data_type,
};
use self::dataset::DatasetConsistencyWrapper;
@@ -2555,22 +2555,21 @@ pub struct FragmentSummaryStats {
#[cfg(test)]
#[allow(deprecated)]
mod tests {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use arrow_array::{
builder::{ListBuilder, StringBuilder},
Array, BooleanArray, FixedSizeListArray, Int32Array, LargeStringArray, RecordBatch,
RecordBatchIterator, RecordBatchReader, StringArray,
builder::{ListBuilder, StringBuilder},
};
use arrow_array::{BinaryArray, LargeBinaryArray};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use lance::io::{ObjectStoreParams, WrappingObjectStore};
use lance::Dataset;
use rand::Rng;
use lance::io::{ObjectStoreParams, WrappingObjectStore};
use tempfile::tempdir;
use super::*;
@@ -2777,9 +2776,8 @@ mod tests {
false,
)]));
let mut rng = rand::thread_rng();
let float_arr = Float32Array::from(
repeat_with(|| rng.gen::<f32>())
repeat_with(rand::random::<f32>)
.take(512 * dimension as usize)
.collect::<Vec<f32>>(),
);
@@ -2884,8 +2882,8 @@ mod tests {
.await
.unwrap();
use lance::index::vector::ivf::v2::IvfPq as LanceIvfPq;
use lance::index::DatasetIndexInternalExt;
use lance::index::vector::ivf::v2::IvfPq as LanceIvfPq;
use lance_index::metrics::NoOpMetricsCollector;
use lance_index::vector::VectorIndex as LanceVectorIndex;
@@ -2933,9 +2931,8 @@ mod tests {
false,
)]));
let mut rng = rand::thread_rng();
let float_arr = Float32Array::from(
repeat_with(|| rng.gen::<f32>())
repeat_with(rand::random::<f32>)
.take(512 * dimension as usize)
.collect::<Vec<f32>>(),
);
@@ -2993,9 +2990,8 @@ mod tests {
false,
)]));
let mut rng = rand::thread_rng();
let float_arr = Float32Array::from(
repeat_with(|| rng.gen::<f32>())
repeat_with(rand::random::<f32>)
.take(512 * dimension as usize)
.collect::<Vec<f32>>(),
);
@@ -3256,16 +3252,20 @@ mod tests {
.unwrap();
// Can not create btree or bitmap index on list column
assert!(table
.create_index(&["tags"], Index::BTree(Default::default()))
.execute()
.await
.is_err());
assert!(table
.create_index(&["tags"], Index::Bitmap(Default::default()))
.execute()
.await
.is_err());
assert!(
table
.create_index(&["tags"], Index::BTree(Default::default()))
.execute()
.await
.is_err()
);
assert!(
table
.create_index(&["tags"], Index::Bitmap(Default::default()))
.execute()
.await
.is_err()
);
// Create bitmap index on the "category" column
table

View File

@@ -7,8 +7,8 @@ use arrow_schema::{DataType, Fields, Schema};
use lance::dataset::WriteMode;
use serde::{Deserialize, Serialize};
use crate::data::scannable::scannable_with_embeddings;
use crate::data::scannable::Scannable;
use crate::data::scannable::scannable_with_embeddings;
use crate::embeddings::EmbeddingRegistry;
use crate::table::datafusion::cast::cast_to_table_schema;
use crate::table::datafusion::reject_nan::reject_nan_vectors;
@@ -155,7 +155,9 @@ impl AddDataBuilder {
pub struct PreprocessingOutput {
pub plan: Arc<dyn datafusion_physical_plan::ExecutionPlan>,
#[cfg_attr(not(feature = "remote"), allow(dead_code))]
pub overwrite: bool,
#[cfg_attr(not(feature = "remote"), allow(dead_code))]
pub rescannable: bool,
pub write_options: WriteOptions,
pub mode: AddDataMode,
@@ -202,13 +204,14 @@ mod tests {
use arrow::datatypes::Float64Type;
use arrow_array::{
record_batch, FixedSizeListArray, Float32Array, Int32Array, LargeStringArray, ListArray,
RecordBatch, RecordBatchIterator,
FixedSizeListArray, Float32Array, Int32Array, LargeStringArray, ListArray, RecordBatch,
RecordBatchIterator, record_batch,
};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use futures::TryStreamExt;
use lance::dataset::{WriteMode, WriteParams};
use crate::Error;
use crate::arrow::{SendableRecordBatchStream, SimpleRecordBatchStream};
use crate::connect;
use crate::data::scannable::Scannable;
@@ -218,9 +221,8 @@ mod tests {
use crate::query::{ExecutableQuery, QueryBase, Select};
use crate::table::add_data::NaNVectorBehavior;
use crate::table::{ColumnDefinition, ColumnKind, Table, TableDefinition, WriteOptions};
use crate::test_utils::embeddings::MockEmbed;
use crate::test_utils::TestCustomError;
use crate::Error;
use crate::test_utils::embeddings::MockEmbed;
use super::AddDataMode;

View File

@@ -17,17 +17,17 @@ use async_trait::async_trait;
use datafusion_catalog::{Session, TableProvider};
use datafusion_common::{DataFusionError, Result as DataFusionResult, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{dml::InsertOp, Expr, TableProviderFilterPushDown, TableType};
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType, dml::InsertOp};
use datafusion_physical_plan::{
stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, stream::RecordBatchStreamAdapter,
};
use futures::{TryFutureExt, TryStreamExt};
use lance::dataset::{WriteMode, WriteParams};
use super::{AnyQuery, BaseTable};
use crate::{
query::{QueryExecutionOptions, QueryFilter, QueryRequest, Select},
Result,
query::{QueryExecutionOptions, QueryFilter, QueryRequest, Select},
};
use arrow_schema::{DataType, Field};
use lance_index::scalar::FullTextSearchQuery;
@@ -268,7 +268,7 @@ impl TableProvider for BaseTableAdapter {
InsertOp::Replace => {
return Err(DataFusionError::NotImplemented(
"Replace mode is not supported for LanceDB tables".to_string(),
))
));
}
};
@@ -300,13 +300,13 @@ pub mod tests {
use datafusion_catalog::TableProvider;
use datafusion_common::stats::Precision;
use datafusion_execution::SendableRecordBatchStream;
use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder};
use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, col, lit};
use futures::{StreamExt, TryStreamExt};
use tempfile::tempdir;
use crate::{
connect,
index::{scalar::BTreeIndexBuilder, Index},
index::{Index, scalar::BTreeIndexBuilder},
table::datafusion::BaseTableAdapter,
};

View File

@@ -5,10 +5,10 @@ use std::sync::Arc;
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema};
use datafusion::functions::core::{get_field, named_struct};
use datafusion_common::config::ConfigOptions;
use datafusion_common::ScalarValue;
use datafusion_physical_expr::expressions::{cast, Literal};
use datafusion_common::config::ConfigOptions;
use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_expr::expressions::{Literal, cast};
use datafusion_physical_plan::expressions::Column;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};

View File

@@ -16,9 +16,9 @@ use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
};
use lance::Dataset;
use lance::dataset::transaction::{Operation, Transaction};
use lance::dataset::{CommitBuilder, InsertBuilder, WriteParams};
use lance::Dataset;
use lance_table::format::Fragment;
use crate::table::dataset::DatasetConsistencyWrapper;
@@ -195,13 +195,13 @@ impl ExecutionPlan for InsertExec {
}
};
if let Some(transactions) = to_commit {
if let Some(merged_txn) = merge_transactions(transactions) {
let new_dataset = CommitBuilder::new(dataset.clone())
.execute(merged_txn)
.await?;
ds_wrapper.update(new_dataset);
}
if let Some(transactions) = to_commit
&& let Some(merged_txn) = merge_transactions(transactions)
{
let new_dataset = CommitBuilder::new(dataset.clone())
.execute(merged_txn)
.await?;
ds_wrapper.update(new_dataset);
}
Ok(RecordBatch::try_new(
@@ -222,7 +222,7 @@ mod tests {
use std::vec;
use super::*;
use arrow_array::{record_batch, RecordBatchIterator};
use arrow_array::{RecordBatchIterator, record_batch};
use datafusion::prelude::SessionContext;
use datafusion_catalog::MemTable;
use tempfile::tempdir;

View File

@@ -4,11 +4,11 @@
use core::fmt;
use std::sync::{Arc, Mutex};
use datafusion_common::{stats::Precision, DataFusionError, Result as DFResult, Statistics};
use datafusion_common::{DataFusionError, Result as DFResult, Statistics, stats::Precision};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use datafusion_physical_plan::{
execution_plan::EmissionType, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, execution_plan::EmissionType,
};
use crate::{arrow::SendableRecordBatchStreamExt, data::scannable::Scannable};

Some files were not shown because too many files have changed in this diff Show More