Compare commits

..

20 Commits

Author SHA1 Message Date
dependabot[bot]
412fd2afa6 chore(deps): bump aws-smithy-runtime in the rust-minor-patch group
Bumps the rust-minor-patch group with 1 update: [aws-smithy-runtime](https://github.com/smithy-lang/smithy-rs).


Updates `aws-smithy-runtime` from 1.11.1 to 1.11.3
- [Release notes](https://github.com/smithy-lang/smithy-rs/releases)
- [Changelog](https://github.com/smithy-lang/smithy-rs/blob/main/CHANGELOG.md)
- [Commits](https://github.com/smithy-lang/smithy-rs/commits)

---
updated-dependencies:
- dependency-name: aws-smithy-runtime
  dependency-version: 1.11.3
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: rust-minor-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-05-20 16:14:44 +00:00
dependabot[bot]
2d5298b6ee chore(deps): bump the rust-minor-patch group across 1 directory with 23 updates (#3382)
Weekly dependabot refresh of `Cargo.lock`.

Dependabot's original PR also raised the lower-bound version
requirements
in `Cargo.toml` (arrow, tokio, aws-sdk-*, etc.) to match the new
lockfile
versions. That forces our library's consumers onto newer minimum
versions and broke the MSRV check, which downgrades aws-sdk-* crates to
verify they still build on Rust 1.91.

Changes from the original:

- Reverted all `Cargo.toml` requirement changes; `Cargo.lock`
regenerated
  with `cargo update` within the existing ranges. The lockfile (and the
  binaries we ship) stays current on security fixes without bumping our
  public minimum versions.
- Set `versioning-strategy: lockfile-only` in `.github/dependabot.yml`
so
  future cargo dependabot PRs only touch `Cargo.lock`.

Note: `aws-lc-rs` stays at 1.16.3 — `nodejs/Cargo.toml` pins it with
`=`,
which `lockfile-only` cannot move; bumping it needs a manual change.

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Will Jones <will.jones127@gmail.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Will Jones <willjones127@gmail.com>
2026-05-20 09:09:39 -07:00
Brendan Clement
4cb9147bbf feat(nodejs): add renameTable on Connection (#3386)
Adds `Connection.renameTable` to the Node SDK. Closes #3381.
2026-05-20 09:05:48 -07:00
Xuanwo
54a1982ef1 docs: document Python uv agent workflow (#3417) 2026-05-20 21:35:42 +08:00
Xuanwo
5bfde47a8e fix: support nested field paths in native index creation (#3408)
Native index creation was resolving requested columns through top-level
Arrow schema lookup before handing the request to Lance, which rejected
nested paths and could collapse a nested field to its leaf name. This PR
resolves index targets with Lance field-path semantics, passes the
canonical path through to Lance, and reports indexed columns from field
ids as canonical full paths.

This also removes the Python native FTS guard that rejected dotted paths
so scalar, vector, and FTS index creation share the same nested-field
contract. Related to #3402.
2026-05-20 11:15:15 +08:00
Brendan Clement
049b0c8f09 feat(nodejs): add progress to Table.add (#3398)
### Summary

- Add an optional `progress` callback to `Table.add(data, { progress
})`. Callback fires once per batch written and once more with `done:
true` when the write completes.
- Errors thrown from the user's callback are logged with `console.warn`
and swallowed

### Testing
- npm test 
- ran smoke test script to verify functionality
2026-05-19 18:35:07 -07:00
Vishal Kumar Singh
20556e23a9 docs: add missing Python index classes to API reference (#3392)
Adds three index configuration classes to the Python API Reference that
were missing from the documentation:

- `IvfSq` - IVF Scalar Quantization index
- `IvfRq` - IVF RabitQ Quantization index
- `HnswFlat` - HNSW without quantization (stores raw vectors)

These classes are exported in `lancedb.index.__all__` and have complete
docstrings in the source, but weren't showing up in the rendered docs at
https://lancedb.github.io/lancedb/python/python/#indices-asynchronous.

Closes #1855
2026-05-19 16:06:41 -07:00
Weston Pace
01e272c0b0 fix(rust): match embedding scannable columns by name (#3410)
Fixes #3136.

## Summary

- `WithEmbeddingsScannable::scan_as_stream` matched columns positionally
  against the table schema, so a `CastError` was raised whenever the
  computed batch order differed from the table schema order.
- The mismatch surfaced when `add_columns` added a new physical column
  **after** an embedding column: the table schema became
  `[..., embedding, extra]`, but `compute_embeddings_for_batch` always
  appends embeddings at the end, producing `[..., extra, embedding]`.
  Position 2 then tried to cast e.g. `score: Float64` →
  `embedding: FixedSizeList` and failed.
- Now we look each output column up by name in the result batch, which
  is order-independent. If a non-embedding column required by the table
  schema is missing from the input, we return a clear `InvalidInput`
  error instead of a confusing cast error.

## Reproduction (from the issue)

```text
Table created with:           [id, text, text_vec(embedding)]
add_columns("score")        → schema: [id, text, text_vec, score]
table.add([id, text, score]) → BEFORE: CastError on position 2
                               AFTER:  succeeds, embedding is computed
```

## Tests

-
`data::scannable.rs::test_with_embeddings_scannable_column_added_after_embedding`
  — unit test exercising the exact column-order mismatch via
  `WithEmbeddingsScannable::with_schema`.
-
`data::scannable.rs::test_with_embeddings_scannable_missing_required_column`
  — covers the new "missing column" error path.
- `table::add_data.rs::test_add_with_embeddings_after_add_columns`
  — end-to-end regression test mirroring the reproduction in the issue
  (create table with embedding → `add_columns` → `table.add`).

## Test plan

- [x] `cargo check --quiet --features remote --tests --examples`
- [x] `cargo clippy --quiet --features remote --tests --examples`
- [x] `cargo fmt --all`
- [x] `cargo test --quiet --features remote -p lancedb embedding` — 18
tests pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 15:08:12 -07:00
Xuanwo
ad1634a0a5 docs: document CI preflight requirements (#3409)
This updates the agent instructions to codify the CI gates that failed
before code review started: PR titles must satisfy Conventional Commits,
and Python changes need root-level ruff format/lint checks.

It also makes the touched-language preflight explicit so mixed Rust,
Python, and TypeScript changes run the checks CI expects before opening
a PR.
2026-05-19 08:51:29 -07:00
Yang Cen
5d1c28922a feat(python): align to_pandas pandas kwargs (#3397)
## Feature

This PR aligns LanceDB Python `to_pandas()` APIs with Lance pandas
conversion capabilities while keeping LanceDB query-specific semantics
intact.

- Adds `blob_mode` and pandas `**kwargs` support to local table
`to_pandas()`.
- Delegates local `LanceTable.to_pandas()` to Lance dataset
`to_pandas(blob_mode=..., **kwargs)`.
- Keeps remote table `to_pandas()` unsupported with
`NotImplementedError`.
- Allows sync and async query `to_pandas()` to forward pandas kwargs
after LanceDB `flatten` and `timeout` handling.

Why we need this feature:

Users can access Lance blob-aware pandas conversion from LanceDB local
tables and can pass PyArrow pandas conversion options through
table/query APIs without losing existing `flatten` or `timeout`
behavior.

How it works:

The table API exposes a `BlobMode` literal type for `lazy`, `bytes`, and
`descriptions`. Local tables call through to the backing Lance dataset.
Query APIs do not add `blob_mode`; they materialize Arrow results, apply
LanceDB flattening when requested, and then call `to_pandas(**kwargs)`.

## Validation

- `uv run --frozen --extra tests pytest
python/tests/test_table.py::test_table_to_pandas_default_matches_arrow
python/tests/test_table.py::test_table_to_pandas_blob_bytes
python/tests/test_table.py::test_table_to_pandas_kwargs
python/tests/test_query.py::test_query_to_pandas_kwargs
python/tests/test_query.py::test_query_timeout
python/tests/test_remote_db.py::test_table_to_pandas_not_supported`
- `uv run --frozen --extra dev ruff check python/lancedb/table.py
python/lancedb/query.py python/lancedb/remote/table.py
python/tests/test_table.py python/tests/test_query.py
python/tests/test_remote_db.py`
- `uv run --frozen --extra tests pytest python/tests/test_table.py
python/tests/test_query.py python/tests/test_remote_db.py`

Note: `python/uv.lock` was intentionally not committed in this branch.
2026-05-19 20:05:51 +08:00
Lance Release
53c2164b84 Bump version: 0.29.0 → 0.29.1-beta.0 2026-05-18 22:07:52 +00:00
Lance Release
6286ee8192 Bump version: 0.32.0 → 0.32.1-beta.0 2026-05-18 22:06:40 +00:00
LanceDB Robot
af8ca2ad5e chore: update lance dependency to v7.0.0-beta.13 (#3399)
## Summary
- Bump Lance Rust workspace dependencies to `v7.0.0-beta.13` using
`ci/set_lance_version.py`.
- Update the Java `lance-core` dependency property to `7.0.0-beta.13`.
- Triggering tag:
https://github.com/lance-format/lance/releases/tag/v7.0.0-beta.13

## Verification
- `cargo clippy --workspace --tests --all-features -- -D warnings`
- `cargo fmt --all`
2026-05-18 13:19:32 -07:00
Drew Gallardo
aac6c62459 feat(python): add public take_offsets method on Permutation (#3375)
Closes #3243.

This PR exposes a new public api `Permutation.take_offsets(offsets:
list[int])`, since users initially had to call __getitems__ directly to
batch-fetch rows by position.

Currently, the name matches the existing `Table.take_offsets` pattern,
and now the dunder `__getitem__` and `__getitems__` now delegate to it.

Also, fixes a parse error when `PermutationReader::take_offsets` gets an
empty list. Now returns an empty `RecordBatch` with the correct schema
instead. Bundled this because without the fix the new public API blows
up on a perfectly reasonable input.

`__getitems__` is preserved since PyTorch's batched DataLoader requires
it.

### Testing

- Added 3 new Rust tests for empty offsets including permutation table
with Select::All, Select::Columns, and identity path
- Added 3 new Python tests for the public API including a happy case,
and empty input on both identity and permutation

clippy, format, check all clean!

cc: @westonpace
2026-05-18 09:35:56 -07:00
Weston Pace
8df2fff75f ci: bump version after 0.29 release (#3378)
The 0.29 release happened on a branch because the main line had already
moved past the 6.0.0 stable lance release. As a result the version bump
commits ended up on the branch. This merges those commits back into
main.

---------

Co-authored-by: Lance Release <lance-dev@lancedb.com>
2026-05-18 05:34:33 -07:00
Heng Ge
0d30b31998 feat: support setting LSM write spec for a table (#3396)
## Summary

Split out from #3354

Adds `LsmWriteSpec` and `Table::set_lsm_write_spec` /
`unset_lsm_write_spec` to
install and clear the spec that selects Lance's MemWAL LSM-style write
path for
`merge_insert`.

`LsmWriteSpec` offers three sharding strategies, all built on Lance's
`InitializeMemWalBuilder`:

- `LsmWriteSpec::bucket(column, num_buckets)` — hash-bucket sharding by
the
  single-column unenforced primary key.
- `LsmWriteSpec::identity(column)` — identity sharding by the raw value
of a
  scalar column.
- `LsmWriteSpec::unsharded()` — a single MemWAL shard.

Each can be refined with `with_maintained_indexes(...)` (indexes the
MemWAL
keeps up to date as rows are appended) and
`with_writer_config_defaults(...)`
(default `ShardWriter` configuration recorded in the MemWAL index, so
every
writer starts from the same defaults). All variants require the table to
have
an unenforced primary key.

- `set_lsm_write_spec` installs the spec by initializing the MemWAL
index;
`unset_lsm_write_spec` removes it (dropping the MemWAL index), reverting
to
  the standard `merge_insert` path. `unset` is idempotent.
- Bindings: Python (`LsmWriteSpec.bucket` / `.identity` / `.unsharded`,
  `set_lsm_write_spec` / `unset_lsm_write_spec`) and TypeScript
  (`setLsmWriteSpec` with `specType` `"bucket"` / `"identity"` /
  `"unsharded"`). `RemoteTable` returns `NotSupported`.

The actual `merge_insert` LSM dispatch and `ShardWriter` write path are
a
follow-up — this PR only installs and clears the spec.
2026-05-18 00:11:33 -07:00
Heng Ge
6a431ff0a0 feat: support setting unenforced primary key (#3394)
## Summary

Adds `Table::set_unenforced_primary_key` — records a single column as
the
table's unenforced primary key in Lance schema field metadata.
"Unenforced"
means LanceDB does not check uniqueness on write; the key is metadata
that
`merge_insert` consumes.

- Single-column only; the column must exist and have a supported dtype
(Int32, Int64, Utf8, LargeUtf8, Binary, LargeBinary, FixedSizeBinary).
The
API accepts an iterable for binding ergonomics but requires exactly one
  column — compound keys are rejected.
- The primary key is immutable: calling this on a table that already has
an
unenforced primary key is rejected. Concurrent writers racing to set the
key
  fail at commit time rather than silently overriding it.
- `RemoteTable` returns `NotSupported`.
- Bindings: Python (`AsyncTable`, `LanceTable`, `RemoteTable`) and
TypeScript
  (`Table.setUnenforcedPrimaryKey`).

## Context

Split out from #3354 per review feedback, so the unenforced primary key
and the
`merge_insert` sharding spec land as separate reviewable PRs.

No Lance dependency bump — `main` is already on v7.0.0-beta.10, which
includes
the field-metadata round-trip fix the API relies on. Enforcing
primary-key
immutability at the Lance commit layer (so the cross-column concurrent
race is
also rejected) is a companion Lance change: lance-format/lance#6810.
2026-05-16 23:12:55 -07:00
Xin Sun
ab2c5adf5e feat(nodejs): add order_by method to Query (#3123) 2026-05-16 22:49:08 -07:00
LanceDB Robot
f02c4cad90 chore: update lance dependency to v7.0.0-beta.10 (#3393)
## Summary
- Update Lance Rust dependencies to `7.0.0-beta.10` using
`ci/set_lance_version.py`.
- Update Java `lance-core.version` to `7.0.0-beta.10`.
- Refresh `Cargo.lock` for the new Lance tag.

Triggering tag:
https://github.com/lance-format/lance/releases/tag/v7.0.0-beta.10

## Verification
- `cargo clippy --workspace --tests --all-features -- -D warnings`
- `cargo fmt --all`
2026-05-16 11:58:45 -05:00
LanceDB Robot
7b74c3dd91 chore: update lance dependency to v7.0.0-beta.9 (#3391)
## Summary
- Update Lance Rust workspace dependencies from v7.0.0-beta.7 to
v7.0.0-beta.9 using `ci/set_lance_version.py`.
- Update the Java `lance-core.version` property to `7.0.0-beta.9`.
- Refresh `Cargo.lock` for the Lance dependency bump.

## Verification
- `cargo clippy --workspace --tests --all-features -- -D warnings`
- `cargo fmt --all`

Triggering Lance tag:
https://github.com/lance-format/lance/releases/tag/v7.0.0-beta.9

---------

Co-authored-by: Daniel Rammer <hamersaw@protonmail.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 12:56:29 -05:00
73 changed files with 15790 additions and 656 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.28.0-beta.11"
current_version = "0.29.1-beta.0"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -11,6 +11,11 @@ updates:
schedule:
interval: weekly
open-pull-requests-limit: 10
# Only update Cargo.lock, never widen/raise the version requirements in
# Cargo.toml. The goal is keeping the lockfile (and the binaries we ship)
# current on security fixes, not forcing our library's consumers onto
# newer minimum versions.
versioning-strategy: lockfile-only
groups:
rust-minor-patch:
update-types:

View File

@@ -157,7 +157,10 @@ jobs:
npx jest --testEnvironment jest-environment-node-single-context --verbose
macos:
timeout-minutes: 30
runs-on: "macos-14"
# macos-15 ships a newer linker; the older macos-14 linker fails to insert
# branch islands when the debug cdylib's __text section exceeds the 128 MB
# AArch64 B/BL branch range.
runs-on: "macos-15"
defaults:
run:
shell: bash

View File

@@ -205,7 +205,7 @@ jobs:
- name: Delete wheels
run: rm -rf target/wheels
pydantic1x:
timeout-minutes: 30
timeout-minutes: 60
runs-on: "ubuntu-24.04"
defaults:
run:

View File

@@ -233,6 +233,26 @@ jobs:
cargo update -p aws-sdk-sso --precise 1.62.0
cargo update -p aws-sdk-ssooidc --precise 1.63.0
cargo update -p aws-sdk-sts --precise 1.63.0
# aws-runtime/sigv4/credential-types/types and the aws-smithy-*
# crates bumped their MSRV to 1.91.1 in late 2026; pin to the last
# 1.91.0-compatible versions. The order matters — each downgrade
# only succeeds once everything that still pins it at a higher
# version has itself been downgraded.
cargo update -p aws-runtime --precise 1.5.12
cargo update -p aws-types --precise 1.3.9
cargo update -p aws-sigv4 --precise 1.3.5
cargo update -p aws-credential-types --precise 1.2.8
cargo update -p aws-smithy-checksums --precise 0.63.9
cargo update -p aws-smithy-runtime --precise 1.9.3
cargo update -p aws-smithy-http --precise 0.62.4
cargo update -p aws-smithy-eventstream --precise 0.60.12
cargo update -p aws-smithy-http-client --precise 1.1.3
cargo update -p aws-smithy-observability --precise 0.1.4
cargo update -p aws-smithy-query --precise 0.60.8
cargo update -p aws-smithy-runtime-api --precise 1.9.1
cargo update -p aws-smithy-async --precise 1.2.6
cargo update -p aws-smithy-types --precise 1.3.5
cargo update -p aws-smithy-xml --precise 0.60.11
cargo update -p home --precise 0.5.9
- name: cargo +${{ matrix.msrv }} check
env:

View File

@@ -17,9 +17,30 @@ Common commands:
* Run tests: `cargo test --quiet --features remote --tests`
* Run specific test: `cargo test --quiet --features remote -p <package_name> --test <test_name>`
* Lint: `cargo clippy --quiet --features remote --tests --examples`
* Format: `cargo fmt --all`
* Format Rust: `cargo fmt --all`
* Format Python: `ruff format .`
* Lint Python: `ruff check .`
* Bootstrap Python dev env: `cd python && uv run --extra tests --extra dev maturin develop --extras tests,dev`
* Run Python tests: `cd python && uv run --extra tests pytest python/tests -vv --durations=10 -m "not slow and not s3_test"`
* Run specific Python test: `cd python && uv run --extra tests pytest python/tests/<test_file>.py::<test_name> -q`
Before committing changes, run formatting.
For Python validation, prefer the uv-managed environment declared by `python/uv.lock`.
Do not treat system `python`, global `pytest`, or missing editable-install errors as
final blockers; bootstrap or enter the uv environment instead. If `lancedb._lancedb`
is missing or stale, or if Rust/PyO3 binding code changed, rebuild the Python
extension with the bootstrap command above before running tests.
Before committing changes, run formatting for every language you touched. At minimum:
* Rust changes: run `cargo fmt --all`.
* Python changes: run `ruff format .` and `ruff check .` from the repository root,
and run targeted tests through `cd python && uv run ...`.
* TypeScript changes: run the relevant `npm`/`pnpm` lint, format, build, and docs commands in `nodejs`.
Before creating a PR, make sure the PR title follows Conventional Commits, such as
`fix: support nested field paths in native index creation` or
`feat(python): add dataset multiprocessing support`. The semantic-release check uses the
PR title and body as the merge commit message, so a non-conventional PR title will fail CI.
## Coding tips

1186
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -13,20 +13,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=7.0.0-beta.9", default-features = false, "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=7.0.0-beta.9", default-features = false, "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=7.0.0-beta.9", default-features = false, "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=7.0.0-beta.9", "tag" = "v7.0.0-beta.9", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "58.0.0", optional = false }

View File

@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
<dependency>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-core</artifactId>
<version>0.28.0-beta.11</version>
<version>0.29.1-beta.0</version>
</dependency>
```

View File

@@ -441,18 +441,28 @@ Open a table in the database.
```ts
abstract renameTable(
oldName,
currentName,
newName,
namespacePath?): Promise<void>
options?): Promise<void>
```
Rename a table.
Currently only supported by LanceDB Cloud. Local OSS connections and
namespace-backed connections (via [connectNamespace](../functions/connectNamespace.md)) reject with
a "not supported" error.
#### Parameters
* **oldName**: `string`
* **currentName**: `string`
The current name of the table.
* **newName**: `string`
The new name for the table.
* **namespacePath?**: `string`[]
* **options?**: [`RenameTableOptions`](../interfaces/RenameTableOptions.md)
Optional namespace paths. When
`newNamespacePath` is omitted the table stays in `namespacePath`.
#### Returns

View File

@@ -343,6 +343,30 @@ This is useful for pagination.
***
### orderBy()
```ts
orderBy(ordering): this
```
Sort the results by the specified column(s).
#### Parameters
* **ordering**: [`ColumnOrdering`](../interfaces/ColumnOrdering.md) \| [`ColumnOrdering`](../interfaces/ColumnOrdering.md)[]
#### Returns
`this`
This query builder.
#### Inherited from
`StandardQueryBase.orderBy`
***
### outputSchema()
```ts

View File

@@ -690,6 +690,74 @@ of the given query
***
### setLsmWriteSpec()
```ts
abstract setLsmWriteSpec(spec): Promise<void>
```
Install an [LsmWriteSpec](../interfaces/LsmWriteSpec.md) on this table, selecting Lance's MemWAL
LSM-style write path for future `mergeInsert` calls.
`LsmWriteSpec` chooses one of three sharding strategies via `specType`:
- `"bucket"` — hash-bucket writes by the single-column unenforced primary
key (`column` and `numBuckets` required).
- `"identity"` — shard by the raw value of a scalar `column`.
- `"unsharded"` — route every write to a single shard.
All variants require the table to have an unenforced primary key
([Table#setUnenforcedPrimaryKey](Table.md#setunenforcedprimarykey)); bucket sharding additionally
requires it to be the single column being bucketed.
#### Parameters
* **spec**: [`LsmWriteSpec`](../interfaces/LsmWriteSpec.md)
The sharding spec to install.
#### Returns
`Promise`&lt;`void`&gt;
#### Example
```ts
await table.setUnenforcedPrimaryKey("id");
await table.setLsmWriteSpec({
specType: "bucket",
column: "id",
numBuckets: 16,
maintainedIndexes: ["id_idx"],
});
```
***
### setUnenforcedPrimaryKey()
```ts
abstract setUnenforcedPrimaryKey(columns): Promise<void>
```
Set the unenforced primary key for this table to a single column.
"Unenforced" means LanceDB does not check uniqueness on writes; the
column is recorded in the schema as the primary key for use by features
such as `merge_insert`. Only single-column primary keys are supported,
and the key cannot be changed once set.
#### Parameters
* **columns**: `string` \| `string`[]
The primary key column. A one-element
array is also accepted; passing more than one column is rejected.
#### Returns
`Promise`&lt;`void`&gt;
***
### stats()
```ts
@@ -793,6 +861,23 @@ Return the table as an arrow table
***
### unsetLsmWriteSpec()
```ts
abstract unsetLsmWriteSpec(): Promise<void>
```
Remove the [LsmWriteSpec](../interfaces/LsmWriteSpec.md) from this table, reverting to the standard
`mergeInsert` write path.
Errors if no spec is currently set.
#### Returns
`Promise`&lt;`void`&gt;
***
### update()
#### update(opts)

View File

@@ -498,6 +498,30 @@ This is useful for pagination.
***
### orderBy()
```ts
orderBy(ordering): this
```
Sort the results by the specified column(s).
#### Parameters
* **ordering**: [`ColumnOrdering`](../interfaces/ColumnOrdering.md) \| [`ColumnOrdering`](../interfaces/ColumnOrdering.md)[]
#### Returns
`this`
This query builder.
#### Inherited from
`StandardQueryBase.orderBy`
***
### outputSchema()
```ts

View File

@@ -51,6 +51,7 @@
- [AlterColumnsResult](interfaces/AlterColumnsResult.md)
- [ClientConfig](interfaces/ClientConfig.md)
- [ColumnAlteration](interfaces/ColumnAlteration.md)
- [ColumnOrdering](interfaces/ColumnOrdering.md)
- [CompactionStats](interfaces/CompactionStats.md)
- [ConnectNamespaceOptions](interfaces/ConnectNamespaceOptions.md)
- [ConnectionOptions](interfaces/ConnectionOptions.md)
@@ -79,12 +80,14 @@
- [IvfRqOptions](interfaces/IvfRqOptions.md)
- [ListNamespacesOptions](interfaces/ListNamespacesOptions.md)
- [ListNamespacesResponse](interfaces/ListNamespacesResponse.md)
- [LsmWriteSpec](interfaces/LsmWriteSpec.md)
- [MergeResult](interfaces/MergeResult.md)
- [OpenTableOptions](interfaces/OpenTableOptions.md)
- [OptimizeOptions](interfaces/OptimizeOptions.md)
- [OptimizeStats](interfaces/OptimizeStats.md)
- [QueryExecutionOptions](interfaces/QueryExecutionOptions.md)
- [RemovalStats](interfaces/RemovalStats.md)
- [RenameTableOptions](interfaces/RenameTableOptions.md)
- [RestNamespaceConfig](interfaces/RestNamespaceConfig.md)
- [RetryConfig](interfaces/RetryConfig.md)
- [ScannableOptions](interfaces/ScannableOptions.md)
@@ -102,6 +105,7 @@
- [UpdateResult](interfaces/UpdateResult.md)
- [Version](interfaces/Version.md)
- [WriteExecutionOptions](interfaces/WriteExecutionOptions.md)
- [WriteProgress](interfaces/WriteProgress.md)
## Type Aliases

View File

@@ -19,3 +19,39 @@ mode: "append" | "overwrite";
If "append" (the default) then the new data will be added to the table
If "overwrite" then the new data will replace the existing data in the table.
***
### progress()
```ts
progress: (progress) => void;
```
Optional callback invoked periodically with write progress.
The callback is fired once per batch written and once more with
`done: true` when the write completes. Calls are dispatched
asynchronously to the JS event loop and never block the write — a slow
callback will queue events rather than back-pressure the writer.
Errors thrown from the callback are logged with `console.warn` and
swallowed — they do not abort the write.
#### Parameters
* **progress**: [`WriteProgress`](WriteProgress.md)
#### Returns
`void`
#### Example
```ts
await table.add(data, {
progress: (p) => {
console.log(`${p.outputRows}/${p.totalRows ?? "?"} rows`);
},
});
```

View File

@@ -0,0 +1,31 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / ColumnOrdering
# Interface: ColumnOrdering
## Properties
### ascending?
```ts
optional ascending: boolean;
```
***
### columnName
```ts
columnName: string;
```
***
### nullsFirst?
```ts
optional nullsFirst: boolean;
```

View File

@@ -0,0 +1,64 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / LsmWriteSpec
# Interface: LsmWriteSpec
Specification selecting Lance's MemWAL LSM-style write path for
`mergeInsert`.
`specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`,
`column` and `numBuckets` are required; for `"identity"`, `column` is
required.
## Properties
### column?
```ts
optional column: string;
```
Bucket and identity variants: the sharding column.
***
### maintainedIndexes?
```ts
optional maintainedIndexes: string[];
```
Names of indexes the MemWAL should keep up to date during writes.
***
### numBuckets?
```ts
optional numBuckets: number;
```
Bucket variant: the number of buckets, in `[1, 1024]`.
***
### specType
```ts
specType: "bucket" | "identity" | "unsharded";
```
One of `"bucket"`, `"identity"`, or `"unsharded"`.
***
### writerConfigDefaults?
```ts
optional writerConfigDefaults: Record<string, string>;
```
Default `ShardWriter` configuration recorded in the MemWAL index.

View File

@@ -0,0 +1,29 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / RenameTableOptions
# Interface: RenameTableOptions
## Properties
### namespacePath?
```ts
optional namespacePath: string[];
```
The namespace path of the table being renamed. Defaults to the root
namespace (`[]`) when omitted.
***
### newNamespacePath?
```ts
optional newNamespacePath: string[];
```
The namespace path to move the table to as part of the rename. When
omitted the table stays in `namespacePath`.

View File

@@ -0,0 +1,84 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / WriteProgress
# Interface: WriteProgress
Progress snapshot for a write operation, delivered to the `progress`
callback passed to [Table.add](../classes/Table.md#add).
## Properties
### activeTasks
```ts
activeTasks: number;
```
Number of parallel write tasks currently in flight.
***
### done
```ts
done: boolean;
```
`true` for the final callback; `false` otherwise.
***
### elapsedSeconds
```ts
elapsedSeconds: number;
```
Wall-clock seconds since the write started.
***
### outputBytes
```ts
outputBytes: number;
```
Number of bytes written so far.
***
### outputRows
```ts
outputRows: number;
```
Number of rows written so far.
***
### totalRows?
```ts
optional totalRows: number;
```
Total rows expected, when the input source reports it.
Always set on the final callback (the one with `done: true`), falling
back to the actual number of rows written when the source could not
report a row count up front.
***
### totalTasks
```ts
totalTasks: number;
```
Total number of parallel write tasks (the write parallelism).

View File

@@ -166,6 +166,12 @@ lists the indices that LanceDb supports.
::: lancedb.index.IvfFlat
::: lancedb.index.IvfSq
::: lancedb.index.IvfRq
::: lancedb.index.HnswFlat
::: lancedb.table.IndexStatistics
## Querying (Asynchronous)

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.28.0-beta.11</version>
<version>0.29.1-beta.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.28.0-beta.11</version>
<version>0.29.1-beta.0</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>LanceDB Java SDK Parent POM</description>
@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>7.0.0-beta.9</lance-core.version>
<lance-core.version>7.0.0-beta.13</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.28.0-beta.11"
version = "0.29.1-beta.0"
publish = false
license.workspace = true
description.workspace = true

View File

@@ -47,6 +47,14 @@ describe("given a connection", () => {
await db.close();
expect(db.isOpen()).toBe(false);
await expect(db.tableNames()).rejects.toThrow("Connection is closed");
await expect(db.renameTable("a", "b")).rejects.toThrow(
"Connection is closed",
);
});
it("should report renameTable as unsupported on an OSS connection", async () => {
await db.createTable("a", [{ id: 1 }]);
await expect(db.renameTable("a", "b")).rejects.toThrow(/not supported/);
});
it("should be able to create a table from an object arg `createTable(options)`, or args `createTable(name, data, options)`", async () => {
let tbl = await db.createTable("test", [{ id: 1 }, { id: 2 }]);
@@ -81,16 +89,6 @@ describe("given a connection", () => {
await db.createTable("test4", [{ id: 1 }, { id: 2 }]);
});
it("should expose renameTable and reject on OSS listing DB", async () => {
await db.createTable("old_name", [{ id: 1 }]);
await expect(db.renameTable("old_name", "new_name")).rejects.toThrow(
"rename_table is not supported in LanceDB OSS",
);
await expect(db.tableNames()).resolves.toEqual(["old_name"]);
});
it("should fail if creating table twice, unless overwrite is true", async () => {
let tbl = await db.createTable("test", [{ id: 1 }, { id: 2 }]);
await expect(tbl.countRows()).resolves.toBe(2);

View File

@@ -109,3 +109,209 @@ describe("Query outputSchema", () => {
expect(schema.fields.length).toBe(3);
});
});
describe("Query orderBy", () => {
let tmpDir: tmp.DirResult;
let table: Table;
beforeEach(async () => {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
const db = await connect(tmpDir.name);
// Create table with numeric data for sorting
const schema = new Schema([
new Field("id", new Int64(), true),
new Field("score", new Float32(), true),
new Field("name", new Utf8(), true),
]);
const data = makeArrowTable(
[
{ id: 1n, score: 3.5, name: "charlie" },
{ id: 2n, score: 1.2, name: "alice" },
{ id: 3n, score: 2.8, name: "bob" },
{ id: 4n, score: 0.5, name: "david" },
{ id: 5n, score: 4.1, name: "eve" },
],
{ schema },
);
table = await db.createTable("test", data);
});
afterEach(() => {
tmpDir.removeCallback();
});
it("should sort by single column ascending", async () => {
const results = await table
.query()
.orderBy({ columnName: "score", ascending: true, nullsFirst: false })
.toArray();
expect(results.length).toBe(5);
// Verify ascending order
expect(results[0].score).toBeCloseTo(0.5, 0.001);
expect(results[1].score).toBeCloseTo(1.2, 0.001);
expect(results[2].score).toBeCloseTo(2.8, 0.001);
expect(results[3].score).toBeCloseTo(3.5, 0.001);
expect(results[4].score).toBeCloseTo(4.1, 0.001);
});
it("should sort by single column descending", async () => {
const results = await table
.query()
.orderBy({ columnName: "score", ascending: false, nullsFirst: false })
.toArray();
expect(results.length).toBe(5);
// Verify descending order
expect(results[0].score).toBeCloseTo(4.1, 0.001);
expect(results[1].score).toBeCloseTo(3.5, 0.001);
expect(results[2].score).toBeCloseTo(2.8, 0.001);
expect(results[3].score).toBeCloseTo(1.2, 0.001);
expect(results[4].score).toBeCloseTo(0.5, 0.001);
});
it("should use ascending as default direction", async () => {
const results = await table
.query()
.orderBy({ columnName: "score" })
.toArray();
expect(results.length).toBe(5);
// Verify ascending order (default)
expect(results[0].score).toBeCloseTo(0.5, 0.001);
expect(results[1].score).toBeCloseTo(1.2, 0.001);
expect(results[2].score).toBeCloseTo(2.8, 0.001);
expect(results[3].score).toBeCloseTo(3.5, 0.001);
expect(results[4].score).toBeCloseTo(4.1, 0.001);
});
it("should sort by string column", async () => {
const results = await table
.query()
.orderBy({ columnName: "name" })
.toArray();
expect(results.length).toBe(5);
// Verify alphabetical order
expect(results[0].name).toBe("alice");
expect(results[1].name).toBe("bob");
expect(results[2].name).toBe("charlie");
expect(results[3].name).toBe("david");
expect(results[4].name).toBe("eve");
});
it("should support method chaining with where", async () => {
const results = await table
.query()
.where("score > 2.0")
.orderBy({ columnName: "score" })
.toArray();
expect(results.length).toBe(3);
// Verify filtered and sorted
expect(results[0].score).toBeCloseTo(2.8, 0.001);
expect(results[1].score).toBeCloseTo(3.5, 0.001);
expect(results[2].score).toBeCloseTo(4.1, 0.001);
});
it("should support method chaining with limit", async () => {
const results = await table
.query()
.orderBy({ columnName: "score", ascending: false })
.limit(3)
.toArray();
expect(results.length).toBe(3);
// Verify top 3 in descending order
expect(results[0].score).toBeCloseTo(4.1, 0.001);
expect(results[1].score).toBeCloseTo(3.5, 0.001);
expect(results[2].score).toBeCloseTo(2.8, 0.001);
});
it("should support method chaining with offset", async () => {
const results = await table
.query()
.orderBy({ columnName: "score" })
.offset(2)
.limit(2)
.toArray();
expect(results.length).toBe(2);
// Verify results skip first 2 and take next 2
expect(results[0].score).toBeCloseTo(2.8, 0.001);
expect(results[1].score).toBeCloseTo(3.5, 0.001);
});
it("should support method chaining with select", async () => {
const results = await table
.query()
.orderBy({ columnName: "name" })
.select(["name", "score"])
.toArray();
expect(results.length).toBe(5);
// Verify only selected columns are present
expect(Object.keys(results[0])).toEqual(["name", "score"]);
expect(Object.keys(results[4])).toEqual(["name", "score"]);
// Verify sorted by name
expect(results[0].name).toBe("alice");
expect(results[4].name).toBe("eve");
});
it("should support complex method chaining", async () => {
const results = await table
.query()
.where("score > 1.0")
.orderBy({ columnName: "score", ascending: false })
.limit(3)
.select(["id", "score", "name"])
.toArray();
expect(results.length).toBe(3);
// Verify filtered, sorted, limited, and projected
expect(results[0].score).toBeCloseTo(4.1, 0.001);
expect(results[1].score).toBeCloseTo(3.5, 0.001);
expect(results[2].score).toBeCloseTo(2.8, 0.001);
expect(Object.keys(results[0])).toEqual(["id", "score", "name"]);
});
it("should support multi-column ordering and null placement", async () => {
const schema = new Schema([
new Field("group", new Int64(), true),
new Field("score", new Float32(), true),
new Field("name", new Utf8(), true),
]);
const data = makeArrowTable(
[
{ group: 1n, score: null, name: "z" },
{ group: 1n, score: 1.0, name: "b" },
{ group: 1n, score: 1.0, name: "a" },
{ group: 2n, score: 0.5, name: "c" },
],
{ schema },
);
const nullTable = await (await connect(tmpDir.name)).createTable(
"test_multi_order",
data,
{ mode: "overwrite" },
);
const results = await nullTable
.query()
.orderBy([
{ columnName: "group", ascending: true, nullsFirst: false },
{ columnName: "score", ascending: true, nullsFirst: true },
{ columnName: "name", ascending: true, nullsFirst: false },
])
.toArray();
expect(results.map((r) => [r.group, r.score, r.name])).toEqual([
[1n, null, "z"],
[1n, 1.0, "a"],
[1n, 1.0, "b"],
[2n, 0.5, "c"],
]);
});
});

View File

@@ -617,4 +617,68 @@ describe("remote connection", () => {
);
});
});
describe("renameTable", () => {
async function captureRenameRequest(
call: (db: Connection) => Promise<void>,
): Promise<{ url: string; body: Record<string, unknown> }> {
let captured: { url: string; body: Record<string, unknown> } | undefined;
await withMockDatabase((req, res) => {
let raw = "";
req.on("data", (chunk) => {
raw += chunk;
});
req.on("end", () => {
captured = {
url: req.url ?? "",
body: raw ? JSON.parse(raw) : {},
};
res.writeHead(200, { "Content-Type": "application/json" }).end("");
});
}, call);
if (!captured) {
throw new Error("mock server never saw a request");
}
return captured;
}
it("sends rename request for a table in the root namespace", async () => {
const { url, body } = await captureRenameRequest(async (db) => {
await db.renameTable("table1", "table2");
});
expect(url).toBe("/v1/table/table1/rename/");
// biome-ignore lint/style/useNamingConvention: snake_case mandated by the server wire format
expect(body).toEqual({ new_table_name: "table2" });
});
it("omits new_namespace when only the current namespace is supplied", async () => {
// Safe-default check: passing namespacePath alone must not send
// `new_namespace`, so the server keeps the table in its current
// namespace instead of silently moving it to root.
const { url, body } = await captureRenameRequest(async (db) => {
await db.renameTable("table1", "table2", {
namespacePath: ["ns1"],
});
});
expect(url).toBe("/v1/table/ns1$table1/rename/");
// biome-ignore lint/style/useNamingConvention: snake_case mandated by the server wire format
expect(body).toEqual({ new_table_name: "table2" });
});
it("includes new_namespace in the body for a cross-namespace rename", async () => {
const { url, body } = await captureRenameRequest(async (db) => {
await db.renameTable("table1", "table2", {
namespacePath: ["ns1"],
newNamespacePath: ["ns2"],
});
});
expect(url).toBe("/v1/table/ns1$table1/rename/");
expect(body).toEqual({
// biome-ignore lint/style/useNamingConvention: snake_case mandated by the server wire format
new_table_name: "table2",
// biome-ignore lint/style/useNamingConvention: snake_case mandated by the server wire format
new_namespace: ["ns2"],
});
});
});
});

View File

@@ -115,6 +115,48 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
await expect(table.countRows()).resolves.toBe(1);
});
it("should invoke the progress callback", async () => {
const events: import("../lancedb").WriteProgress[] = [];
await table.add([{ id: 1 }, { id: 2 }, { id: 3 }], {
progress: (p) => events.push(p),
});
expect(events.length).toBeGreaterThan(0);
const last = events[events.length - 1];
expect(last.done).toBe(true);
// Earlier callbacks must have done=false.
for (const ev of events.slice(0, -1)) {
expect(ev.done).toBe(false);
}
// outputRows reflects the rows added in this call, not table size.
expect(last.outputRows).toBe(3);
// The input source (an array) reports a row count, so totalRows is set.
expect(last.totalRows).toBe(3);
// outputRows is monotonic.
for (let i = 1; i < events.length; i++) {
expect(events[i].outputRows).toBeGreaterThanOrEqual(
events[i - 1].outputRows,
);
}
});
it("should swallow errors thrown from the progress callback", async () => {
const warn = jest
.spyOn(console, "warn")
.mockImplementation(() => undefined);
try {
const res = await table.add([{ id: 1 }, { id: 2 }], {
progress: () => {
throw new Error("callback bomb");
},
});
expect(res.version).toBeGreaterThan(0);
expect(warn).toHaveBeenCalled();
} finally {
warn.mockRestore();
}
});
it("should let me close the table", async () => {
expect(table.isOpen()).toBe(true);
table.close();
@@ -2348,3 +2390,130 @@ describe("when creating a table with Float32Array vectors", () => {
expect((fsl.children[0].type as Float32).precision).toBe(1);
});
});
describe("setUnenforcedPrimaryKey", () => {
let tmpDir: tmp.DirResult;
beforeEach(() => {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
});
afterEach(() => tmpDir.removeCallback());
it("sets a single-column primary key (string or one-element array)", async () => {
const conn = await connect(tmpDir.name);
const schema = new arrow.Schema([
new arrow.Field("id", new arrow.Int64(), false),
]);
const t1 = await conn.createEmptyTable("t1", schema);
await t1.setUnenforcedPrimaryKey("id");
const t2 = await conn.createEmptyTable("t2", schema);
await t2.setUnenforcedPrimaryKey(["id"]);
});
it("rejects a compound primary key", async () => {
const conn = await connect(tmpDir.name);
const table = await conn.createEmptyTable(
"t",
new arrow.Schema([
new arrow.Field("id", new arrow.Int64(), false),
new arrow.Field("name", new arrow.Utf8(), false),
]),
);
await expect(
table.setUnenforcedPrimaryKey(["id", "name"]),
).rejects.toThrow();
});
it("rejects changing the primary key once set", async () => {
const conn = await connect(tmpDir.name);
const table = await conn.createEmptyTable(
"t",
new arrow.Schema([
new arrow.Field("id", new arrow.Int64(), false),
new arrow.Field("name", new arrow.Utf8(), false),
]),
);
await table.setUnenforcedPrimaryKey("id");
await expect(table.setUnenforcedPrimaryKey("name")).rejects.toThrow();
await expect(table.setUnenforcedPrimaryKey("id")).rejects.toThrow();
});
});
describe("setLsmWriteSpec / unsetLsmWriteSpec", () => {
let tmpDir: tmp.DirResult;
beforeEach(() => {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
});
afterEach(() => tmpDir.removeCallback());
async function makeTable(conn: Connection): Promise<Table> {
return await conn.createEmptyTable(
"t",
new arrow.Schema([new arrow.Field("id", new arrow.Int64(), false)]),
);
}
it("installs and removes a bucket spec", async () => {
const conn = await connect(tmpDir.name);
const table = await makeTable(conn);
await table.setUnenforcedPrimaryKey("id");
await table.setLsmWriteSpec({
specType: "bucket",
column: "id",
numBuckets: 4,
});
await table.unsetLsmWriteSpec();
// A second unset errors — there is no spec left to remove.
await expect(table.unsetLsmWriteSpec()).rejects.toThrow();
// A fresh spec can be installed after unset.
await table.setLsmWriteSpec({
specType: "bucket",
column: "id",
numBuckets: 8,
});
});
it("installs an unsharded spec", async () => {
const conn = await connect(tmpDir.name);
const table = await makeTable(conn);
await table.setUnenforcedPrimaryKey("id");
await table.setLsmWriteSpec({ specType: "unsharded" });
await table.unsetLsmWriteSpec();
});
it("installs an identity spec", async () => {
const conn = await connect(tmpDir.name);
const table = await makeTable(conn);
await table.setUnenforcedPrimaryKey("id");
await table.setLsmWriteSpec({ specType: "identity", column: "id" });
await table.unsetLsmWriteSpec();
});
it("rejects an invalid spec", async () => {
const conn = await connect(tmpDir.name);
const table = await makeTable(conn);
await table.setUnenforcedPrimaryKey("id");
// num_buckets out of range.
await expect(
table.setLsmWriteSpec({
specType: "bucket",
column: "id",
numBuckets: 0,
}),
).rejects.toThrow();
// Column mismatch.
await expect(
table.setLsmWriteSpec({
specType: "bucket",
column: "missing",
numBuckets: 4,
}),
).rejects.toThrow();
});
});

View File

@@ -38,5 +38,14 @@ test("filtering examples", async () => {
// --8<-- [start:sql_search]
await tbl.query().where("id = 10").limit(10).toArray();
// --8<-- [end:sql_search]
// --8<-- [start:orderby_search]
await tbl
.query()
.where("id > 10")
.orderBy({ columnName: "id", ascending: false })
.limit(5)
.toArray();
// --8<-- [end:orderby_search]
});
});

View File

@@ -144,6 +144,19 @@ export interface DropNamespaceOptions {
behavior?: "restrict" | "cascade";
}
export interface RenameTableOptions {
/**
* The namespace path of the table being renamed. Defaults to the root
* namespace (`[]`) when omitted.
*/
namespacePath?: string[];
/**
* The namespace path to move the table to as part of the rename. When
* omitted the table stays in `namespacePath`.
*/
newNamespacePath?: string[];
}
/**
* A LanceDB Connection that allows you to open tables and create new ones.
*
@@ -296,12 +309,6 @@ export abstract class Connection {
*/
abstract dropTable(name: string, namespacePath?: string[]): Promise<void>;
abstract renameTable(
oldName: string,
newName: string,
namespacePath?: string[],
): Promise<void>;
/**
* Drop all tables in the database.
* @param {string[]} namespacePath The namespace path to drop tables from (defaults to root namespace).
@@ -397,6 +404,24 @@ export abstract class Connection {
isShallow?: boolean;
},
): Promise<Table>;
/**
* Rename a table.
*
* Currently only supported by LanceDB Cloud. Local OSS connections and
* namespace-backed connections (via {@link connectNamespace}) reject with
* a "not supported" error.
*
* @param {string} currentName - The current name of the table.
* @param {string} newName - The new name for the table.
* @param {RenameTableOptions} options - Optional namespace paths. When
* `newNamespacePath` is omitted the table stays in `namespacePath`.
*/
abstract renameTable(
currentName: string,
newName: string,
options?: RenameTableOptions,
): Promise<void>;
}
/** @hideconstructor */
@@ -615,14 +640,6 @@ export class LocalConnection extends Connection {
return this.inner.dropTable(name, namespacePath ?? []);
}
async renameTable(
oldName: string,
newName: string,
namespacePath?: string[],
): Promise<void> {
return this.inner.renameTable(oldName, newName, namespacePath ?? []);
}
async dropAllTables(namespacePath?: string[]): Promise<void> {
return this.inner.dropAllTables(namespacePath ?? []);
}
@@ -665,6 +682,19 @@ export class LocalConnection extends Connection {
options?.behavior,
);
}
async renameTable(
currentName: string,
newName: string,
options?: RenameTableOptions,
): Promise<void> {
return this.inner.renameTable(
currentName,
newName,
options?.namespacePath ?? [],
options?.newNamespacePath,
);
}
}
/**

View File

@@ -71,6 +71,7 @@ export {
CreateNamespaceResponse,
DropNamespaceResponse,
DescribeNamespaceResponse,
RenameTableOptions,
} from "./connection";
export { Session } from "./native.js";
@@ -82,6 +83,7 @@ export {
VectorQuery,
TakeQuery,
QueryExecutionOptions,
ColumnOrdering,
FullTextSearchOptions,
RecordBatchIterator,
FullTextQuery,
@@ -112,6 +114,8 @@ export {
UpdateOptions,
OptimizeOptions,
Version,
WriteProgress,
LsmWriteSpec,
ColumnAlteration,
} from "./table";

View File

@@ -79,6 +79,12 @@ export interface QueryExecutionOptions {
timeoutMs?: number;
}
export interface ColumnOrdering {
columnName: string;
ascending?: boolean;
nullsFirst?: boolean;
}
/**
* Options that control the behavior of a full text search
*/
@@ -417,6 +423,21 @@ export class StandardQueryBase<
return this;
}
/**
* Sort the results by the specified column(s).
* @returns This query builder.
*/
orderBy(ordering: ColumnOrdering | ColumnOrdering[]): this {
const orderings = Array.isArray(ordering) ? ordering : [ordering];
const normalized = orderings.map((o) => ({
columnName: o.columnName,
ascending: o.ascending ?? true,
nullsFirst: o.nullsFirst ?? false,
}));
this.doCall((inner) => inner.orderBy(normalized));
return this;
}
/**
* Skip searching un-indexed data. This can make search faster, but will miss
* any data that is not yet indexed.

View File

@@ -46,6 +46,33 @@ import { sanitizeType } from "./sanitize";
import { IntoSql, toSQL } from "./util";
export { IndexConfig } from "./native";
/**
* Progress snapshot for a write operation, delivered to the `progress`
* callback passed to {@link Table.add}.
*/
export interface WriteProgress {
/** Number of rows written so far. */
outputRows: number;
/** Number of bytes written so far. */
outputBytes: number;
/**
* Total rows expected, when the input source reports it.
*
* Always set on the final callback (the one with `done: true`), falling
* back to the actual number of rows written when the source could not
* report a row count up front.
*/
totalRows?: number;
/** Wall-clock seconds since the write started. */
elapsedSeconds: number;
/** Number of parallel write tasks currently in flight. */
activeTasks: number;
/** Total number of parallel write tasks (the write parallelism). */
totalTasks: number;
/** `true` for the final callback; `false` otherwise. */
done: boolean;
}
/**
* Options for adding data to a table.
*/
@@ -56,6 +83,28 @@ export interface AddDataOptions {
* If "overwrite" then the new data will replace the existing data in the table.
*/
mode: "append" | "overwrite";
/**
* Optional callback invoked periodically with write progress.
*
* The callback is fired once per batch written and once more with
* `done: true` when the write completes. Calls are dispatched
* asynchronously to the JS event loop and never block the write — a slow
* callback will queue events rather than back-pressure the writer.
*
* Errors thrown from the callback are logged with `console.warn` and
* swallowed — they do not abort the write.
*
* @example
* ```ts
* await table.add(data, {
* progress: (p) => {
* console.log(`${p.outputRows}/${p.totalRows ?? "?"} rows`);
* },
* });
* ```
*/
progress: (progress: WriteProgress) => void;
}
export interface UpdateOptions {
@@ -106,6 +155,27 @@ export interface Version {
metadata: Record<string, string>;
}
/**
* Specification selecting Lance's MemWAL LSM-style write path for
* `mergeInsert`.
*
* `specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`,
* `column` and `numBuckets` are required; for `"identity"`, `column` is
* required.
*/
export interface LsmWriteSpec {
/** One of `"bucket"`, `"identity"`, or `"unsharded"`. */
specType: "bucket" | "identity" | "unsharded";
/** Bucket and identity variants: the sharding column. */
column?: string;
/** Bucket variant: the number of buckets, in `[1, 1024]`. */
numBuckets?: number;
/** Names of indexes the MemWAL should keep up to date during writes. */
maintainedIndexes?: string[];
/** Default `ShardWriter` configuration recorded in the MemWAL index. */
writerConfigDefaults?: Record<string, string>;
}
/**
* A Table is a collection of Records in a LanceDB Database.
*
@@ -449,6 +519,54 @@ export abstract class Table {
* containing the new version number of the table after dropping the columns.
*/
abstract dropColumns(columnNames: string[]): Promise<DropColumnsResult>;
/**
* Set the unenforced primary key for this table to a single column.
*
* "Unenforced" means LanceDB does not check uniqueness on writes; the
* column is recorded in the schema as the primary key for use by features
* such as `merge_insert`. Only single-column primary keys are supported,
* and the key cannot be changed once set.
* @param {string | string[]} columns The primary key column. A one-element
* array is also accepted; passing more than one column is rejected.
* @returns {Promise<void>}
*/
abstract setUnenforcedPrimaryKey(columns: string | string[]): Promise<void>;
/**
* Install an {@link LsmWriteSpec} on this table, selecting Lance's MemWAL
* LSM-style write path for future `mergeInsert` calls.
*
* `LsmWriteSpec` chooses one of three sharding strategies via `specType`:
*
* - `"bucket"` — hash-bucket writes by the single-column unenforced primary
* key (`column` and `numBuckets` required).
* - `"identity"` — shard by the raw value of a scalar `column`.
* - `"unsharded"` — route every write to a single shard.
*
* All variants require the table to have an unenforced primary key
* ({@link Table#setUnenforcedPrimaryKey}); bucket sharding additionally
* requires it to be the single column being bucketed.
* @param {LsmWriteSpec} spec The sharding spec to install.
* @returns {Promise<void>}
* @example
* ```ts
* await table.setUnenforcedPrimaryKey("id");
* await table.setLsmWriteSpec({
* specType: "bucket",
* column: "id",
* numBuckets: 16,
* maintainedIndexes: ["id_idx"],
* });
* ```
*/
abstract setLsmWriteSpec(spec: LsmWriteSpec): Promise<void>;
/**
* Remove the {@link LsmWriteSpec} from this table, reverting to the standard
* `mergeInsert` write path.
*
* Errors if no spec is currently set.
* @returns {Promise<void>}
*/
abstract unsetLsmWriteSpec(): Promise<void>;
/** Retrieve the version of the table */
abstract version(): Promise<number>;
@@ -636,7 +754,20 @@ export class LocalTable extends Table {
const schema = await this.schema();
const buffer = await fromDataToBuffer(data, undefined, schema);
return await this.inner.add(buffer, mode);
// Wrap the user callback so a thrown error doesn't surface as an
// unhandled exception (the callback fires from a napi threadsafe
// function — exceptions there crash the process).
const userProgress = options?.progress;
const progress = userProgress
? (p: WriteProgress) => {
try {
userProgress(p);
} catch (e) {
console.warn("Table.add progress callback threw:", e);
}
}
: undefined;
return await this.inner.add(buffer, mode, progress);
}
async update(
@@ -897,6 +1028,19 @@ export class LocalTable extends Table {
return await this.inner.dropColumns(columnNames);
}
async setUnenforcedPrimaryKey(columns: string | string[]): Promise<void> {
const cols = typeof columns === "string" ? [columns] : columns;
return await this.inner.setUnenforcedPrimaryKey(cols);
}
async setLsmWriteSpec(spec: LsmWriteSpec): Promise<void> {
return await this.inner.setLsmWriteSpec(spec);
}
async unsetLsmWriteSpec(): Promise<void> {
return await this.inner.unsetLsmWriteSpec();
}
async version(): Promise<number> {
return await this.inner.version();
}

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.28.0-beta.11",
"version": "0.29.1-beta.0",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.28.0-beta.11",
"version": "0.29.1-beta.0",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.28.0-beta.11",
"version": "0.29.1-beta.0",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.28.0-beta.11",
"version": "0.29.1-beta.0",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.28.0-beta.11",
"version": "0.29.1-beta.0",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.28.0-beta.11",
"version": "0.29.1-beta.0",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.28.0-beta.11",
"version": "0.29.1-beta.0",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

11029
nodejs/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.28.0-beta.11",
"version": "0.29.1-beta.0",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -328,20 +328,6 @@ impl Connection {
.default_error()
}
#[napi(catch_unwind)]
pub async fn rename_table(
&self,
old_name: String,
new_name: String,
namespace_path: Option<Vec<String>>,
) -> napi::Result<()> {
let ns = namespace_path.unwrap_or_default();
self.get_inner()?
.rename_table(&old_name, &new_name, &ns, &ns)
.await
.default_error()
}
#[napi(catch_unwind)]
pub async fn drop_all_tables(&self, namespace_path: Option<Vec<String>>) -> napi::Result<()> {
let ns = namespace_path.unwrap_or_default();
@@ -473,4 +459,23 @@ impl Connection {
transaction_id: resp.transaction_id,
})
}
/// Rename a table. `current_namespace_path` and `new_namespace_path` default to
/// the root namespace when omitted; the caller is expected to either pass both
/// or pass neither.
#[napi(catch_unwind)]
pub async fn rename_table(
&self,
current_name: String,
new_name: String,
current_namespace_path: Option<Vec<String>>,
new_namespace_path: Option<Vec<String>>,
) -> napi::Result<()> {
let cur_ns = current_namespace_path.unwrap_or_default();
let new_ns = new_namespace_path.unwrap_or_default();
self.get_inner()?
.rename_table(&current_name, &new_name, &cur_ns, &new_ns)
.await
.default_error()
}
}

View File

@@ -3,6 +3,12 @@
use std::sync::Arc;
use crate::error::NapiErrorExt;
use crate::error::convert_error;
use crate::iterator::RecordBatchIterator;
use crate::rerankers::RerankHybridCallbackArgs;
use crate::rerankers::Reranker;
use crate::util::{parse_distance_type, schema_to_buffer};
use arrow_array::{
Array, Float16Array as ArrowFloat16Array, Float32Array as ArrowFloat32Array,
Float64Array as ArrowFloat64Array, UInt8Array as ArrowUInt8Array,
@@ -19,16 +25,27 @@ use lancedb::query::QueryBase;
use lancedb::query::QueryExecutionOptions;
use lancedb::query::Select;
use lancedb::query::TakeQuery as LanceDbTakeQuery;
use lancedb::query::VectorQuery as LanceDbVectorQuery;
use lancedb::query::{ColumnOrdering as LanceDbColumnOrdering, VectorQuery as LanceDbVectorQuery};
use napi::bindgen_prelude::*;
use napi_derive::napi;
use crate::error::NapiErrorExt;
use crate::error::convert_error;
use crate::iterator::RecordBatchIterator;
use crate::rerankers::RerankHybridCallbackArgs;
use crate::rerankers::Reranker;
use crate::util::{parse_distance_type, schema_to_buffer};
#[napi(object)]
pub struct ColumnOrdering {
pub ascending: bool,
pub nulls_first: bool,
pub column_name: String,
}
impl From<ColumnOrdering> for LanceDbColumnOrdering {
fn from(value: ColumnOrdering) -> Self {
match (value.ascending, value.nulls_first) {
(true, true) => Self::asc_nulls_first(value.column_name),
(true, false) => Self::asc_nulls_last(value.column_name),
(false, true) => Self::desc_nulls_first(value.column_name),
(false, false) => Self::desc_nulls_last(value.column_name),
}
}
}
fn bytes_to_arrow_array(data: Uint8Array, dtype: String) -> napi::Result<Arc<dyn Array>> {
let buf = arrow_buffer::Buffer::from(data.to_vec());
@@ -128,6 +145,18 @@ impl Query {
self.inner = self.inner.clone().with_row_id();
}
#[napi]
pub fn order_by(&mut self, ordering: Option<Vec<ColumnOrdering>>) -> napi::Result<()> {
let ordering = ordering.map(|ordering| {
ordering
.into_iter()
.map(LanceDbColumnOrdering::from)
.collect()
});
self.inner = self.inner.clone().order_by(ordering);
Ok(())
}
#[napi(catch_unwind)]
pub async fn output_schema(&self) -> napi::Result<Buffer> {
let schema = self.inner.output_schema().await.default_error()?;
@@ -328,6 +357,18 @@ impl VectorQuery {
Ok(())
}
#[napi]
pub fn order_by(&mut self, ordering: Option<Vec<ColumnOrdering>>) -> napi::Result<()> {
let ordering = ordering.map(|ordering| {
ordering
.into_iter()
.map(LanceDbColumnOrdering::from)
.collect()
});
self.inner = self.inner.clone().order_by(ordering);
Ok(())
}
#[napi(catch_unwind)]
pub async fn output_schema(&self) -> napi::Result<Buffer> {
let schema = self.inner.output_schema().await.default_error()?;

View File

@@ -9,6 +9,7 @@ use lancedb::table::{
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
};
use napi::bindgen_prelude::*;
use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
use napi_derive::napi;
use crate::error::NapiErrorExt;
@@ -67,8 +68,16 @@ impl Table {
schema_to_buffer(&schema)
}
#[napi(catch_unwind)]
pub async fn add(&self, buf: Buffer, mode: String) -> napi::Result<AddResult> {
#[napi(
catch_unwind,
ts_args_type = "buf: Buffer, mode: string, progressCallback?: (progress: WriteProgressInfo) => void"
)]
pub async fn add(
&self,
buf: Buffer,
mode: String,
progress_callback: Option<ProgressFn>,
) -> napi::Result<AddResult> {
let batches = ipc_file_to_batches(buf.to_vec())
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
let batches = batches
@@ -92,6 +101,19 @@ impl Table {
return Err(napi::Error::from_reason(format!("Invalid mode: {}", mode)));
};
if let Some(tsfn) = progress_callback {
op = op.progress(move |p| {
// NonBlocking: dispatch onto the JS event loop without
// blocking the writer thread. With napi-rs's default
// unbounded queue, events are not dropped — a slow JS
// callback will just queue them.
tsfn.call(
WriteProgressInfo::from(p),
ThreadsafeFunctionCallMode::NonBlocking,
);
});
}
let res = op.execute().await.default_error()?;
Ok(res.into())
}
@@ -344,6 +366,31 @@ impl Table {
Ok(res.into())
}
#[napi(catch_unwind)]
pub async fn set_unenforced_primary_key(&self, columns: Vec<String>) -> napi::Result<()> {
self.inner_ref()?
.set_unenforced_primary_key(columns)
.await
.default_error()
}
#[napi(catch_unwind)]
pub async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> napi::Result<()> {
let native_spec = lancedb::table::LsmWriteSpec::try_from(spec)?;
self.inner_ref()?
.set_lsm_write_spec(native_spec)
.await
.default_error()
}
#[napi(catch_unwind)]
pub async fn unset_lsm_write_spec(&self) -> napi::Result<()> {
self.inner_ref()?
.unset_lsm_write_spec()
.await
.default_error()
}
#[napi(catch_unwind)]
pub async fn version(&self) -> napi::Result<i64> {
self.inner_ref()?
@@ -538,6 +585,63 @@ impl From<lancedb::index::IndexConfig> for IndexConfig {
}
}
/// Specification selecting Lance's MemWAL LSM-style write path for
/// `mergeInsert`.
///
/// `specType` must be `"bucket"`, `"identity"`, or `"unsharded"`. For
/// `"bucket"`, `column` and `numBuckets` are required; for `"identity"`,
/// `column` is required.
#[napi(object)]
#[derive(Clone, Debug)]
pub struct LsmWriteSpec {
/// One of `"bucket"`, `"identity"`, or `"unsharded"`.
pub spec_type: String,
/// Bucket and identity variants: the sharding column.
pub column: Option<String>,
/// Bucket variant: the number of buckets, in `[1, 1024]`.
pub num_buckets: Option<u32>,
/// Names of indexes the MemWAL should keep up to date during writes.
pub maintained_indexes: Option<Vec<String>>,
/// Default `ShardWriter` configuration recorded in the MemWAL index.
pub writer_config_defaults: Option<HashMap<String, String>>,
}
impl TryFrom<LsmWriteSpec> for lancedb::table::LsmWriteSpec {
type Error = napi::Error;
fn try_from(value: LsmWriteSpec) -> napi::Result<Self> {
let maintained = value.maintained_indexes.unwrap_or_default();
let writer_config_defaults = value.writer_config_defaults.unwrap_or_default();
let spec = match value.spec_type.as_str() {
"bucket" => {
let column = value.column.ok_or_else(|| {
napi::Error::from_reason("LsmWriteSpec bucket requires `column`")
})?;
let num_buckets = value.num_buckets.ok_or_else(|| {
napi::Error::from_reason("LsmWriteSpec bucket requires `numBuckets`")
})?;
Self::bucket(column, num_buckets)
}
"identity" => {
let column = value.column.ok_or_else(|| {
napi::Error::from_reason("LsmWriteSpec identity requires `column`")
})?;
Self::identity(column)
}
"unsharded" => Self::unsharded(),
other => {
return Err(napi::Error::from_reason(format!(
"LsmWriteSpec `specType` must be 'bucket', 'identity', or 'unsharded', got '{}'",
other
)));
}
};
Ok(spec
.with_maintained_indexes(maintained)
.with_writer_config_defaults(writer_config_defaults))
}
}
/// Statistics about a compaction operation.
#[napi(object)]
#[derive(Clone, Debug)]
@@ -572,6 +676,44 @@ pub struct OptimizeStats {
pub prune: RemovalStats,
}
/// Progress snapshot for a write operation, delivered to the JS callback
/// passed to `Table.add`.
#[napi(object)]
#[derive(Clone, Debug)]
pub struct WriteProgressInfo {
/// Number of rows written so far.
pub output_rows: i64,
/// Number of bytes written so far.
pub output_bytes: i64,
/// Total rows expected, if the input source reports it.
/// Always set on the final callback (where `done` is `true`).
pub total_rows: Option<i64>,
/// Wall-clock seconds since monitoring started.
pub elapsed_seconds: f64,
/// Number of parallel write tasks currently in flight.
pub active_tasks: i64,
/// Total number of parallel write tasks (the write parallelism).
pub total_tasks: i64,
/// `true` for the final callback; `false` otherwise.
pub done: bool,
}
impl From<&lancedb::table::write_progress::WriteProgress> for WriteProgressInfo {
fn from(p: &lancedb::table::write_progress::WriteProgress) -> Self {
Self {
output_rows: p.output_rows() as i64,
output_bytes: p.output_bytes() as i64,
total_rows: p.total_rows().map(|n| n as i64),
elapsed_seconds: p.elapsed().as_secs_f64(),
active_tasks: p.active_tasks() as i64,
total_tasks: p.total_tasks() as i64,
done: p.done(),
}
}
}
type ProgressFn = ThreadsafeFunction<WriteProgressInfo, (), WriteProgressInfo, Status, false>;
/// A definition of a column alteration. The alteration changes the column at
/// `path` to have the new name `name`, to be nullable if `nullable` is true,
/// and to have the data type `data_type`. At least one of `rename` or `nullable`

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.31.0-beta.11"
current_version = "0.32.1-beta.0"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -4,16 +4,26 @@ code is in the `src/` directory and the Python bindings are in the `lancedb/` di
Common commands:
* Bootstrap dev env: `uv run --extra tests --extra dev maturin develop --extras tests,dev`
* Build: `make develop`
* Format: `make format`
* Lint: `make check`
* Fix lints: `make fix`
* Test: `make test`
* Doc test: `make doctest`
* Test: `uv run --extra tests pytest python/tests -vv --durations=10 -m "not slow and not s3_test"`
* Run specific test: `uv run --extra tests pytest python/tests/<test_file>.py::<test_name> -q`
* Doc test: `uv run --extra tests pytest --doctest-modules python/lancedb`
Use the uv-managed environment declared by `uv.lock` for Python validation. Do
not treat system `python`, global `pytest`, or missing editable-install errors
as final blockers; bootstrap or enter the uv environment instead. `make test`
and `make doctest` assume the development environment is already prepared.
Before committing changes, run lints and then formatting.
When you change the Rust code, you will need to recompile the Python bindings: `make develop`.
When you change the Rust code, PyO3 binding code, or see a missing/stale
`lancedb._lancedb`, recompile the Python bindings with
`uv run --extra tests --extra dev maturin develop --extras tests,dev` before
running tests.
When you export new types from Rust to Python, you must manually update `python/lancedb/_lancedb.pyi`
with the corresponding type hints. You can run `pyright` to check for type errors in the Python code.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.31.0-beta.11"
version = "0.32.1-beta.0"
publish = false
edition.workspace = true
description = "Python bindings for LanceDB"

View File

@@ -217,6 +217,9 @@ class Table:
async def uri(self) -> str: ...
async def initial_storage_options(self) -> Optional[Dict[str, str]]: ...
async def latest_storage_options(self) -> Optional[Dict[str, str]]: ...
async def set_unenforced_primary_key(self, columns: List[str]) -> None: ...
async def set_lsm_write_spec(self, spec: LsmWriteSpec) -> None: ...
async def unset_lsm_write_spec(self) -> None: ...
@property
def tags(self) -> Tags: ...
def query(self) -> Query: ...
@@ -255,6 +258,11 @@ class RecordBatchStream:
def __aiter__(self) -> "RecordBatchStream": ...
async def __anext__(self) -> pa.RecordBatch: ...
class ColumnOrdering(TypedDict):
column_name: str
ascending: bool
nulls_first: bool
class Query:
def where(self, filter: str): ...
def where_expr(self, expr: PyExpr): ...
@@ -268,6 +276,7 @@ class Query:
def postfilter(self): ...
def nearest_to(self, query_vec: pa.Array) -> VectorQuery: ...
def nearest_to_text(self, query: dict) -> FTSQuery: ...
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
async def output_schema(self) -> pa.Schema: ...
async def execute(
self, max_batch_length: Optional[int], timeout: Optional[timedelta]
@@ -296,6 +305,7 @@ class FTSQuery:
def get_query(self) -> str: ...
def add_query_vector(self, query_vec: pa.Array) -> None: ...
def nearest_to(self, query_vec: pa.Array) -> HybridQuery: ...
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
async def output_schema(self) -> pa.Schema: ...
async def execute(
self, max_batch_length: Optional[int], timeout: Optional[timedelta]
@@ -321,6 +331,7 @@ class VectorQuery:
def maximum_nprobes(self, maximum_nprobes: int): ...
def bypass_vector_index(self): ...
def nearest_to_text(self, query: dict) -> HybridQuery: ...
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
def to_query_request(self) -> PyQueryRequest: ...
class HybridQuery:
@@ -339,6 +350,7 @@ class HybridQuery:
def minimum_nprobes(self, minimum_nprobes: int): ...
def maximum_nprobes(self, maximum_nprobes: int): ...
def bypass_vector_index(self): ...
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
def to_vector_query(self) -> VectorQuery: ...
def to_fts_query(self) -> FTSQuery: ...
def get_limit(self) -> int: ...
@@ -368,6 +380,7 @@ class PyQueryRequest:
bypass_vector_index: Optional[bool]
postfilter: Optional[bool]
norm: Optional[str]
order_by: Optional[List[ColumnOrdering]]
class CompactionStats:
fragments_removed: int
@@ -408,6 +421,37 @@ class MergeResult:
num_deleted_rows: int
num_attempts: int
class LsmWriteSpec:
"""Specification selecting Lance's MemWAL LSM-style write path for
`merge_insert`."""
@staticmethod
def bucket(column: str, num_buckets: int) -> "LsmWriteSpec": ...
@staticmethod
def identity(column: str) -> "LsmWriteSpec": ...
@staticmethod
def unsharded() -> "LsmWriteSpec": ...
def with_maintained_indexes(self, indexes: List[str]) -> "LsmWriteSpec":
"""Return a copy of this spec asking the MemWAL to keep the named
indexes up to date as rows are appended."""
...
def with_writer_config_defaults(self, defaults: Dict[str, str]) -> "LsmWriteSpec":
"""Return a copy of this spec recording the given default
`ShardWriter` configuration in the MemWAL index."""
...
@property
def spec_type(self) -> str:
"""One of 'bucket', 'identity', or 'unsharded'."""
...
@property
def column(self) -> Optional[str]: ...
@property
def num_buckets(self) -> Optional[int]: ...
@property
def maintained_indexes(self) -> List[str]: ...
@property
def writer_config_defaults(self) -> Dict[str, str]: ...
class AddColumnsResult:
version: int

View File

@@ -968,22 +968,32 @@ class Permutation:
new.transform_fn = transform
return new
def take_offsets(self, offsets: list[int]) -> Any:
"""
Take rows from the permutation by offset
The returned value is passed through the permutation's current transform,
so `with_format` and `with_transform` affect this method in the same way
they affect iteration.
"""
async def do_take_offsets():
return await self.reader.take_offsets(offsets, selection=self.selection)
batch = LOOP.run(do_take_offsets())
return self.transform_fn(batch)
def __getitem__(self, index: int) -> Any:
"""
Returns a single row from the permutation by offset
"""
return self.__getitems__([index])
return self.take_offsets([index])
def __getitems__(self, indices: list[int]) -> Any:
"""
Returns rows from the permutation by offset
"""
async def do_getitems():
return await self.reader.take_offsets(indices, selection=self.selection)
batch = LOOP.run(do_getitems())
return self.transform_fn(batch)
return self.take_offsets(indices)
@deprecated(details="Use with_skip instead")
def skip(self, skip: int) -> "Permutation":

View File

@@ -92,6 +92,12 @@ def ensure_vector_query(
return val
class ColumnOrdering(pydantic.BaseModel):
column_name: str
ascending: bool = True
nulls_first: bool = False
class FullTextQueryType(str, Enum):
MATCH = "match"
MATCH_PHRASE = "match_phrase"
@@ -504,6 +510,8 @@ class Query(pydantic.BaseModel):
# Bypass the vector index and use a brute force search
bypass_vector_index: Optional[bool] = None
order_by: Optional[List[ColumnOrdering]] = None
@classmethod
def from_inner(cls, req: PyQueryRequest) -> Self:
query = cls()
@@ -524,6 +532,8 @@ class Query(pydantic.BaseModel):
query.refine_factor = req.refine_factor
query.bypass_vector_index = req.bypass_vector_index
query.postfilter = req.postfilter
if req.order_by is not None:
query.order_by = [ColumnOrdering(**o) for o in req.order_by]
if req.full_text_search is not None:
query.full_text_query = FullTextSearchQuery(
columns=None,
@@ -572,9 +582,22 @@ class LanceQueryBuilder(ABC):
If "auto", the query type is inferred based on the query.
vector_column_name: str
The name of the vector column to use for vector search.
ordering_field_name: Optional[str]
.. deprecated:: 0.27.0
Use ``order_by()`` method instead.
fts_columns: Optional[Union[str, List[str]]]
The columns to search in for full text search.
fast_search: bool
Skip flat search of unindexed data.
"""
if ordering_field_name is not None:
import warnings
warnings.warn(
"ordering_field_name is deprecated, use .order_by() method instead.",
DeprecationWarning,
stacklevel=2,
)
# Check hybrid search first as it supports empty query pattern
if query_type == "hybrid":
# hybrid fts and vector query
@@ -671,6 +694,7 @@ class LanceQueryBuilder(ABC):
self._text = None
self._ef = None
self._bypass_vector_index = None
self._order_by = None
@deprecation.deprecated(
deprecated_in="0.3.1",
@@ -694,6 +718,7 @@ class LanceQueryBuilder(ABC):
flatten: Optional[Union[int, bool]] = None,
*,
timeout: Optional[timedelta] = None,
**kwargs,
) -> "pd.DataFrame":
"""
Execute the query and return the results as a pandas DataFrame.
@@ -711,9 +736,12 @@ class LanceQueryBuilder(ABC):
timeout: Optional[timedelta]
The maximum time to wait for the query to complete.
If None, wait indefinitely.
**kwargs
Forwarded to pyarrow.Table.to_pandas after query execution and
optional flattening.
"""
tbl = flatten_columns(self.to_arrow(timeout=timeout), flatten)
return tbl.to_pandas()
return tbl.to_pandas(**kwargs)
@abstractmethod
def to_arrow(self, *, timeout: Optional[timedelta] = None) -> pa.Table:
@@ -947,6 +975,24 @@ class LanceQueryBuilder(ABC):
""" # noqa: E501
return self._table._explain_plan(self.to_query_object(), verbose=verbose)
def order_by(self, ordering: Optional[List[ColumnOrdering]]) -> Self:
"""
Set the ordering for the results.
Parameters
----------
ordering: Optional[List[ColumnOrdering]]
The ordering to use for the results. If None, then the default ordering
will be used.
Returns
-------
LanceQueryBuilder
The LanceQueryBuilder object.
"""
self._order_by = ordering
return self
def analyze_plan(self) -> str:
"""
Run the query and return its execution plan with runtime metrics.
@@ -1314,6 +1360,7 @@ class LanceVectorQueryBuilder(LanceQueryBuilder):
fast_search=self._fast_search,
ef=self._ef,
bypass_vector_index=self._bypass_vector_index,
order_by=self._order_by,
)
def to_batches(
@@ -1465,7 +1512,9 @@ class LanceFtsQueryBuilder(LanceQueryBuilder):
super().__init__(table)
self._query = query
self._phrase_query = False
self.ordering_field_name = ordering_field_name
# Deprecated compatibility parameter. Native FTS ordering is now
# configured through order_by(); LanceQueryBuilder.create emits the warning.
_ = ordering_field_name
self._reranker = None
self._fast_search = fast_search
if isinstance(fts_columns, str):
@@ -1514,6 +1563,7 @@ class LanceFtsQueryBuilder(LanceQueryBuilder):
),
offset=self._offset,
fast_search=self._fast_search,
order_by=self._order_by,
)
def output_schema(self) -> pa.Schema:
@@ -1579,6 +1629,7 @@ class LanceEmptyQueryBuilder(LanceQueryBuilder):
limit=self._limit,
with_row_id=self._with_row_id,
offset=self._offset,
order_by=self._order_by,
)
def output_schema(self) -> pa.Schema:
@@ -2305,6 +2356,7 @@ class AsyncQueryBase(object):
self,
flatten: Optional[Union[int, bool]] = None,
timeout: Optional[timedelta] = None,
**kwargs,
) -> "pd.DataFrame":
"""
Execute the query and collect the results into a pandas DataFrame.
@@ -2337,10 +2389,13 @@ class AsyncQueryBase(object):
The maximum time to wait for the query to complete.
If not specified, no timeout is applied. If the query does not
complete within the specified time, an error will be raised.
**kwargs
Forwarded to pyarrow.Table.to_pandas after query execution and
optional flattening.
"""
return (
flatten_columns(await self.to_arrow(timeout=timeout), flatten)
).to_pandas()
).to_pandas(**kwargs)
async def to_polars(
self,
@@ -2502,6 +2557,27 @@ class AsyncStandardQuery(AsyncQueryBase):
self._inner.offset(offset)
return self
def order_by(self, ordering: Optional[List[ColumnOrdering]]) -> Self:
"""
Set the ordering for the results.
Parameters
----------
ordering: Optional[List[ColumnOrdering]]
The ordering to use for the results. If None, then the default ordering
will be used.
"""
if ordering is None:
self._inner.order_by(None)
else:
self._inner.order_by(
[
o.model_dump() if hasattr(o, "model_dump") else o.dict()
for o in ordering
]
)
return self
def fast_search(self) -> Self:
"""
Skip searching un-indexed data.
@@ -3321,6 +3397,7 @@ class BaseQueryBuilder(object):
self,
flatten: Optional[Union[int, bool]] = None,
timeout: Optional[timedelta] = None,
**kwargs,
) -> "pd.DataFrame":
"""
Execute the query and collect the results into a pandas DataFrame.
@@ -3353,8 +3430,11 @@ class BaseQueryBuilder(object):
The maximum time to wait for the query to complete.
If not specified, no timeout is applied. If the query does not
complete within the specified time, an error will be raised.
**kwargs
Forwarded to pyarrow.Table.to_pandas after query execution and
optional flattening.
"""
return LOOP.run(self._inner.to_pandas(flatten, timeout))
return LOOP.run(self._inner.to_pandas(flatten, timeout, **kwargs))
def to_polars(
self,

View File

@@ -14,6 +14,7 @@ from lancedb._lancedb import (
DeleteResult,
DropColumnsResult,
IndexConfig,
LsmWriteSpec,
MergeResult,
UpdateResult,
)
@@ -39,7 +40,7 @@ from lancedb.embeddings import EmbeddingFunctionRegistry
from lancedb.table import _normalize_progress
from ..query import LanceVectorQueryBuilder, LanceQueryBuilder, LanceTakeQueryBuilder
from ..table import AsyncTable, IndexStatistics, Query, Table, Tags
from ..table import AsyncTable, BlobMode, IndexStatistics, Query, Table, Tags
from ..types import BaseTokenizerType
@@ -100,7 +101,7 @@ class RemoteTable(Table):
"""to_arrow() is not yet supported on LanceDB cloud."""
raise NotImplementedError("to_arrow() is not yet supported on LanceDB cloud.")
def to_pandas(self):
def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs):
"""to_pandas() is not yet supported on LanceDB cloud."""
raise NotImplementedError("to_pandas() is not yet supported on LanceDB cloud.")
@@ -655,6 +656,18 @@ class RemoteTable(Table):
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
return LOOP.run(self._table.drop_columns(columns))
def set_unenforced_primary_key(self, columns: Union[str, Iterable[str]]) -> None:
"""Not supported on LanceDB Cloud."""
return LOOP.run(self._table.set_unenforced_primary_key(columns))
def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
"""Not supported on LanceDB Cloud."""
return LOOP.run(self._table.set_lsm_write_spec(spec))
def unset_lsm_write_spec(self) -> None:
"""Not supported on LanceDB Cloud."""
return LOOP.run(self._table.unset_lsm_write_spec())
def drop_index(self, index_name: str):
return LOOP.run(self._table.drop_index(index_name))

View File

@@ -87,6 +87,8 @@ from .util import (
)
from .index import lang_mapping
BlobMode = Literal["lazy", "bytes", "descriptions"]
_MODEL_BACKED_TOKENIZER_PREFIXES = ("jieba", "lindera")
_MODEL_BACKED_TOKENIZER_ERRORS = (
"unknown base tokenizer",
@@ -154,6 +156,7 @@ if TYPE_CHECKING:
AlterColumnsResult,
DeleteResult,
DropColumnsResult,
LsmWriteSpec,
MergeResult,
UpdateResult,
)
@@ -759,14 +762,22 @@ class Table(ABC):
"""
raise NotImplementedError
def to_pandas(self) -> "pandas.DataFrame":
def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs) -> "pandas.DataFrame":
"""Return the table as a pandas DataFrame.
Parameters
----------
blob_mode: str, default "lazy"
Controls how blob columns are returned for backends that support
Lance blob-aware pandas conversion.
**kwargs
Forwarded to PyArrow / Lance pandas conversion.
Returns
-------
pd.DataFrame
"""
return self.to_arrow().to_pandas()
return self.to_arrow().to_pandas(**kwargs)
@abstractmethod
def to_arrow(self) -> pa.Table:
@@ -2182,14 +2193,27 @@ class LanceTable(Table):
"""Return the first n rows of the table."""
return LOOP.run(self._table.head(n))
def to_pandas(self) -> "pd.DataFrame":
def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs) -> "pd.DataFrame":
"""Return the table as a pandas DataFrame.
Parameters
----------
blob_mode: str, default "lazy"
Controls how Lance blob columns are returned.
**kwargs
Forwarded to Lance pandas conversion.
Returns
-------
pd.DataFrame
"""
return self.to_arrow().to_pandas()
if blob_mode == "lazy" and (
self._namespace_client is not None
or get_uri_scheme(self._dataset_path) == "memory"
):
return self.to_arrow().to_pandas(**kwargs)
return self.to_lance().to_pandas(blob_mode=blob_mode, **kwargs)
def to_arrow(self) -> pa.Table:
"""Return the table as a pyarrow Table.
@@ -2518,11 +2542,6 @@ class LanceTable(Table):
"at a time. To search over multiple text fields, create a "
"separate FTS index for each field."
)
if "." in field_names:
raise ValueError(
"Native FTS indexes can only be created on top-level fields. "
f"Received nested field path: {field_names!r}."
)
if tokenizer_name is None:
tokenizer_configs = {
@@ -3263,6 +3282,21 @@ class LanceTable(Table):
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
return LOOP.run(self._table.drop_columns(columns))
def set_unenforced_primary_key(self, columns: Union[str, Iterable[str]]) -> None:
"""Set the unenforced primary key. See
[`AsyncTable.set_unenforced_primary_key`][lancedb.AsyncTable.set_unenforced_primary_key]."""
return LOOP.run(self._table.set_unenforced_primary_key(columns))
def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
"""Install an LsmWriteSpec. See
[`AsyncTable.set_lsm_write_spec`][lancedb.AsyncTable.set_lsm_write_spec]."""
return LOOP.run(self._table.set_lsm_write_spec(spec))
def unset_lsm_write_spec(self) -> None:
"""Remove the LsmWriteSpec. See
[`AsyncTable.unset_lsm_write_spec`][lancedb.AsyncTable.unset_lsm_write_spec]."""
return LOOP.run(self._table.unset_lsm_write_spec())
def uses_v2_manifest_paths(self) -> bool:
"""
Check if the table is using the new v2 manifest paths.
@@ -3808,6 +3842,69 @@ class AsyncTable:
Any attempt to use the table after it has been closed will raise an error."""
return self._inner.close()
async def set_unenforced_primary_key(
self, columns: Union[str, Iterable[str]]
) -> None:
"""Set the unenforced primary key for this table to the given
ordered list of columns.
"Unenforced" means LanceDB does not check uniqueness on writes; the
columns are recorded in the schema as the primary key so that
features such as `merge_insert` can use them. Calling this again
replaces any previously-set primary key.
Parameters
----------
columns : str or Iterable[str]
Either a single column name (single-column key) or an ordered
iterable of column names (composite key). Each column dtype
must be one of: int32, int64, utf8, large_utf8, binary,
large_binary, fixed_size_binary.
"""
if isinstance(columns, str):
columns = [columns]
else:
columns = list(columns)
await self._inner.set_unenforced_primary_key(columns)
async def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
"""Install an LsmWriteSpec on this table.
The spec selects Lance's MemWAL LSM-style write path for future
`merge_insert` calls. ``LsmWriteSpec`` chooses one of three sharding
strategies:
- ``LsmWriteSpec.bucket(column, num_buckets)`` — hash-bucket writes by
the single-column unenforced primary key.
- ``LsmWriteSpec.identity(column)`` — shard by the raw value of a
scalar column.
- ``LsmWriteSpec.unsharded()`` — route every write to a single shard.
All variants require the table to have an unenforced primary key set
via [`set_unenforced_primary_key`]; bucket sharding additionally
requires it to be the single column being bucketed.
Parameters
----------
spec : LsmWriteSpec
The sharding spec to install.
Examples
--------
>>> from lancedb._lancedb import LsmWriteSpec
>>> # table.set_unenforced_primary_key("id")
>>> # table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 16))
"""
await self._inner.set_lsm_write_spec(spec)
async def unset_lsm_write_spec(self) -> None:
"""Remove the LsmWriteSpec from this table.
Reverts to the standard `merge_insert` write path. Errors if no spec
is currently set.
"""
await self._inner.unset_lsm_write_spec()
@property
def name(self) -> str:
"""The name of the table."""
@@ -3866,14 +3963,39 @@ class AsyncTable:
"""
return AsyncQuery(self._inner.query())
async def to_pandas(self) -> "pd.DataFrame":
async def _to_lance(self, **kwargs) -> lance.LanceDataset:
try:
import lance
except ImportError:
raise ImportError(
"The lance library is required to use this function. "
"Please install with `pip install pylance`."
)
return lance.dataset(
await self.uri(),
version=await self.version(),
storage_options=await self.latest_storage_options(),
**kwargs,
)
async def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs) -> "pd.DataFrame":
"""Return the table as a pandas DataFrame.
Parameters
----------
blob_mode: str, default "lazy"
Controls how Lance blob columns are returned.
**kwargs
Forwarded to PyArrow / Lance pandas conversion.
Returns
-------
pd.DataFrame
"""
return (await self.to_arrow()).to_pandas()
if blob_mode == "lazy":
return (await self.to_arrow()).to_pandas(**kwargs)
return (await self._to_lance()).to_pandas(blob_mode=blob_mode, **kwargs)
async def to_arrow(self) -> pa.Table:
"""Return the table as a pyarrow Table.
@@ -4512,6 +4634,8 @@ class AsyncTable:
async_query = async_query.fast_search()
if query.with_row_id:
async_query = async_query.with_row_id()
if query.order_by:
async_query = async_query.order_by(query.order_by)
if query.vector:
async_query = async_query.nearest_to(query.vector).distance_range(

View File

@@ -29,6 +29,7 @@ from lancedb.query import (
MultiMatchQuery,
PhraseQuery,
BooleanQuery,
ColumnOrdering,
Occur,
LanceFtsQueryBuilder,
)
@@ -499,6 +500,36 @@ async def test_search_fts_specify_column_async(async_table):
pass
def test_search_order_by_descending(table):
table.create_fts_index("text")
rows = (
table.search("puppy")
.order_by([ColumnOrdering(column_name="count", ascending=False)])
.limit(20)
.select(["text", "count"])
.to_list()
)
for r in rows:
assert "puppy" in r["text"]
assert sorted(rows, key=lambda x: x["count"], reverse=True) == rows
def test_search_order_by_ascending(table):
table.create_fts_index("text")
rows = (
table.search("puppy")
.order_by([ColumnOrdering(column_name="count", ascending=True)])
.limit(20)
.select(["text", "count"])
.to_list()
)
for r in rows:
assert "puppy" in r["text"]
assert sorted(rows, key=lambda x: x["count"]) == rows
def test_create_index_from_table(tmp_path, table):
table.create_fts_index("text")
df = table.search("puppy").limit(5).select(["text"]).to_pandas()
@@ -532,8 +563,19 @@ def test_create_index_multiple_columns(tmp_path, table):
def test_nested_schema(tmp_path, table):
with pytest.raises(ValueError, match="top-level fields"):
table.create_fts_index("nested.text")
table.create_fts_index("nested.text")
indices = table.list_indices()
assert len(indices) == 1
assert indices[0].index_type == "FTS"
assert indices[0].columns == ["nested.text"]
results = (
table.search("puppy", query_type="fts", fts_columns="nested.text")
.limit(5)
.to_list()
)
assert len(results) > 0
assert all("puppy" in row["nested"]["text"] for row in results)
def test_search_index_with_filter(table):

View File

@@ -0,0 +1,149 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
"""Tests for installing and clearing an LsmWriteSpec via
`Table.set_lsm_write_spec` / `Table.unset_lsm_write_spec`.
"""
from datetime import timedelta
import lancedb
import pyarrow as pa
import pytest
from lancedb._lancedb import LsmWriteSpec
SCHEMA = pa.schema(
[
pa.field("id", pa.utf8(), nullable=False),
pa.field("v", pa.int32(), nullable=False),
]
)
def _batch(ids, vs):
return pa.RecordBatch.from_arrays(
[pa.array(ids, type=pa.utf8()), pa.array(vs, type=pa.int32())],
schema=SCHEMA,
)
def _reader(ids, vs):
return pa.RecordBatchReader.from_batches(SCHEMA, [_batch(ids, vs)])
def _make_table(tmp_path):
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0))
table = db.create_table("t", _reader(["seed"], [0]))
return db, table
def test_set_lsm_write_spec_validates(tmp_path):
_db, table = _make_table(tmp_path)
# No PK set yet.
with pytest.raises(Exception, match="primary key"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
table.set_unenforced_primary_key("id")
# Column mismatch.
with pytest.raises(Exception, match="match"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("v", 4))
# Out-of-range num_buckets.
with pytest.raises(Exception, match="num_buckets"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 0))
with pytest.raises(Exception, match="num_buckets"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 1025))
# Happy path then mutation rejected.
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
with pytest.raises(Exception, match="mutation"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 8))
def test_unset_lsm_write_spec(tmp_path):
_db, table = _make_table(tmp_path)
# unset errors when no spec is set.
with pytest.raises(Exception, match="no LSM write spec"):
table.unset_lsm_write_spec()
# Install a spec, then remove it; afterwards a fresh spec can be set.
table.set_unenforced_primary_key("id")
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
table.unset_lsm_write_spec()
# A second unset errors — there is no spec left to remove.
with pytest.raises(Exception, match="no LSM write spec"):
table.unset_lsm_write_spec()
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 8))
def test_set_unsharded_spec(tmp_path):
_db, table = _make_table(tmp_path)
# Lance MemWAL still requires a primary key on the dataset; Unsharded
# just skips per-row hashing.
table.set_unenforced_primary_key("id")
table.set_lsm_write_spec(LsmWriteSpec.unsharded())
table.unset_lsm_write_spec()
def test_lsm_write_spec_repr():
s = LsmWriteSpec.bucket("id", 4)
assert s.spec_type == "bucket"
assert s.column == "id"
assert s.num_buckets == 4
assert s.maintained_indexes == []
assert "bucket" in repr(s)
assert "id" in repr(s)
assert "4" in repr(s)
u = LsmWriteSpec.unsharded()
assert u.spec_type == "unsharded"
assert u.column is None
assert u.num_buckets is None
assert "unsharded" in repr(u)
def test_lsm_write_spec_with_maintained_indexes():
s = LsmWriteSpec.bucket("id", 4).with_maintained_indexes(["idx_a", "idx_b"])
assert s.maintained_indexes == ["idx_a", "idx_b"]
@pytest.mark.asyncio
async def test_async_set_unset_lsm_write_spec(tmp_path):
db = await lancedb.connect_async(
tmp_path, read_consistency_interval=timedelta(seconds=0)
)
table = await db.create_table(
"t",
pa.RecordBatchReader.from_batches(SCHEMA, [_batch(["seed"], [0])]),
)
await table.set_unenforced_primary_key("id")
await table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
await table.unset_lsm_write_spec()
# A second unset errors.
with pytest.raises(Exception, match="no LSM write spec"):
await table.unset_lsm_write_spec()
def test_set_identity_spec(tmp_path):
_db, table = _make_table(tmp_path)
# Identity sharding still requires an unenforced primary key on the
# table; it shards by the raw value of the given column.
table.set_unenforced_primary_key("id")
table.set_lsm_write_spec(LsmWriteSpec.identity("v"))
table.unset_lsm_write_spec()
def test_lsm_write_spec_identity_and_writer_config_defaults():
s = LsmWriteSpec.identity("v")
assert s.spec_type == "identity"
assert s.column == "v"
assert s.num_buckets is None
assert "identity" in repr(s)
s = s.with_writer_config_defaults({"durable_write": "false"})
assert s.writer_config_defaults == {"durable_write": "false"}
assert "durable_write" in repr(s)

View File

@@ -1080,3 +1080,29 @@ def test_getitems_invalid_offset(some_permutation: Permutation):
"""Test __getitems__ with an out-of-range offset raises an error."""
with pytest.raises(Exception):
some_permutation.__getitems__([999999])
def test_take_offsets(some_permutation: Permutation):
result = some_permutation.take_offsets([0, 1, 2])
assert isinstance(result, list)
assert "id" in result[0]
assert "value" in result[0]
assert len(result) == 3
def test_take_offsets_empty_identity_permutation(mem_db):
tbl = mem_db.create_table(
"test_table", pa.table({"id": range(10), "value": range(10)})
)
permutation = Permutation.identity(tbl)
result = permutation.take_offsets([])
assert result == []
def test_take_offsets_empty_permutation(some_permutation: Permutation):
result = some_permutation.take_offsets([])
assert result == []

View File

@@ -0,0 +1,79 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
"""Tests for Table.set_unenforced_primary_key."""
from datetime import timedelta
import lancedb
import pyarrow as pa
import pytest
def _empty_table(path, schema):
db = lancedb.connect(path, read_consistency_interval=timedelta(seconds=0))
return db.create_table("t", schema=schema)
def test_set_unenforced_primary_key_accepts_string_or_one_element_list(tmp_path):
schema = pa.schema([pa.field("id", pa.int64(), nullable=False)])
# Bare string.
table = _empty_table(tmp_path / "s", schema)
table.set_unenforced_primary_key("id")
# One-element list.
table = _empty_table(tmp_path / "l", schema)
table.set_unenforced_primary_key(["id"])
def test_set_unenforced_primary_key_rejects_compound_and_empty(tmp_path):
table = _empty_table(
tmp_path,
pa.schema(
[
pa.field("a", pa.utf8(), nullable=False),
pa.field("b", pa.int64(), nullable=False),
]
),
)
# Compound keys are not supported.
with pytest.raises(Exception, match="compound"):
table.set_unenforced_primary_key(["a", "b"])
# Empty input.
with pytest.raises(Exception, match="required"):
table.set_unenforced_primary_key([])
def test_set_unenforced_primary_key_is_immutable(tmp_path):
table = _empty_table(
tmp_path,
pa.schema(
[
pa.field("a", pa.utf8(), nullable=False),
pa.field("b", pa.int64(), nullable=False),
]
),
)
table.set_unenforced_primary_key("a")
# The primary key cannot be changed or re-set once installed.
with pytest.raises(Exception, match="already set"):
table.set_unenforced_primary_key("b")
with pytest.raises(Exception, match="already set"):
table.set_unenforced_primary_key("a")
def test_set_unenforced_primary_key_validates(tmp_path):
table = _empty_table(
tmp_path / "t", pa.schema([pa.field("id", pa.utf8(), nullable=False)])
)
# Unknown column.
with pytest.raises(Exception, match="not found"):
table.set_unenforced_primary_key("nonexistent")
# Unsupported dtype (Float32 not in the supported set).
bad = _empty_table(
tmp_path / "bad", pa.schema([pa.field("id", pa.float32(), nullable=False)])
)
with pytest.raises(Exception, match="not supported"):
bad.set_unenforced_primary_key("id")

View File

@@ -25,6 +25,7 @@ from lancedb.query import (
AsyncHybridQuery,
AsyncQueryBase,
AsyncVectorQuery,
ColumnOrdering,
LanceVectorQueryBuilder,
MatchQuery,
PhraseQuery,
@@ -164,6 +165,87 @@ def test_offset(table):
assert len(results_with_offset.to_pandas()) == 1
@pytest.mark.asyncio
async def test_query_to_pandas_kwargs(table, table_async):
sync_df = (
LanceVectorQueryBuilder(table, [0, 0], "vector")
.select(["id"])
.limit(1)
.to_pandas(split_blocks=True)
)
assert sync_df["id"].tolist() == [1]
async_df = await (
table_async.query().select(["id"]).limit(2).to_pandas(split_blocks=True)
)
assert async_df["id"].tolist() == [1, 2]
def test_order_by_plain_query(mem_db):
table = mem_db.create_table(
"test_order_by",
pa.table(
{
"group": [1, 1, 1, 2],
"score": [None, 1.0, 1.0, 0.5],
"name": ["z", "b", "a", "c"],
}
),
)
res = (
table.search()
.order_by(
[
ColumnOrdering(column_name="group", ascending=True, nulls_first=False),
ColumnOrdering(column_name="score", ascending=True, nulls_first=True),
ColumnOrdering(column_name="name", ascending=True, nulls_first=False),
]
)
.to_arrow()
)
assert res.select(["group", "score", "name"]).to_pylist() == [
{"group": 1, "score": None, "name": "z"},
{"group": 1, "score": 1.0, "name": "a"},
{"group": 1, "score": 1.0, "name": "b"},
{"group": 2, "score": 0.5, "name": "c"},
]
@pytest.mark.asyncio
async def test_order_by_async_query(mem_db_async: AsyncConnection):
table = await mem_db_async.create_table(
"test_order_by_async",
pa.table(
{
"group": [1, 1, 1, 2],
"score": [None, 1.0, 1.0, 0.5],
"name": ["z", "b", "a", "c"],
}
),
)
res = await (
table.query()
.order_by(
[
ColumnOrdering(column_name="group", ascending=True, nulls_first=False),
ColumnOrdering(column_name="score", ascending=True, nulls_first=True),
ColumnOrdering(column_name="name", ascending=True, nulls_first=False),
]
)
.to_arrow()
)
assert res.select(["group", "score", "name"]).to_pylist() == [
{"group": 1, "score": None, "name": "z"},
{"group": 1, "score": 1.0, "name": "a"},
{"group": 1, "score": 1.0, "name": "b"},
{"group": 2, "score": 0.5, "name": "c"},
]
def test_query_builder(table):
rs = (
LanceVectorQueryBuilder(table, [0, 0], "vector")

View File

@@ -16,6 +16,7 @@ from packaging.version import Version
import lancedb
from lancedb.conftest import MockTextEmbeddingFunction
from lancedb.query import ColumnOrdering
from lancedb.remote import ClientConfig
from lancedb.remote.errors import HttpError, RetryError
import pytest
@@ -268,6 +269,25 @@ def test_table_unimplemented_functions():
table.to_pandas()
def test_table_to_pandas_not_supported():
def handler(request):
if request.path == "/v1/table/test/create/?mode=create":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b"{}")
else:
request.send_response(404)
request.end_headers()
with mock_lancedb_connection(handler) as db:
table = db.create_table("test", [{"id": 1}])
with pytest.raises(NotImplementedError):
table.to_pandas()
with pytest.raises(NotImplementedError):
table.to_pandas(blob_mode="bytes", split_blocks=True)
def test_table_add_in_threadpool():
def handler(request):
if request.path == "/v1/table/test/insert/":
@@ -660,6 +680,18 @@ def test_query_sync_maximal():
"ef": None,
"filter": "id > 0",
"columns": ["id", "name"],
"order_by": [
{
"column_name": "score",
"ascending": False,
"nulls_first": True,
},
{
"column_name": "id",
"ascending": True,
"nulls_first": False,
},
],
"vector_column": "vector2",
"fast_search": True,
"with_row_id": True,
@@ -677,6 +709,14 @@ def test_query_sync_maximal():
.refine_factor(10)
.nprobes(5)
.where("id > 0", prefilter=True)
.order_by(
[
ColumnOrdering(
column_name="score", ascending=False, nulls_first=True
),
ColumnOrdering(column_name="id", ascending=True, nulls_first=False),
]
)
.with_row_id(True)
.select(["id", "name"])
.to_list()

View File

@@ -47,6 +47,85 @@ def test_basic(mem_db: DBConnection):
assert table.to_arrow() == expected_data
def test_table_to_pandas_default_matches_arrow(tmp_db: DBConnection):
pd = pytest.importorskip("pandas")
data = pa.table({"id": [1, 2], "text": ["one", "two"]})
table = tmp_db.create_table("test_to_pandas_old_call", data=data)
expected = data.to_pandas()
pd.testing.assert_frame_equal(table.to_pandas(), expected)
def test_table_to_pandas_blob_bytes(tmp_db: DBConnection):
pytest.importorskip("lance")
data = pa.table(
{
"id": pa.array([1, 2], pa.int64()),
"blob": pa.array([b"hello", b"world"], pa.large_binary()),
},
schema=pa.schema(
[
pa.field("id", pa.int64()),
pa.field(
"blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"}
),
]
),
)
table = tmp_db.create_table("test_to_pandas_blob_bytes", data=data)
df = table.to_pandas(blob_mode="bytes")
assert df["blob"].tolist() == [b"hello", b"world"]
def test_table_to_pandas_kwargs(tmp_db: DBConnection):
pd = pytest.importorskip("pandas")
data = pa.table({"id": pa.array([1, 2], pa.int64())})
table = tmp_db.create_table("test_to_pandas_kwargs", data=data)
df = table.to_pandas(types_mapper=pd.ArrowDtype)
assert str(df["id"].dtype) == "int64[pyarrow]"
@pytest.mark.asyncio
async def test_async_table_to_pandas_blob_bytes(tmp_db_async: AsyncConnection):
pytest.importorskip("lance")
data = pa.table(
{
"id": pa.array([1, 2], pa.int64()),
"blob": pa.array([b"hello", b"world"], pa.large_binary()),
},
schema=pa.schema(
[
pa.field("id", pa.int64()),
pa.field(
"blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"}
),
]
),
)
table = await tmp_db_async.create_table(
"test_async_to_pandas_blob_bytes", data=data
)
df = await table.to_pandas(blob_mode="bytes")
assert df["blob"].tolist() == [b"hello", b"world"]
@pytest.mark.asyncio
async def test_async_table_to_pandas_kwargs(tmp_db_async: AsyncConnection):
pd = pytest.importorskip("pandas")
data = pa.table({"id": pa.array([1, 2], pa.int64())})
table = await tmp_db_async.create_table("test_async_to_pandas_kwargs", data=data)
df = await table.to_pandas(types_mapper=pd.ArrowDtype)
assert str(df["id"].dtype) == "int64[pyarrow]"
def test_create_table_infers_large_int_vectors(mem_db: DBConnection):
data = [{"vector": [0, 300]}]
@@ -1811,6 +1890,55 @@ def test_create_scalar_index(mem_db: DBConnection):
assert scalar_index.name == "custom_y_index"
def test_create_index_nested_field_paths(mem_db: DBConnection):
schema = pa.schema(
[
pa.field("metadata", pa.struct([pa.field("user_id", pa.int32())])),
pa.field(
"image",
pa.struct([pa.field("embedding", pa.list_(pa.float32(), 2))]),
),
]
)
data = pa.Table.from_pylist(
[
{
"metadata": {"user_id": i},
"image": {"embedding": [float(i), float(i + 1)]},
}
for i in range(256)
],
schema=schema,
)
table = mem_db.create_table("nested_index_paths", data=data)
table.create_scalar_index("metadata.user_id", name="metadata_user_id_idx")
table.create_index(
vector_column_name="image.embedding",
num_partitions=1,
num_sub_vectors=1,
name="image_embedding_idx",
)
indices = sorted(table.list_indices(), key=lambda idx: idx.name)
assert [(idx.name, idx.index_type, idx.columns) for idx in indices] == [
("image_embedding_idx", "IvfPq", ["image.embedding"]),
("metadata_user_id_idx", "BTree", ["metadata.user_id"]),
]
vector_results = (
table.search([0.0, 1.0], vector_column_name="image.embedding")
.limit(1)
.to_list()
)
assert len(vector_results) == 1
assert vector_results[0]["metadata"]["user_id"] == 0
filtered_results = table.search().where("metadata.user_id = 42").limit(1).to_list()
assert len(filtered_results) == 1
assert filtered_results[0]["metadata"]["user_id"] == 42
def test_empty_query(mem_db: DBConnection):
table = mem_db.create_table(
"my_table",

View File

@@ -15,8 +15,8 @@ use pyo3::{
use query::{FTSQuery, HybridQuery, Query, VectorQuery};
use session::Session;
use table::{
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, MergeResult,
Table, UpdateResult,
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, LsmWriteSpec,
MergeResult, Table, UpdateResult,
};
pub mod arrow;
@@ -52,6 +52,7 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<AlterColumnsResult>()?;
m.add_class::<AddResult>()?;
m.add_class::<MergeResult>()?;
m.add_class::<LsmWriteSpec>()?;
m.add_class::<DeleteResult>()?;
m.add_class::<DropColumnsResult>()?;
m.add_class::<UpdateResult>()?;

View File

@@ -23,7 +23,7 @@ use lancedb::query::QueryBase;
use lancedb::query::QueryExecutionOptions;
use lancedb::query::QueryFilter;
use lancedb::query::{
ExecutableQuery, Query as LanceDbQuery, Select, TakeQuery as LanceDbTakeQuery,
ColumnOrdering, ExecutableQuery, Query as LanceDbQuery, Select, TakeQuery as LanceDbTakeQuery,
VectorQuery as LanceDbVectorQuery,
};
use lancedb::table::AnyQuery;
@@ -207,6 +207,48 @@ impl<'py> IntoPyObject<'py> for PyLanceDB<FtsQuery> {
#[derive(Clone)]
pub struct PyQueryVectors(Vec<Arc<dyn Array>>);
#[derive(Clone, FromPyObject)]
#[pyo3(from_item_all)]
pub struct PyColumnOrdering {
pub column_name: String,
pub ascending: bool,
pub nulls_first: bool,
}
impl From<ColumnOrdering> for PyColumnOrdering {
fn from(ordering: ColumnOrdering) -> Self {
Self {
column_name: ordering.column_name,
ascending: ordering.ascending,
nulls_first: ordering.nulls_first,
}
}
}
impl From<PyColumnOrdering> for ColumnOrdering {
fn from(ordering: PyColumnOrdering) -> Self {
Self {
column_name: ordering.column_name,
ascending: ordering.ascending,
nulls_first: ordering.nulls_first,
}
}
}
impl<'py> IntoPyObject<'py> for PyColumnOrdering {
type Target = PyDict;
type Output = Bound<'py, Self::Target>;
type Error = PyErr;
fn into_pyobject(self, py: pyo3::Python<'py>) -> PyResult<Self::Output> {
let dict = PyDict::new(py);
dict.set_item("column_name", self.column_name)?;
dict.set_item("ascending", self.ascending)?;
dict.set_item("nulls_first", self.nulls_first)?;
Ok(dict)
}
}
impl<'py> IntoPyObject<'py> for PyQueryVectors {
type Target = PyList;
type Output = Bound<'py, Self::Target>;
@@ -246,6 +288,7 @@ pub struct PyQueryRequest {
pub bypass_vector_index: Option<bool>,
pub postfilter: Option<bool>,
pub norm: Option<String>,
pub order_by: Option<Vec<PyColumnOrdering>>,
}
impl From<AnyQuery> for PyQueryRequest {
@@ -273,6 +316,9 @@ impl From<AnyQuery> for PyQueryRequest {
bypass_vector_index: None,
postfilter: None,
norm: None,
order_by: query_request
.order_by
.map(|order_by| order_by.into_iter().map(PyColumnOrdering::from).collect()),
},
AnyQuery::VectorQuery(vector_query) => Self {
limit: vector_query.base.limit,
@@ -297,6 +343,10 @@ impl From<AnyQuery> for PyQueryRequest {
bypass_vector_index: Some(!vector_query.use_index),
postfilter: Some(!vector_query.base.prefilter),
norm: vector_query.base.norm.map(|n| n.to_string()),
order_by: vector_query
.base
.order_by
.map(|order_by| order_by.into_iter().map(PyColumnOrdering::from).collect()),
},
}
}
@@ -475,6 +525,13 @@ impl Query {
})
}
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
let ordering =
ordering.map(|ordering| ordering.into_iter().map(ColumnOrdering::from).collect());
self.inner = self.inner.clone().order_by(ordering);
Ok(())
}
#[pyo3(signature = ())]
pub fn output_schema(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
@@ -647,6 +704,13 @@ impl FTSQuery {
self.inner = self.inner.clone().offset(offset as usize);
}
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
let ordering =
ordering.map(|ordering| ordering.into_iter().map(ColumnOrdering::from).collect());
self.inner = self.inner.clone().order_by(ordering);
Ok(())
}
pub fn fast_search(&mut self) {
self.inner = self.inner.clone().fast_search();
}
@@ -782,6 +846,13 @@ impl VectorQuery {
self.inner = self.inner.clone().offset(offset as usize);
}
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
let ordering =
ordering.map(|ordering| ordering.into_iter().map(ColumnOrdering::from).collect());
self.inner = self.inner.clone().order_by(ordering);
Ok(())
}
pub fn fast_search(&mut self) {
self.inner = self.inner.clone().fast_search();
}
@@ -954,6 +1025,12 @@ impl HybridQuery {
self.inner_fts.offset(offset);
}
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
self.inner_vec.order_by(ordering.clone())?;
self.inner_fts.order_by(ordering)?;
Ok(())
}
pub fn fast_search(&mut self) {
self.inner_vec.fast_search();
self.inner_fts.fast_search();

View File

@@ -171,6 +171,141 @@ impl From<lancedb::table::MergeResult> for MergeResult {
}
}
/// Specification selecting Lance's MemWAL LSM-style write path for
/// `merge_insert`.
///
/// Constructed via the `bucket(...)`, `identity(...)`, or `unsharded()`
/// classmethods, then optionally chain `with_maintained_indexes(...)` and
/// `with_writer_config_defaults(...)`.
#[pyclass(from_py_object)]
#[derive(Clone, Debug)]
pub struct LsmWriteSpec {
inner: lancedb::table::LsmWriteSpec,
}
#[pymethods]
impl LsmWriteSpec {
/// Hash-bucket sharding by the unenforced primary key column.
#[staticmethod]
pub fn bucket(column: String, num_buckets: u32) -> Self {
Self {
inner: lancedb::table::LsmWriteSpec::bucket(column, num_buckets),
}
}
/// Identity sharding — shard by the raw value of `column`.
#[staticmethod]
pub fn identity(column: String) -> Self {
Self {
inner: lancedb::table::LsmWriteSpec::identity(column),
}
}
/// No sharding — every `merge_insert` call writes to a single
/// MemWAL shard.
#[staticmethod]
pub fn unsharded() -> Self {
Self {
inner: lancedb::table::LsmWriteSpec::unsharded(),
}
}
/// Replace the list of indexes the MemWAL should keep up to date as
/// rows are appended. Each name must reference an index that
/// already exists on the table at the time `set_lsm_write_spec`
/// is called.
pub fn with_maintained_indexes(&self, indexes: Vec<String>) -> Self {
Self {
inner: self.inner.clone().with_maintained_indexes(indexes),
}
}
/// Replace the default `ShardWriter` configuration recorded in the
/// MemWAL index, so every writer starts from the same defaults.
pub fn with_writer_config_defaults(&self, defaults: HashMap<String, String>) -> Self {
Self {
inner: self.inner.clone().with_writer_config_defaults(defaults),
}
}
pub fn __repr__(&self) -> String {
match &self.inner {
lancedb::table::LsmWriteSpec::Bucket {
column,
num_buckets,
maintained_indexes,
writer_config_defaults,
} => format!(
"LsmWriteSpec.bucket(column={:?}, num_buckets={}, maintained_indexes={:?}, writer_config_defaults={:?})",
column, num_buckets, maintained_indexes, writer_config_defaults,
),
lancedb::table::LsmWriteSpec::Identity {
column,
maintained_indexes,
writer_config_defaults,
} => format!(
"LsmWriteSpec.identity(column={:?}, maintained_indexes={:?}, writer_config_defaults={:?})",
column, maintained_indexes, writer_config_defaults,
),
lancedb::table::LsmWriteSpec::Unsharded {
maintained_indexes,
writer_config_defaults,
} => format!(
"LsmWriteSpec.unsharded(maintained_indexes={:?}, writer_config_defaults={:?})",
maintained_indexes, writer_config_defaults,
),
}
}
/// Discriminator string identifying the variant ("bucket", "identity",
/// or "unsharded").
#[getter]
pub fn spec_type(&self) -> &'static str {
match &self.inner {
lancedb::table::LsmWriteSpec::Bucket { .. } => "bucket",
lancedb::table::LsmWriteSpec::Identity { .. } => "identity",
lancedb::table::LsmWriteSpec::Unsharded { .. } => "unsharded",
}
}
/// Bucket and identity variants: the sharding column. `None` for unsharded.
#[getter]
pub fn column(&self) -> Option<String> {
match &self.inner {
lancedb::table::LsmWriteSpec::Bucket { column, .. }
| lancedb::table::LsmWriteSpec::Identity { column, .. } => Some(column.clone()),
lancedb::table::LsmWriteSpec::Unsharded { .. } => None,
}
}
/// Bucket variant only: the number of buckets.
#[getter]
pub fn num_buckets(&self) -> Option<u32> {
match &self.inner {
lancedb::table::LsmWriteSpec::Bucket { num_buckets, .. } => Some(*num_buckets),
_ => None,
}
}
/// Names of indexes the MemWAL should keep up to date during writes.
#[getter]
pub fn maintained_indexes(&self) -> Vec<String> {
self.inner.maintained_indexes().to_vec()
}
/// Default `ShardWriter` configuration recorded by this spec.
#[getter]
pub fn writer_config_defaults(&self) -> HashMap<String, String> {
self.inner.writer_config_defaults().clone()
}
}
impl From<LsmWriteSpec> for lancedb::table::LsmWriteSpec {
fn from(spec: LsmWriteSpec) -> Self {
spec.inner
}
}
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct AddColumnsResult {
@@ -805,6 +940,37 @@ impl Table {
})
}
pub fn set_unenforced_primary_key<'a>(
self_: PyRef<'a, Self>,
columns: Vec<String>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner
.set_unenforced_primary_key(columns)
.await
.infer_error()
})
}
pub fn set_lsm_write_spec<'a>(
self_: PyRef<'a, Self>,
spec: LsmWriteSpec,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.inner_ref()?.clone();
let native_spec = lancedb::table::LsmWriteSpec::from(spec);
future_into_py(self_.py(), async move {
inner.set_lsm_write_spec(native_spec).await.infer_error()
})
}
pub fn unset_lsm_write_spec(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.unset_lsm_write_spec().await.infer_error()
})
}
pub fn uses_v2_manifest_paths(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.28.0-beta.11"
version = "0.29.1-beta.0"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true

View File

@@ -271,15 +271,26 @@ impl Scannable for WithEmbeddingsScannable {
.map_err(|e| Error::Runtime {
message: format!("Task panicked during embedding computation: {}", e),
})??;
// Cast columns to match the declared output schema. The data is
// identical but field metadata (e.g. nested nullability) may
// differ between the embedding function output and the table.
let columns: Vec<ArrayRef> = result
.columns()
// Look up columns by name (not position) so the result matches
// the output schema even when columns appear in a different
// order — e.g. `add_columns` placed a new column after the
// embedding column, but the computed batch appends embeddings
// at the end. Cast per-column because field metadata (e.g.
// nested nullability) may also differ between the embedding
// function output and the table.
let columns: Vec<ArrayRef> = output_schema
.fields()
.iter()
.enumerate()
.map(|(i, col)| {
let target_type = output_schema.field(i).data_type();
.map(|field| {
let col = result.column_by_name(field.name()).ok_or_else(|| {
Error::InvalidInput {
message: format!(
"Column '{}' required by the table schema was not present in the input batch",
field.name()
),
}
})?;
let target_type = field.data_type();
if col.data_type() == target_type {
Ok(col.clone())
} else {
@@ -964,5 +975,118 @@ mod tests {
"Expected EmbeddingFunctionNotFound"
);
}
/// Regression test for https://github.com/lancedb/lancedb/issues/3136.
///
/// When a column is added to the table after the embedding column via
/// schema evolution, the table schema becomes
/// `[..., embedding, extra]`. The input batch (without the embedding)
/// is `[..., extra]`, and `compute_embeddings_for_batch` appends the
/// embedding at the end giving `[..., extra, embedding]`. A positional
/// cast to the output schema would map `extra` onto `embedding` and
/// fail with a CastError. Columns must be matched by name.
#[tokio::test]
async fn test_with_embeddings_scannable_column_added_after_embedding() {
let input_schema = Arc::new(Schema::new(vec![
Field::new("text", DataType::Utf8, false),
Field::new("score", DataType::Float64, true),
]));
let batch = RecordBatch::try_new(
input_schema.clone(),
vec![
Arc::new(StringArray::from(vec!["hello", "world"])) as ArrayRef,
Arc::new(arrow_array::Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
],
)
.unwrap();
let mock_embedding: Arc<dyn EmbeddingFunction> = Arc::new(MockEmbed::new("mock", 4));
let embedding_def = EmbeddingDefinition::new("text", "mock", Some("text_vec"));
// Table schema: embedding column is BEFORE `score`, as would
// happen if `score` was added via `add_columns` after creating
// the table with an embedding on `text`.
let output_schema = Arc::new(Schema::new(vec![
Field::new("text", DataType::Utf8, false),
Field::new(
"text_vec",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
4,
),
false,
),
Field::new("score", DataType::Float64, true),
]));
let mut scannable = WithEmbeddingsScannable::with_schema(
Box::new(batch),
vec![(embedding_def, mock_embedding)],
output_schema.clone(),
)
.unwrap();
let stream = scannable.scan_as_stream();
let results: Vec<RecordBatch> = stream.try_collect().await.unwrap();
assert_eq!(results.len(), 1);
let result_batch = &results[0];
assert_eq!(result_batch.schema(), output_schema);
assert_eq!(result_batch.num_rows(), 2);
// Position 1 must actually hold the FixedSizeList embedding —
// not the score column reinterpreted by a permissive cast.
let embedding = result_batch
.column(1)
.as_any()
.downcast_ref::<arrow_array::FixedSizeListArray>()
.expect("position 1 should be a FixedSizeList embedding");
assert_eq!(embedding.value_length(), 4);
assert_eq!(embedding.null_count(), 0);
}
/// If the input batch is missing a non-embedding column required by
/// the table schema, we should return a clear error rather than
/// silently producing a malformed batch.
#[tokio::test]
async fn test_with_embeddings_scannable_missing_required_column() {
let input_schema =
Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
let batch = RecordBatch::try_new(
input_schema,
vec![Arc::new(StringArray::from(vec!["hello", "world"])) as ArrayRef],
)
.unwrap();
let mock_embedding: Arc<dyn EmbeddingFunction> = Arc::new(MockEmbed::new("mock", 4));
let embedding_def = EmbeddingDefinition::new("text", "mock", Some("text_vec"));
let output_schema = Arc::new(Schema::new(vec![
Field::new("text", DataType::Utf8, false),
Field::new(
"text_vec",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
4,
),
false,
),
Field::new("score", DataType::Float64, true),
]));
let mut scannable = WithEmbeddingsScannable::with_schema(
Box::new(batch),
vec![(embedding_def, mock_embedding)],
output_schema,
)
.unwrap();
let stream = scannable.scan_as_stream();
let results: Result<Vec<RecordBatch>> = stream.try_collect().await;
let err = results.expect_err("expected an error");
assert!(
matches!(&err, Error::InvalidInput { message } if message.contains("score")),
"expected InvalidInput about missing 'score' column, got: {err:?}"
);
}
}
}

View File

@@ -450,6 +450,10 @@ impl PermutationReader {
}
pub async fn take_offsets(&self, offsets: &[u64], selection: Select) -> Result<RecordBatch> {
if offsets.is_empty() {
return Ok(RecordBatch::new_empty(self.output_schema(selection).await?));
}
if let Some(permutation_table) = &self.permutation_table {
let offset_map = self.get_offset_map(permutation_table).await?;
let row_ids = offsets
@@ -955,4 +959,62 @@ mod tests {
.to_vec();
assert_eq!(idx_values, &all_idx_values[4997..5000]);
}
#[tokio::test]
async fn test_take_offsets_empty_identity_reader() {
let base_table = lance_datagen::gen_batch()
.col("idx", lance_datagen::array::step::<Int32Type>())
.into_mem_table("tbl", RowCount::from(10), BatchCount::from(1))
.await;
let reader = PermutationReader::identity(base_table.base_table().clone()).await;
let batch = reader.take_offsets(&[], Select::All).await.unwrap();
assert_eq!(batch.num_rows(), 0);
assert_eq!(batch.num_columns(), 1);
assert_eq!(batch.schema().field(0).name(), "idx");
}
#[tokio::test]
async fn test_take_offsets_empty_with_permutation_table() {
let (base_table, row_ids_table, _) = setup_permutation_tables(5).await;
let reader = PermutationReader::try_from_tables(
base_table.base_table().clone(),
row_ids_table.base_table().clone(),
0,
)
.await
.unwrap();
let batch = reader.take_offsets(&[], Select::All).await.unwrap();
assert_eq!(batch.num_rows(), 0);
assert_eq!(batch.schema().fields().len(), 2);
assert_eq!(batch.schema().field(0).name(), "idx");
assert_eq!(batch.schema().field(1).name(), "other_col");
}
#[tokio::test]
async fn test_take_offsets_empty_with_column_selection() {
let (base_table, row_ids_table, _) = setup_permutation_tables(5).await;
let reader = PermutationReader::try_from_tables(
base_table.base_table().clone(),
row_ids_table.base_table().clone(),
0,
)
.await
.unwrap();
let batch = reader
.take_offsets(&[], Select::Columns(vec!["idx".to_string()]))
.await
.unwrap();
assert_eq!(batch.num_rows(), 0);
assert_eq!(batch.num_columns(), 1);
assert_eq!(batch.schema().field(0).name(), "idx");
}
}

View File

@@ -11,6 +11,8 @@ use datafusion_expr::Expr;
use datafusion_physical_plan::ExecutionPlan;
use futures::{FutureExt, TryFutureExt, TryStreamExt, stream, try_join};
use half::f16;
/// Re-export Lance ColumnOrdering type for use in query ordering
pub use lance::dataset::scanner::ColumnOrdering;
use lance::dataset::{ROW_ID, scanner::DatasetRecordBatchStream};
use lance_arrow::RecordBatchExt;
use lance_datafusion::exec::execute_plan;
@@ -510,6 +512,11 @@ pub trait QueryBase {
/// the scores are converted to ranks and then normalized. If "Score", the
/// scores are normalized directly.
fn norm(self, norm: NormalizeMethod) -> Self;
/// Sort the results by the specified column(s).
///
/// This allows ordering query results by one or more columns in either ascending or descending order.
fn order_by(self, ordering: Option<Vec<ColumnOrdering>>) -> Self;
}
pub trait HasQuery {
@@ -574,6 +581,11 @@ impl<T: HasQuery> QueryBase for T {
self.mut_query().norm = Some(norm);
self
}
fn order_by(mut self, ordering: Option<Vec<ColumnOrdering>>) -> Self {
self.mut_query().order_by = ordering;
self
}
}
/// Options for controlling the execution of a query
@@ -750,6 +762,11 @@ pub struct QueryRequest {
///
/// By default, this is false (scoring columns are auto-projected for backward compatibility).
pub disable_scoring_autoprojection: bool,
/// Sort the results by the specified column(s).
///
/// This allows ordering query results by one or more columns in either ascending or descending order.
pub order_by: Option<Vec<ColumnOrdering>>,
}
impl Default for QueryRequest {
@@ -766,6 +783,7 @@ impl Default for QueryRequest {
reranker: None,
norm: None,
disable_scoring_autoprojection: false,
order_by: None,
}
}
}

View File

@@ -518,6 +518,21 @@ impl<S: HttpSend> RemoteTable<S> {
}
}
if let Some(order_by) = &params.order_by {
body["order_by"] = serde_json::Value::Array(
order_by
.iter()
.map(|o| {
serde_json::json!({
"column_name": o.column_name,
"ascending": o.ascending,
"nulls_first": o.nulls_first,
})
})
.collect(),
);
}
Ok(())
}
@@ -1652,6 +1667,24 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
Ok(merge_insert_response)
}
async fn set_unenforced_primary_key(&self, _columns: &[&str]) -> Result<()> {
Err(Error::NotSupported {
message: "set_unenforced_primary_key is not supported on LanceDB cloud.".into(),
})
}
async fn set_lsm_write_spec(&self, _spec: crate::table::LsmWriteSpec) -> Result<()> {
Err(Error::NotSupported {
message: "set_lsm_write_spec is not supported on LanceDB cloud.".into(),
})
}
async fn unset_lsm_write_spec(&self) -> Result<()> {
Err(Error::NotSupported {
message: "unset_lsm_write_spec is not supported on LanceDB cloud.".into(),
})
}
async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
Ok(Box::new(RemoteTags { inner: self }))
}
@@ -2078,7 +2111,7 @@ mod tests {
use crate::{
DistanceType, Error, Table,
index::{Index, IndexStatistics, IndexType, vector::IvfPqIndexBuilder},
query::{ExecutableQuery, QueryBase},
query::{ColumnOrdering, ExecutableQuery, QueryBase},
remote::ARROW_FILE_CONTENT_TYPE,
};
@@ -2988,6 +3021,18 @@ mod tests {
"distance_type": "cosine",
"bypass_vector_index": true,
"columns": ["a", "b"],
"order_by": [
{
"column_name": "score",
"ascending": false,
"nulls_first": true,
},
{
"column_name": "id",
"ascending": true,
"nulls_first": false,
}
],
"nprobes": 12,
"minimum_nprobes": 12,
"maximum_nprobes": 12,
@@ -3019,6 +3064,10 @@ mod tests {
.limit(42)
.offset(10)
.select(Select::columns(&["a", "b"]))
.order_by(Some(vec![
ColumnOrdering::desc_nulls_first("score".to_string()),
ColumnOrdering::asc_nulls_last("id".to_string()),
]))
.nearest_to(vec![0.1, 0.2, 0.3])
.unwrap()
.column("my_vector")

File diff suppressed because it is too large Load Diff

View File

@@ -268,7 +268,9 @@ mod tests {
};
use crate::query::{ExecutableQuery, QueryBase, Select};
use crate::table::add_data::NaNVectorBehavior;
use crate::table::{ColumnDefinition, ColumnKind, Table, TableDefinition, WriteOptions};
use crate::table::{
ColumnDefinition, ColumnKind, NewColumnTransform, Table, TableDefinition, WriteOptions,
};
use crate::test_utils::TestCustomError;
use crate::test_utils::embeddings::MockEmbed;
@@ -518,6 +520,225 @@ mod tests {
}
}
/// Regression test for https://github.com/lancedb/lancedb/issues/3136.
///
/// When a column is added via `add_columns` AFTER an embedding column,
/// the table schema becomes `[..., embedding, extra]`. Subsequent
/// `table.add()` calls used to fail with a CastError because columns
/// were matched positionally rather than by name.
#[tokio::test]
async fn test_add_with_embeddings_after_add_columns() {
let registry = Arc::new(MemoryRegistry::new());
let mock_embedding: Arc<dyn EmbeddingFunction> = Arc::new(MockEmbed::new("mock", 4));
registry.register("mock", mock_embedding).unwrap();
let conn = connect("memory://")
.embedding_registry(registry)
.execute()
.await
.unwrap();
let schema = Arc::new(Schema::new(vec![
Field::new("text", DataType::Utf8, false),
Field::new(
"text_vec",
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4),
false,
),
]));
let embedding_def = EmbeddingDefinition::new("text", "mock", Some("text_vec"));
let table_def = TableDefinition::new(
schema.clone(),
vec![
ColumnDefinition {
kind: ColumnKind::Physical,
},
ColumnDefinition {
kind: ColumnKind::Embedding(embedding_def),
},
],
);
let rich_schema = table_def.into_rich_schema();
let table = conn
.create_empty_table("embed_evol_test", rich_schema)
.execute()
.await
.unwrap();
// Seed a row so add_columns has data to compute against.
let seed_batch = record_batch!(("text", Utf8, ["hello"])).unwrap();
table.add(seed_batch).execute().await.unwrap();
// Add a new physical column AFTER the embedding column.
table
.add_columns(
NewColumnTransform::SqlExpressions(vec![("score".into(), "42.0".into())]),
None,
)
.await
.unwrap();
// Now add data including the new column but WITHOUT the embedding.
// The input batch column order is [text, score]; after computing the
// embedding it becomes [text, score, text_vec], but the table schema
// is [text, text_vec, score]. Columns must be matched by name.
let new_schema = Arc::new(Schema::new(vec![
Field::new("text", DataType::Utf8, false),
Field::new("score", DataType::Float64, true),
]));
let new_batch = RecordBatch::try_new(
new_schema,
vec![
Arc::new(arrow_array::StringArray::from(vec!["foo", "bar"])),
Arc::new(arrow_array::Float64Array::from(vec![1.0, 2.0])),
],
)
.unwrap();
table.add(new_batch).execute().await.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 3);
let results: Vec<RecordBatch> = table
.query()
.select(Select::columns(&["text", "text_vec", "score"]))
.execute()
.await
.unwrap()
.try_collect()
.await
.unwrap();
let total_rows: usize = results.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 3);
for batch in &results {
// text_vec must be populated for the newly added rows too.
assert_eq!(batch.column(1).null_count(), 0);
}
}
/// Like `test_add_with_embeddings_after_add_columns`, but the column
/// added after the embedding is a nested struct rather than a scalar.
/// Verifies that name-based column matching also works when the
/// post-embedding column has a complex Arrow type.
#[tokio::test]
async fn test_add_with_embeddings_after_add_nested_columns() {
let registry = Arc::new(MemoryRegistry::new());
let mock_embedding: Arc<dyn EmbeddingFunction> = Arc::new(MockEmbed::new("mock", 4));
registry.register("mock", mock_embedding).unwrap();
let conn = connect("memory://")
.embedding_registry(registry)
.execute()
.await
.unwrap();
let schema = Arc::new(Schema::new(vec![
Field::new("text", DataType::Utf8, false),
Field::new(
"text_vec",
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4),
false,
),
]));
let embedding_def = EmbeddingDefinition::new("text", "mock", Some("text_vec"));
let table_def = TableDefinition::new(
schema,
vec![
ColumnDefinition {
kind: ColumnKind::Physical,
},
ColumnDefinition {
kind: ColumnKind::Embedding(embedding_def),
},
],
);
let rich_schema = table_def.into_rich_schema();
let table = conn
.create_empty_table("embed_nested_test", rich_schema)
.execute()
.await
.unwrap();
let seed_batch = record_batch!(("text", Utf8, ["hello"])).unwrap();
table.add(seed_batch).execute().await.unwrap();
// Add a STRUCT column after the embedding column.
let meta_struct = DataType::Struct(
vec![
Field::new("source", DataType::Utf8, true),
Field::new("score", DataType::Float64, true),
]
.into(),
);
let nested_schema = Arc::new(Schema::new(vec![Field::new(
"meta",
meta_struct.clone(),
true,
)]));
table
.add_columns(NewColumnTransform::AllNulls(nested_schema), None)
.await
.unwrap();
// Insert with the nested struct present but the embedding column
// absent. The computed batch is [text, meta, text_vec], but the
// table schema is [text, text_vec, meta] — only name-based matching
// can put `meta` (a struct) in the right slot.
let source = Arc::new(arrow_array::StringArray::from(vec!["foo", "bar"]));
let score = Arc::new(arrow_array::Float64Array::from(vec![1.0, 2.0]));
let meta = Arc::new(arrow_array::StructArray::from(vec![
(
Arc::new(Field::new("source", DataType::Utf8, true)),
source as Arc<dyn arrow_array::Array>,
),
(
Arc::new(Field::new("score", DataType::Float64, true)),
score as Arc<dyn arrow_array::Array>,
),
]));
let new_schema = Arc::new(Schema::new(vec![
Field::new("text", DataType::Utf8, false),
Field::new("meta", meta_struct, true),
]));
let new_batch = RecordBatch::try_new(
new_schema,
vec![
Arc::new(arrow_array::StringArray::from(vec!["foo", "bar"])),
meta,
],
)
.unwrap();
table.add(new_batch).execute().await.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 3);
let results: Vec<RecordBatch> = table
.query()
.select(Select::columns(&["text", "text_vec", "meta"]))
.execute()
.await
.unwrap()
.try_collect()
.await
.unwrap();
let total_rows: usize = results.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 3);
for batch in &results {
assert_eq!(batch.schema().field(2).name(), "meta");
assert!(matches!(
batch.schema().field(2).data_type(),
DataType::Struct(_)
));
// text_vec must be populated for the newly added rows too.
assert_eq!(batch.column(1).null_count(), 0);
}
}
#[tokio::test]
async fn test_add_casts_to_table_schema() {
let table_schema = Arc::new(Schema::new(vec![

View File

@@ -16,6 +16,8 @@ use crate::error::{Error, Result};
use super::{BaseTable, NativeTable};
pub(crate) mod lsm;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct MergeResult {
// The commit version associated with the operation.

View File

@@ -0,0 +1,101 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! MemWAL LSM write-path spec management.
//!
//! [`set_lsm_write_spec`] installs a [`super::super::LsmWriteSpec`] on a
//! table, which selects Lance's MemWAL LSM-style write path for future
//! `merge_insert` calls. [`unset_lsm_write_spec`] removes it. The actual
//! `merge_insert` dispatch and writer are a follow-up.
use lance::dataset::mem_wal::DatasetMemWalExt;
use lance::index::DatasetIndexExt;
use crate::error::{Error, Result};
use crate::table::{LsmWriteSpec, NativeTable};
// =============================================================================
// set_lsm_write_spec
// =============================================================================
/// Install an [`LsmWriteSpec`] on the table.
///
/// The bucket / unsharded sharding spec is constructed and validated by Lance's
/// [`InitializeMemWalBuilder`](lance::dataset::mem_wal::InitializeMemWalBuilder).
#[allow(clippy::redundant_pub_crate)]
pub(crate) async fn set_lsm_write_spec(table: &NativeTable, spec: LsmWriteSpec) -> Result<()> {
table.dataset.ensure_mutable()?;
{
let dataset = table.dataset.get().await?;
if dataset.mem_wal_index_details().await?.is_some() {
return Err(Error::InvalidInput {
message: "set_lsm_write_spec: an LSM write spec is already set on this table; mutation is not supported".into(),
});
}
}
let mut dataset = (*table.dataset.get().await?).clone();
let mut builder = dataset.initialize_mem_wal();
let (maintained_indexes, writer_config_defaults) = match spec {
LsmWriteSpec::Bucket {
column,
num_buckets,
maintained_indexes,
writer_config_defaults,
} => {
builder = builder.bucket_sharding(column, num_buckets);
(maintained_indexes, writer_config_defaults)
}
LsmWriteSpec::Identity {
column,
maintained_indexes,
writer_config_defaults,
} => {
builder = builder.identity_sharding(column);
(maintained_indexes, writer_config_defaults)
}
LsmWriteSpec::Unsharded {
maintained_indexes,
writer_config_defaults,
} => {
builder = builder.unsharded();
(maintained_indexes, writer_config_defaults)
}
};
builder = builder.maintained_indexes(maintained_indexes);
for (key, value) in writer_config_defaults {
builder = builder.add_writer_config_default(key, value);
}
builder.execute().await?;
table.dataset.update(dataset);
Ok(())
}
// =============================================================================
// unset_lsm_write_spec
// =============================================================================
/// Remove the [`LsmWriteSpec`] from the table by dropping the MemWAL index.
///
/// Errors if no spec is currently set.
#[allow(clippy::redundant_pub_crate)]
pub(crate) async fn unset_lsm_write_spec(table: &NativeTable) -> Result<()> {
table.dataset.ensure_mutable()?;
{
let dataset = table.dataset.get().await?;
if dataset.mem_wal_index_details().await?.is_none() {
return Err(Error::InvalidInput {
message: "unset_lsm_write_spec: no LSM write spec is set on this table".into(),
});
}
}
let mut dataset = (*table.dataset.get().await?).clone();
dataset
.drop_index(lance_index::mem_wal::MEM_WAL_INDEX_NAME)
.await?;
table.dataset.update(dataset);
Ok(())
}

View File

@@ -0,0 +1,113 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Table-level unenforced primary key support.
//!
//! [`set_unenforced_primary_key`] records a column as the table's primary key
//! by writing Lance schema field metadata. "Unenforced" means LanceDB does not
//! check uniqueness on write; the key is metadata for features such as
//! `merge_insert` to consume.
//!
//! Only a single-column primary key is supported, and the key cannot be
//! changed once set.
use arrow_schema::DataType;
use lance_core::datatypes::{LANCE_UNENFORCED_PRIMARY_KEY, LANCE_UNENFORCED_PRIMARY_KEY_POSITION};
use crate::error::{Error, Result};
use crate::table::NativeTable;
/// Set the unenforced primary key on `table` to the single column in `columns`.
///
/// Fails if `columns` is not exactly one column (compound primary keys are not
/// supported), if the column does not exist or has an unsupported dtype, or if
/// the table already has an unenforced primary key (changing the primary key
/// is not supported).
pub(super) async fn set_unenforced_primary_key(
table: &NativeTable,
columns: &[&str],
) -> Result<()> {
table.dataset.ensure_mutable()?;
if columns.is_empty() {
return Err(Error::InvalidInput {
message: "set_unenforced_primary_key: a column is required".into(),
});
}
if columns.len() > 1 {
return Err(Error::InvalidInput {
message: format!(
"set_unenforced_primary_key: compound primary keys are not supported, got {} columns",
columns.len()
),
});
}
let column = columns[0];
let updates = {
let dataset = table.dataset.get().await?;
let schema = dataset.schema();
// The primary key is immutable once set. The Lance commit layer is the
// source of truth for this (it also covers the concurrent-writer race);
// this check just fails fast with a clear message.
if !schema.unenforced_primary_key().is_empty() {
return Err(Error::InvalidInput {
message: "set_unenforced_primary_key: an unenforced primary key is already set on this table; changing it is not supported".into(),
});
}
let field = schema.field(column).ok_or_else(|| Error::InvalidInput {
message: format!(
"set_unenforced_primary_key: column '{}' not found on table",
column
),
})?;
if !is_supported_pk_dtype(&field.data_type()) {
return Err(Error::InvalidInput {
message: format!(
"set_unenforced_primary_key: column '{}' has dtype {:?} which is not supported as a primary key. Supported: Int32, Int64, Utf8, LargeUtf8, Binary, LargeBinary, FixedSizeBinary",
column,
field.data_type()
),
});
}
// Position metadata is 1-indexed; `Schema::unenforced_primary_key`
// treats position 0 as a legacy "no specific position" fallback.
let mut metadata = field.metadata.clone();
metadata.remove(LANCE_UNENFORCED_PRIMARY_KEY);
metadata.insert(
LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(),
"1".to_string(),
);
vec![(field_id_to_u32(field.id, &field.name)?, metadata)]
};
let mut dataset = (*table.dataset.get().await?).clone();
dataset.replace_field_metadata(updates).await?;
table.dataset.update(dataset);
Ok(())
}
fn field_id_to_u32(id: i32, name: &str) -> Result<u32> {
u32::try_from(id).map_err(|_| Error::Runtime {
message: format!(
"internal: field '{}' has unexpected negative field id {}",
name, id
),
})
}
fn is_supported_pk_dtype(dtype: &DataType) -> bool {
matches!(
dtype,
DataType::Int32
| DataType::Int64
| DataType::Utf8
| DataType::LargeUtf8
| DataType::Binary
| DataType::LargeBinary
| DataType::FixedSizeBinary(_)
)
}

View File

@@ -242,6 +242,10 @@ pub async fn create_plan(
scanner.disable_scoring_autoprojection();
}
if let Some(order_by) = &query.base.order_by {
scanner.order_by(Some(order_by.clone()))?;
}
Ok(scanner.create_plan().await?)
}