mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-23 15:00:39 +00:00
Compare commits
35 Commits
xuanwo/nat
...
codex/upda
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b6310ed905 | ||
|
|
ccec91d957 | ||
|
|
ec82e36317 | ||
|
|
da2a1c4a2c | ||
|
|
8463a10ebe | ||
|
|
7168d64af1 | ||
|
|
403c33dff0 | ||
|
|
a0001043b6 | ||
|
|
1bb7acb74f | ||
|
|
4ce175276c | ||
|
|
4bccb43e56 | ||
|
|
d5dc4c0f06 | ||
|
|
55ae6197c1 | ||
|
|
15bd821825 | ||
|
|
cf162c8a10 | ||
|
|
2eba7ebd02 | ||
|
|
2d5298b6ee | ||
|
|
4cb9147bbf | ||
|
|
54a1982ef1 | ||
|
|
5bfde47a8e | ||
|
|
049b0c8f09 | ||
|
|
20556e23a9 | ||
|
|
01e272c0b0 | ||
|
|
ad1634a0a5 | ||
|
|
5d1c28922a | ||
|
|
53c2164b84 | ||
|
|
6286ee8192 | ||
|
|
af8ca2ad5e | ||
|
|
aac6c62459 | ||
|
|
8df2fff75f | ||
|
|
0d30b31998 | ||
|
|
6a431ff0a0 | ||
|
|
ab2c5adf5e | ||
|
|
f02c4cad90 | ||
|
|
7b74c3dd91 |
@@ -1,5 +1,5 @@
|
||||
[tool.bumpversion]
|
||||
current_version = "0.28.0-beta.11"
|
||||
current_version = "0.30.0-beta.1"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
|
||||
5
.github/dependabot.yml
vendored
5
.github/dependabot.yml
vendored
@@ -11,6 +11,11 @@ updates:
|
||||
schedule:
|
||||
interval: weekly
|
||||
open-pull-requests-limit: 10
|
||||
# Only update Cargo.lock, never widen/raise the version requirements in
|
||||
# Cargo.toml. The goal is keeping the lockfile (and the binaries we ship)
|
||||
# current on security fixes, not forcing our library's consumers onto
|
||||
# newer minimum versions.
|
||||
versioning-strategy: lockfile-only
|
||||
groups:
|
||||
rust-minor-patch:
|
||||
update-types:
|
||||
|
||||
5
.github/workflows/nodejs.yml
vendored
5
.github/workflows/nodejs.yml
vendored
@@ -157,7 +157,10 @@ jobs:
|
||||
npx jest --testEnvironment jest-environment-node-single-context --verbose
|
||||
macos:
|
||||
timeout-minutes: 30
|
||||
runs-on: "macos-14"
|
||||
# macos-15 ships a newer linker; the older macos-14 linker fails to insert
|
||||
# branch islands when the debug cdylib's __text section exceeds the 128 MB
|
||||
# AArch64 B/BL branch range.
|
||||
runs-on: "macos-15"
|
||||
defaults:
|
||||
run:
|
||||
shell: bash
|
||||
|
||||
2
.github/workflows/python.yml
vendored
2
.github/workflows/python.yml
vendored
@@ -205,7 +205,7 @@ jobs:
|
||||
- name: Delete wheels
|
||||
run: rm -rf target/wheels
|
||||
pydantic1x:
|
||||
timeout-minutes: 30
|
||||
timeout-minutes: 60
|
||||
runs-on: "ubuntu-24.04"
|
||||
defaults:
|
||||
run:
|
||||
|
||||
20
.github/workflows/rust.yml
vendored
20
.github/workflows/rust.yml
vendored
@@ -233,6 +233,26 @@ jobs:
|
||||
cargo update -p aws-sdk-sso --precise 1.62.0
|
||||
cargo update -p aws-sdk-ssooidc --precise 1.63.0
|
||||
cargo update -p aws-sdk-sts --precise 1.63.0
|
||||
# aws-runtime/sigv4/credential-types/types and the aws-smithy-*
|
||||
# crates bumped their MSRV to 1.91.1 in late 2026; pin to the last
|
||||
# 1.91.0-compatible versions. The order matters — each downgrade
|
||||
# only succeeds once everything that still pins it at a higher
|
||||
# version has itself been downgraded.
|
||||
cargo update -p aws-runtime --precise 1.5.12
|
||||
cargo update -p aws-types --precise 1.3.9
|
||||
cargo update -p aws-sigv4 --precise 1.3.5
|
||||
cargo update -p aws-credential-types --precise 1.2.8
|
||||
cargo update -p aws-smithy-checksums --precise 0.63.9
|
||||
cargo update -p aws-smithy-runtime --precise 1.9.3
|
||||
cargo update -p aws-smithy-http --precise 0.62.4
|
||||
cargo update -p aws-smithy-eventstream --precise 0.60.12
|
||||
cargo update -p aws-smithy-http-client --precise 1.1.3
|
||||
cargo update -p aws-smithy-observability --precise 0.1.4
|
||||
cargo update -p aws-smithy-query --precise 0.60.8
|
||||
cargo update -p aws-smithy-runtime-api --precise 1.9.1
|
||||
cargo update -p aws-smithy-async --precise 1.2.6
|
||||
cargo update -p aws-smithy-types --precise 1.3.5
|
||||
cargo update -p aws-smithy-xml --precise 0.60.11
|
||||
cargo update -p home --precise 0.5.9
|
||||
- name: cargo +${{ matrix.msrv }} check
|
||||
env:
|
||||
|
||||
28
AGENTS.md
28
AGENTS.md
@@ -17,9 +17,33 @@ Common commands:
|
||||
* Run tests: `cargo test --quiet --features remote --tests`
|
||||
* Run specific test: `cargo test --quiet --features remote -p <package_name> --test <test_name>`
|
||||
* Lint: `cargo clippy --quiet --features remote --tests --examples`
|
||||
* Format: `cargo fmt --all`
|
||||
* Format Rust: `cargo fmt --all`
|
||||
* Format Python: `ruff format .`
|
||||
* Lint Python: `ruff check .`
|
||||
* Bootstrap Python dev env: `cd python && uv run --extra tests --extra dev maturin develop --extras tests,dev`
|
||||
* Run Python tests: `cd python && uv run --extra tests pytest python/tests -vv --durations=10 -m "not slow and not s3_test"`
|
||||
* Run specific Python test: `cd python && uv run --extra tests pytest python/tests/<test_file>.py::<test_name> -q`
|
||||
|
||||
Before committing changes, run formatting.
|
||||
For Python validation, prefer the uv-managed environment declared by `python/uv.lock`.
|
||||
Do not treat system `python`, global `pytest`, or missing editable-install errors as
|
||||
final blockers; bootstrap or enter the uv environment instead. If `lancedb._lancedb`
|
||||
is missing or stale, or if Rust/PyO3 binding code changed, rebuild the Python
|
||||
extension with the bootstrap command above before running tests.
|
||||
|
||||
Before committing changes, run formatting for every language you touched. At minimum:
|
||||
|
||||
* Rust changes: run `cargo fmt --all`.
|
||||
* Python changes: run `ruff format .` and `ruff check .` from the repository root,
|
||||
and run targeted tests through `cd python && uv run ...`.
|
||||
* TypeScript changes: run the relevant `npm`/`pnpm` lint, format, build, and docs commands in `nodejs`.
|
||||
|
||||
Before creating a PR, the exact value passed to `gh pr create --title` must follow
|
||||
Conventional Commits, such as `fix: support nested field paths in native index creation`
|
||||
or `feat(python): add dataset multiprocessing support`. Do not use a plain natural
|
||||
language summary like `Support nested field paths in native index creation` as the PR
|
||||
title. The semantic-release check uses the PR title and body as the merge commit message,
|
||||
so a non-conventional PR title will fail CI. After creating a PR, read the remote PR title
|
||||
back and fix it immediately if it is not conventional.
|
||||
|
||||
## Coding tips
|
||||
|
||||
|
||||
1659
Cargo.lock
generated
1659
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
28
Cargo.toml
28
Cargo.toml
@@ -13,20 +13,20 @@ categories = ["database-implementations"]
|
||||
rust-version = "1.91.0"
|
||||
|
||||
[workspace.dependencies]
|
||||
lance = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance = { "version" = "=7.1.0-beta.2", default-features = false, "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.1.0-beta.2", default-features = false, "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.1.0-beta.2", default-features = false, "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
ahash = "0.8"
|
||||
# Note that this one does not include pyarrow
|
||||
arrow = { version = "58.0.0", optional = false }
|
||||
|
||||
@@ -112,25 +112,25 @@ def fetch_remote_tags() -> List[TagInfo]:
|
||||
"api",
|
||||
"-X",
|
||||
"GET",
|
||||
f"repos/{LANCE_REPO}/git/refs/tags",
|
||||
"--paginate",
|
||||
f"repos/{LANCE_REPO}/releases",
|
||||
"--jq",
|
||||
".[].ref",
|
||||
".[].tag_name",
|
||||
"-F",
|
||||
"per_page=20",
|
||||
]
|
||||
)
|
||||
tags: List[TagInfo] = []
|
||||
for line in output.splitlines():
|
||||
ref = line.strip()
|
||||
if not ref.startswith("refs/tags/v"):
|
||||
tag = line.strip()
|
||||
if not tag.startswith("v"):
|
||||
continue
|
||||
tag = ref.split("refs/tags/")[-1]
|
||||
version = tag.lstrip("v")
|
||||
try:
|
||||
tags.append(TagInfo(tag=tag, version=version, semver=parse_semver(version)))
|
||||
except ValueError:
|
||||
continue
|
||||
if not tags:
|
||||
raise RuntimeError("No Lance tags could be parsed from GitHub API output")
|
||||
raise RuntimeError("No Lance releases could be parsed from GitHub API output")
|
||||
return tags
|
||||
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
|
||||
<dependency>
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-core</artifactId>
|
||||
<version>0.28.0-beta.11</version>
|
||||
<version>0.30.0-beta.1</version>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
|
||||
@@ -441,18 +441,28 @@ Open a table in the database.
|
||||
|
||||
```ts
|
||||
abstract renameTable(
|
||||
oldName,
|
||||
currentName,
|
||||
newName,
|
||||
namespacePath?): Promise<void>
|
||||
options?): Promise<void>
|
||||
```
|
||||
|
||||
Rename a table.
|
||||
|
||||
Currently only supported by LanceDB Cloud. Local OSS connections and
|
||||
namespace-backed connections (via [connectNamespace](../functions/connectNamespace.md)) reject with
|
||||
a "not supported" error.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **oldName**: `string`
|
||||
* **currentName**: `string`
|
||||
The current name of the table.
|
||||
|
||||
* **newName**: `string`
|
||||
The new name for the table.
|
||||
|
||||
* **namespacePath?**: `string`[]
|
||||
* **options?**: [`RenameTableOptions`](../interfaces/RenameTableOptions.md)
|
||||
Optional namespace paths. When
|
||||
`newNamespacePath` is omitted the table stays in `namespacePath`.
|
||||
|
||||
#### Returns
|
||||
|
||||
|
||||
@@ -343,6 +343,30 @@ This is useful for pagination.
|
||||
|
||||
***
|
||||
|
||||
### orderBy()
|
||||
|
||||
```ts
|
||||
orderBy(ordering): this
|
||||
```
|
||||
|
||||
Sort the results by the specified column(s).
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **ordering**: [`ColumnOrdering`](../interfaces/ColumnOrdering.md) \| [`ColumnOrdering`](../interfaces/ColumnOrdering.md)[]
|
||||
|
||||
#### Returns
|
||||
|
||||
`this`
|
||||
|
||||
This query builder.
|
||||
|
||||
#### Inherited from
|
||||
|
||||
`StandardQueryBase.orderBy`
|
||||
|
||||
***
|
||||
|
||||
### outputSchema()
|
||||
|
||||
```ts
|
||||
|
||||
@@ -690,6 +690,74 @@ of the given query
|
||||
|
||||
***
|
||||
|
||||
### setLsmWriteSpec()
|
||||
|
||||
```ts
|
||||
abstract setLsmWriteSpec(spec): Promise<void>
|
||||
```
|
||||
|
||||
Install an [LsmWriteSpec](../interfaces/LsmWriteSpec.md) on this table, selecting Lance's MemWAL
|
||||
LSM-style write path for future `mergeInsert` calls.
|
||||
|
||||
`LsmWriteSpec` chooses one of three sharding strategies via `specType`:
|
||||
|
||||
- `"bucket"` — hash-bucket writes by the single-column unenforced primary
|
||||
key (`column` and `numBuckets` required).
|
||||
- `"identity"` — shard by the raw value of a scalar `column`.
|
||||
- `"unsharded"` — route every write to a single shard.
|
||||
|
||||
All variants require the table to have an unenforced primary key
|
||||
([Table#setUnenforcedPrimaryKey](Table.md#setunenforcedprimarykey)); bucket sharding additionally
|
||||
requires it to be the single column being bucketed.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **spec**: [`LsmWriteSpec`](../interfaces/LsmWriteSpec.md)
|
||||
The sharding spec to install.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`void`>
|
||||
|
||||
#### Example
|
||||
|
||||
```ts
|
||||
await table.setUnenforcedPrimaryKey("id");
|
||||
await table.setLsmWriteSpec({
|
||||
specType: "bucket",
|
||||
column: "id",
|
||||
numBuckets: 16,
|
||||
maintainedIndexes: ["id_idx"],
|
||||
});
|
||||
```
|
||||
|
||||
***
|
||||
|
||||
### setUnenforcedPrimaryKey()
|
||||
|
||||
```ts
|
||||
abstract setUnenforcedPrimaryKey(columns): Promise<void>
|
||||
```
|
||||
|
||||
Set the unenforced primary key for this table to a single column.
|
||||
|
||||
"Unenforced" means LanceDB does not check uniqueness on writes; the
|
||||
column is recorded in the schema as the primary key for use by features
|
||||
such as `merge_insert`. Only single-column primary keys are supported,
|
||||
and the key cannot be changed once set.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **columns**: `string` \| `string`[]
|
||||
The primary key column. A one-element
|
||||
array is also accepted; passing more than one column is rejected.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`void`>
|
||||
|
||||
***
|
||||
|
||||
### stats()
|
||||
|
||||
```ts
|
||||
@@ -793,6 +861,23 @@ Return the table as an arrow table
|
||||
|
||||
***
|
||||
|
||||
### unsetLsmWriteSpec()
|
||||
|
||||
```ts
|
||||
abstract unsetLsmWriteSpec(): Promise<void>
|
||||
```
|
||||
|
||||
Remove the [LsmWriteSpec](../interfaces/LsmWriteSpec.md) from this table, reverting to the standard
|
||||
`mergeInsert` write path.
|
||||
|
||||
Errors if no spec is currently set.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`void`>
|
||||
|
||||
***
|
||||
|
||||
### update()
|
||||
|
||||
#### update(opts)
|
||||
|
||||
@@ -498,6 +498,30 @@ This is useful for pagination.
|
||||
|
||||
***
|
||||
|
||||
### orderBy()
|
||||
|
||||
```ts
|
||||
orderBy(ordering): this
|
||||
```
|
||||
|
||||
Sort the results by the specified column(s).
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **ordering**: [`ColumnOrdering`](../interfaces/ColumnOrdering.md) \| [`ColumnOrdering`](../interfaces/ColumnOrdering.md)[]
|
||||
|
||||
#### Returns
|
||||
|
||||
`this`
|
||||
|
||||
This query builder.
|
||||
|
||||
#### Inherited from
|
||||
|
||||
`StandardQueryBase.orderBy`
|
||||
|
||||
***
|
||||
|
||||
### outputSchema()
|
||||
|
||||
```ts
|
||||
|
||||
@@ -51,6 +51,7 @@
|
||||
- [AlterColumnsResult](interfaces/AlterColumnsResult.md)
|
||||
- [ClientConfig](interfaces/ClientConfig.md)
|
||||
- [ColumnAlteration](interfaces/ColumnAlteration.md)
|
||||
- [ColumnOrdering](interfaces/ColumnOrdering.md)
|
||||
- [CompactionStats](interfaces/CompactionStats.md)
|
||||
- [ConnectNamespaceOptions](interfaces/ConnectNamespaceOptions.md)
|
||||
- [ConnectionOptions](interfaces/ConnectionOptions.md)
|
||||
@@ -79,12 +80,14 @@
|
||||
- [IvfRqOptions](interfaces/IvfRqOptions.md)
|
||||
- [ListNamespacesOptions](interfaces/ListNamespacesOptions.md)
|
||||
- [ListNamespacesResponse](interfaces/ListNamespacesResponse.md)
|
||||
- [LsmWriteSpec](interfaces/LsmWriteSpec.md)
|
||||
- [MergeResult](interfaces/MergeResult.md)
|
||||
- [OpenTableOptions](interfaces/OpenTableOptions.md)
|
||||
- [OptimizeOptions](interfaces/OptimizeOptions.md)
|
||||
- [OptimizeStats](interfaces/OptimizeStats.md)
|
||||
- [QueryExecutionOptions](interfaces/QueryExecutionOptions.md)
|
||||
- [RemovalStats](interfaces/RemovalStats.md)
|
||||
- [RenameTableOptions](interfaces/RenameTableOptions.md)
|
||||
- [RestNamespaceConfig](interfaces/RestNamespaceConfig.md)
|
||||
- [RetryConfig](interfaces/RetryConfig.md)
|
||||
- [ScannableOptions](interfaces/ScannableOptions.md)
|
||||
@@ -102,6 +105,7 @@
|
||||
- [UpdateResult](interfaces/UpdateResult.md)
|
||||
- [Version](interfaces/Version.md)
|
||||
- [WriteExecutionOptions](interfaces/WriteExecutionOptions.md)
|
||||
- [WriteProgress](interfaces/WriteProgress.md)
|
||||
|
||||
## Type Aliases
|
||||
|
||||
|
||||
@@ -19,3 +19,39 @@ mode: "append" | "overwrite";
|
||||
If "append" (the default) then the new data will be added to the table
|
||||
|
||||
If "overwrite" then the new data will replace the existing data in the table.
|
||||
|
||||
***
|
||||
|
||||
### progress()
|
||||
|
||||
```ts
|
||||
progress: (progress) => void;
|
||||
```
|
||||
|
||||
Optional callback invoked periodically with write progress.
|
||||
|
||||
The callback is fired once per batch written and once more with
|
||||
`done: true` when the write completes. Calls are dispatched
|
||||
asynchronously to the JS event loop and never block the write — a slow
|
||||
callback will queue events rather than back-pressure the writer.
|
||||
|
||||
Errors thrown from the callback are logged with `console.warn` and
|
||||
swallowed — they do not abort the write.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **progress**: [`WriteProgress`](WriteProgress.md)
|
||||
|
||||
#### Returns
|
||||
|
||||
`void`
|
||||
|
||||
#### Example
|
||||
|
||||
```ts
|
||||
await table.add(data, {
|
||||
progress: (p) => {
|
||||
console.log(`${p.outputRows}/${p.totalRows ?? "?"} rows`);
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
31
docs/src/js/interfaces/ColumnOrdering.md
Normal file
31
docs/src/js/interfaces/ColumnOrdering.md
Normal file
@@ -0,0 +1,31 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / ColumnOrdering
|
||||
|
||||
# Interface: ColumnOrdering
|
||||
|
||||
## Properties
|
||||
|
||||
### ascending?
|
||||
|
||||
```ts
|
||||
optional ascending: boolean;
|
||||
```
|
||||
|
||||
***
|
||||
|
||||
### columnName
|
||||
|
||||
```ts
|
||||
columnName: string;
|
||||
```
|
||||
|
||||
***
|
||||
|
||||
### nullsFirst?
|
||||
|
||||
```ts
|
||||
optional nullsFirst: boolean;
|
||||
```
|
||||
64
docs/src/js/interfaces/LsmWriteSpec.md
Normal file
64
docs/src/js/interfaces/LsmWriteSpec.md
Normal file
@@ -0,0 +1,64 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / LsmWriteSpec
|
||||
|
||||
# Interface: LsmWriteSpec
|
||||
|
||||
Specification selecting Lance's MemWAL LSM-style write path for
|
||||
`mergeInsert`.
|
||||
|
||||
`specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`,
|
||||
`column` and `numBuckets` are required; for `"identity"`, `column` is
|
||||
required.
|
||||
|
||||
## Properties
|
||||
|
||||
### column?
|
||||
|
||||
```ts
|
||||
optional column: string;
|
||||
```
|
||||
|
||||
Bucket and identity variants: the sharding column.
|
||||
|
||||
***
|
||||
|
||||
### maintainedIndexes?
|
||||
|
||||
```ts
|
||||
optional maintainedIndexes: string[];
|
||||
```
|
||||
|
||||
Names of indexes the MemWAL should keep up to date during writes.
|
||||
|
||||
***
|
||||
|
||||
### numBuckets?
|
||||
|
||||
```ts
|
||||
optional numBuckets: number;
|
||||
```
|
||||
|
||||
Bucket variant: the number of buckets, in `[1, 1024]`.
|
||||
|
||||
***
|
||||
|
||||
### specType
|
||||
|
||||
```ts
|
||||
specType: "bucket" | "identity" | "unsharded";
|
||||
```
|
||||
|
||||
One of `"bucket"`, `"identity"`, or `"unsharded"`.
|
||||
|
||||
***
|
||||
|
||||
### writerConfigDefaults?
|
||||
|
||||
```ts
|
||||
optional writerConfigDefaults: Record<string, string>;
|
||||
```
|
||||
|
||||
Default `ShardWriter` configuration recorded in the MemWAL index.
|
||||
29
docs/src/js/interfaces/RenameTableOptions.md
Normal file
29
docs/src/js/interfaces/RenameTableOptions.md
Normal file
@@ -0,0 +1,29 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / RenameTableOptions
|
||||
|
||||
# Interface: RenameTableOptions
|
||||
|
||||
## Properties
|
||||
|
||||
### namespacePath?
|
||||
|
||||
```ts
|
||||
optional namespacePath: string[];
|
||||
```
|
||||
|
||||
The namespace path of the table being renamed. Defaults to the root
|
||||
namespace (`[]`) when omitted.
|
||||
|
||||
***
|
||||
|
||||
### newNamespacePath?
|
||||
|
||||
```ts
|
||||
optional newNamespacePath: string[];
|
||||
```
|
||||
|
||||
The namespace path to move the table to as part of the rename. When
|
||||
omitted the table stays in `namespacePath`.
|
||||
84
docs/src/js/interfaces/WriteProgress.md
Normal file
84
docs/src/js/interfaces/WriteProgress.md
Normal file
@@ -0,0 +1,84 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / WriteProgress
|
||||
|
||||
# Interface: WriteProgress
|
||||
|
||||
Progress snapshot for a write operation, delivered to the `progress`
|
||||
callback passed to [Table.add](../classes/Table.md#add).
|
||||
|
||||
## Properties
|
||||
|
||||
### activeTasks
|
||||
|
||||
```ts
|
||||
activeTasks: number;
|
||||
```
|
||||
|
||||
Number of parallel write tasks currently in flight.
|
||||
|
||||
***
|
||||
|
||||
### done
|
||||
|
||||
```ts
|
||||
done: boolean;
|
||||
```
|
||||
|
||||
`true` for the final callback; `false` otherwise.
|
||||
|
||||
***
|
||||
|
||||
### elapsedSeconds
|
||||
|
||||
```ts
|
||||
elapsedSeconds: number;
|
||||
```
|
||||
|
||||
Wall-clock seconds since the write started.
|
||||
|
||||
***
|
||||
|
||||
### outputBytes
|
||||
|
||||
```ts
|
||||
outputBytes: number;
|
||||
```
|
||||
|
||||
Number of bytes written so far.
|
||||
|
||||
***
|
||||
|
||||
### outputRows
|
||||
|
||||
```ts
|
||||
outputRows: number;
|
||||
```
|
||||
|
||||
Number of rows written so far.
|
||||
|
||||
***
|
||||
|
||||
### totalRows?
|
||||
|
||||
```ts
|
||||
optional totalRows: number;
|
||||
```
|
||||
|
||||
Total rows expected, when the input source reports it.
|
||||
|
||||
Always set on the final callback (the one with `done: true`), falling
|
||||
back to the actual number of rows written when the source could not
|
||||
report a row count up front.
|
||||
|
||||
***
|
||||
|
||||
### totalTasks
|
||||
|
||||
```ts
|
||||
totalTasks: number;
|
||||
```
|
||||
|
||||
Total number of parallel write tasks (the write parallelism).
|
||||
@@ -166,6 +166,12 @@ lists the indices that LanceDb supports.
|
||||
|
||||
::: lancedb.index.IvfFlat
|
||||
|
||||
::: lancedb.index.IvfSq
|
||||
|
||||
::: lancedb.index.IvfRq
|
||||
|
||||
::: lancedb.index.HnswFlat
|
||||
|
||||
::: lancedb.table.IndexStatistics
|
||||
|
||||
## Querying (Asynchronous)
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
<parent>
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-parent</artifactId>
|
||||
<version>0.28.0-beta.11</version>
|
||||
<version>0.30.0-beta.1</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-parent</artifactId>
|
||||
<version>0.28.0-beta.11</version>
|
||||
<version>0.30.0-beta.1</version>
|
||||
<packaging>pom</packaging>
|
||||
<name>${project.artifactId}</name>
|
||||
<description>LanceDB Java SDK Parent POM</description>
|
||||
@@ -28,7 +28,7 @@
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<arrow.version>15.0.0</arrow.version>
|
||||
<lance-core.version>7.0.0-beta.7</lance-core.version>
|
||||
<lance-core.version>7.1.0-beta.2</lance-core.version>
|
||||
<spotless.skip>false</spotless.skip>
|
||||
<spotless.version>2.30.0</spotless.version>
|
||||
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "lancedb-nodejs"
|
||||
edition.workspace = true
|
||||
version = "0.28.0-beta.11"
|
||||
version = "0.30.0-beta.1"
|
||||
publish = false
|
||||
license.workspace = true
|
||||
description.workspace = true
|
||||
|
||||
@@ -47,6 +47,14 @@ describe("given a connection", () => {
|
||||
await db.close();
|
||||
expect(db.isOpen()).toBe(false);
|
||||
await expect(db.tableNames()).rejects.toThrow("Connection is closed");
|
||||
await expect(db.renameTable("a", "b")).rejects.toThrow(
|
||||
"Connection is closed",
|
||||
);
|
||||
});
|
||||
|
||||
it("should report renameTable as unsupported on an OSS connection", async () => {
|
||||
await db.createTable("a", [{ id: 1 }]);
|
||||
await expect(db.renameTable("a", "b")).rejects.toThrow(/not supported/);
|
||||
});
|
||||
it("should be able to create a table from an object arg `createTable(options)`, or args `createTable(name, data, options)`", async () => {
|
||||
let tbl = await db.createTable("test", [{ id: 1 }, { id: 2 }]);
|
||||
@@ -81,16 +89,6 @@ describe("given a connection", () => {
|
||||
await db.createTable("test4", [{ id: 1 }, { id: 2 }]);
|
||||
});
|
||||
|
||||
it("should expose renameTable and reject on OSS listing DB", async () => {
|
||||
await db.createTable("old_name", [{ id: 1 }]);
|
||||
|
||||
await expect(db.renameTable("old_name", "new_name")).rejects.toThrow(
|
||||
"rename_table is not supported in LanceDB OSS",
|
||||
);
|
||||
|
||||
await expect(db.tableNames()).resolves.toEqual(["old_name"]);
|
||||
});
|
||||
|
||||
it("should fail if creating table twice, unless overwrite is true", async () => {
|
||||
let tbl = await db.createTable("test", [{ id: 1 }, { id: 2 }]);
|
||||
await expect(tbl.countRows()).resolves.toBe(2);
|
||||
|
||||
@@ -109,3 +109,209 @@ describe("Query outputSchema", () => {
|
||||
expect(schema.fields.length).toBe(3);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Query orderBy", () => {
|
||||
let tmpDir: tmp.DirResult;
|
||||
let table: Table;
|
||||
|
||||
beforeEach(async () => {
|
||||
tmpDir = tmp.dirSync({ unsafeCleanup: true });
|
||||
const db = await connect(tmpDir.name);
|
||||
|
||||
// Create table with numeric data for sorting
|
||||
const schema = new Schema([
|
||||
new Field("id", new Int64(), true),
|
||||
new Field("score", new Float32(), true),
|
||||
new Field("name", new Utf8(), true),
|
||||
]);
|
||||
|
||||
const data = makeArrowTable(
|
||||
[
|
||||
{ id: 1n, score: 3.5, name: "charlie" },
|
||||
{ id: 2n, score: 1.2, name: "alice" },
|
||||
{ id: 3n, score: 2.8, name: "bob" },
|
||||
{ id: 4n, score: 0.5, name: "david" },
|
||||
{ id: 5n, score: 4.1, name: "eve" },
|
||||
],
|
||||
{ schema },
|
||||
);
|
||||
table = await db.createTable("test", data);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
tmpDir.removeCallback();
|
||||
});
|
||||
|
||||
it("should sort by single column ascending", async () => {
|
||||
const results = await table
|
||||
.query()
|
||||
.orderBy({ columnName: "score", ascending: true, nullsFirst: false })
|
||||
.toArray();
|
||||
|
||||
expect(results.length).toBe(5);
|
||||
// Verify ascending order
|
||||
expect(results[0].score).toBeCloseTo(0.5, 0.001);
|
||||
expect(results[1].score).toBeCloseTo(1.2, 0.001);
|
||||
expect(results[2].score).toBeCloseTo(2.8, 0.001);
|
||||
expect(results[3].score).toBeCloseTo(3.5, 0.001);
|
||||
expect(results[4].score).toBeCloseTo(4.1, 0.001);
|
||||
});
|
||||
|
||||
it("should sort by single column descending", async () => {
|
||||
const results = await table
|
||||
.query()
|
||||
.orderBy({ columnName: "score", ascending: false, nullsFirst: false })
|
||||
.toArray();
|
||||
|
||||
expect(results.length).toBe(5);
|
||||
// Verify descending order
|
||||
expect(results[0].score).toBeCloseTo(4.1, 0.001);
|
||||
expect(results[1].score).toBeCloseTo(3.5, 0.001);
|
||||
expect(results[2].score).toBeCloseTo(2.8, 0.001);
|
||||
expect(results[3].score).toBeCloseTo(1.2, 0.001);
|
||||
expect(results[4].score).toBeCloseTo(0.5, 0.001);
|
||||
});
|
||||
|
||||
it("should use ascending as default direction", async () => {
|
||||
const results = await table
|
||||
.query()
|
||||
.orderBy({ columnName: "score" })
|
||||
.toArray();
|
||||
|
||||
expect(results.length).toBe(5);
|
||||
// Verify ascending order (default)
|
||||
expect(results[0].score).toBeCloseTo(0.5, 0.001);
|
||||
expect(results[1].score).toBeCloseTo(1.2, 0.001);
|
||||
expect(results[2].score).toBeCloseTo(2.8, 0.001);
|
||||
expect(results[3].score).toBeCloseTo(3.5, 0.001);
|
||||
expect(results[4].score).toBeCloseTo(4.1, 0.001);
|
||||
});
|
||||
|
||||
it("should sort by string column", async () => {
|
||||
const results = await table
|
||||
.query()
|
||||
.orderBy({ columnName: "name" })
|
||||
.toArray();
|
||||
|
||||
expect(results.length).toBe(5);
|
||||
// Verify alphabetical order
|
||||
expect(results[0].name).toBe("alice");
|
||||
expect(results[1].name).toBe("bob");
|
||||
expect(results[2].name).toBe("charlie");
|
||||
expect(results[3].name).toBe("david");
|
||||
expect(results[4].name).toBe("eve");
|
||||
});
|
||||
|
||||
it("should support method chaining with where", async () => {
|
||||
const results = await table
|
||||
.query()
|
||||
.where("score > 2.0")
|
||||
.orderBy({ columnName: "score" })
|
||||
.toArray();
|
||||
expect(results.length).toBe(3);
|
||||
// Verify filtered and sorted
|
||||
expect(results[0].score).toBeCloseTo(2.8, 0.001);
|
||||
expect(results[1].score).toBeCloseTo(3.5, 0.001);
|
||||
expect(results[2].score).toBeCloseTo(4.1, 0.001);
|
||||
});
|
||||
|
||||
it("should support method chaining with limit", async () => {
|
||||
const results = await table
|
||||
.query()
|
||||
.orderBy({ columnName: "score", ascending: false })
|
||||
.limit(3)
|
||||
.toArray();
|
||||
|
||||
expect(results.length).toBe(3);
|
||||
// Verify top 3 in descending order
|
||||
expect(results[0].score).toBeCloseTo(4.1, 0.001);
|
||||
expect(results[1].score).toBeCloseTo(3.5, 0.001);
|
||||
expect(results[2].score).toBeCloseTo(2.8, 0.001);
|
||||
});
|
||||
|
||||
it("should support method chaining with offset", async () => {
|
||||
const results = await table
|
||||
.query()
|
||||
.orderBy({ columnName: "score" })
|
||||
.offset(2)
|
||||
.limit(2)
|
||||
.toArray();
|
||||
|
||||
expect(results.length).toBe(2);
|
||||
// Verify results skip first 2 and take next 2
|
||||
expect(results[0].score).toBeCloseTo(2.8, 0.001);
|
||||
expect(results[1].score).toBeCloseTo(3.5, 0.001);
|
||||
});
|
||||
|
||||
it("should support method chaining with select", async () => {
|
||||
const results = await table
|
||||
.query()
|
||||
.orderBy({ columnName: "name" })
|
||||
.select(["name", "score"])
|
||||
.toArray();
|
||||
|
||||
expect(results.length).toBe(5);
|
||||
// Verify only selected columns are present
|
||||
expect(Object.keys(results[0])).toEqual(["name", "score"]);
|
||||
expect(Object.keys(results[4])).toEqual(["name", "score"]);
|
||||
// Verify sorted by name
|
||||
expect(results[0].name).toBe("alice");
|
||||
expect(results[4].name).toBe("eve");
|
||||
});
|
||||
|
||||
it("should support complex method chaining", async () => {
|
||||
const results = await table
|
||||
.query()
|
||||
.where("score > 1.0")
|
||||
.orderBy({ columnName: "score", ascending: false })
|
||||
.limit(3)
|
||||
.select(["id", "score", "name"])
|
||||
.toArray();
|
||||
|
||||
expect(results.length).toBe(3);
|
||||
// Verify filtered, sorted, limited, and projected
|
||||
expect(results[0].score).toBeCloseTo(4.1, 0.001);
|
||||
expect(results[1].score).toBeCloseTo(3.5, 0.001);
|
||||
expect(results[2].score).toBeCloseTo(2.8, 0.001);
|
||||
expect(Object.keys(results[0])).toEqual(["id", "score", "name"]);
|
||||
});
|
||||
|
||||
it("should support multi-column ordering and null placement", async () => {
|
||||
const schema = new Schema([
|
||||
new Field("group", new Int64(), true),
|
||||
new Field("score", new Float32(), true),
|
||||
new Field("name", new Utf8(), true),
|
||||
]);
|
||||
|
||||
const data = makeArrowTable(
|
||||
[
|
||||
{ group: 1n, score: null, name: "z" },
|
||||
{ group: 1n, score: 1.0, name: "b" },
|
||||
{ group: 1n, score: 1.0, name: "a" },
|
||||
{ group: 2n, score: 0.5, name: "c" },
|
||||
],
|
||||
{ schema },
|
||||
);
|
||||
const nullTable = await (await connect(tmpDir.name)).createTable(
|
||||
"test_multi_order",
|
||||
data,
|
||||
{ mode: "overwrite" },
|
||||
);
|
||||
|
||||
const results = await nullTable
|
||||
.query()
|
||||
.orderBy([
|
||||
{ columnName: "group", ascending: true, nullsFirst: false },
|
||||
{ columnName: "score", ascending: true, nullsFirst: true },
|
||||
{ columnName: "name", ascending: true, nullsFirst: false },
|
||||
])
|
||||
.toArray();
|
||||
|
||||
expect(results.map((r) => [r.group, r.score, r.name])).toEqual([
|
||||
[1n, null, "z"],
|
||||
[1n, 1.0, "a"],
|
||||
[1n, 1.0, "b"],
|
||||
[2n, 0.5, "c"],
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -617,4 +617,68 @@ describe("remote connection", () => {
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("renameTable", () => {
|
||||
async function captureRenameRequest(
|
||||
call: (db: Connection) => Promise<void>,
|
||||
): Promise<{ url: string; body: Record<string, unknown> }> {
|
||||
let captured: { url: string; body: Record<string, unknown> } | undefined;
|
||||
await withMockDatabase((req, res) => {
|
||||
let raw = "";
|
||||
req.on("data", (chunk) => {
|
||||
raw += chunk;
|
||||
});
|
||||
req.on("end", () => {
|
||||
captured = {
|
||||
url: req.url ?? "",
|
||||
body: raw ? JSON.parse(raw) : {},
|
||||
};
|
||||
res.writeHead(200, { "Content-Type": "application/json" }).end("");
|
||||
});
|
||||
}, call);
|
||||
if (!captured) {
|
||||
throw new Error("mock server never saw a request");
|
||||
}
|
||||
return captured;
|
||||
}
|
||||
|
||||
it("sends rename request for a table in the root namespace", async () => {
|
||||
const { url, body } = await captureRenameRequest(async (db) => {
|
||||
await db.renameTable("table1", "table2");
|
||||
});
|
||||
expect(url).toBe("/v1/table/table1/rename/");
|
||||
// biome-ignore lint/style/useNamingConvention: snake_case mandated by the server wire format
|
||||
expect(body).toEqual({ new_table_name: "table2" });
|
||||
});
|
||||
|
||||
it("omits new_namespace when only the current namespace is supplied", async () => {
|
||||
// Safe-default check: passing namespacePath alone must not send
|
||||
// `new_namespace`, so the server keeps the table in its current
|
||||
// namespace instead of silently moving it to root.
|
||||
const { url, body } = await captureRenameRequest(async (db) => {
|
||||
await db.renameTable("table1", "table2", {
|
||||
namespacePath: ["ns1"],
|
||||
});
|
||||
});
|
||||
expect(url).toBe("/v1/table/ns1$table1/rename/");
|
||||
// biome-ignore lint/style/useNamingConvention: snake_case mandated by the server wire format
|
||||
expect(body).toEqual({ new_table_name: "table2" });
|
||||
});
|
||||
|
||||
it("includes new_namespace in the body for a cross-namespace rename", async () => {
|
||||
const { url, body } = await captureRenameRequest(async (db) => {
|
||||
await db.renameTable("table1", "table2", {
|
||||
namespacePath: ["ns1"],
|
||||
newNamespacePath: ["ns2"],
|
||||
});
|
||||
});
|
||||
expect(url).toBe("/v1/table/ns1$table1/rename/");
|
||||
expect(body).toEqual({
|
||||
// biome-ignore lint/style/useNamingConvention: snake_case mandated by the server wire format
|
||||
new_table_name: "table2",
|
||||
// biome-ignore lint/style/useNamingConvention: snake_case mandated by the server wire format
|
||||
new_namespace: ["ns2"],
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -28,6 +28,7 @@ import {
|
||||
List,
|
||||
Schema,
|
||||
SchemaLike,
|
||||
Struct,
|
||||
Type,
|
||||
Uint8,
|
||||
Utf8,
|
||||
@@ -115,6 +116,48 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
|
||||
await expect(table.countRows()).resolves.toBe(1);
|
||||
});
|
||||
|
||||
it("should invoke the progress callback", async () => {
|
||||
const events: import("../lancedb").WriteProgress[] = [];
|
||||
await table.add([{ id: 1 }, { id: 2 }, { id: 3 }], {
|
||||
progress: (p) => events.push(p),
|
||||
});
|
||||
|
||||
expect(events.length).toBeGreaterThan(0);
|
||||
const last = events[events.length - 1];
|
||||
expect(last.done).toBe(true);
|
||||
// Earlier callbacks must have done=false.
|
||||
for (const ev of events.slice(0, -1)) {
|
||||
expect(ev.done).toBe(false);
|
||||
}
|
||||
// outputRows reflects the rows added in this call, not table size.
|
||||
expect(last.outputRows).toBe(3);
|
||||
// The input source (an array) reports a row count, so totalRows is set.
|
||||
expect(last.totalRows).toBe(3);
|
||||
// outputRows is monotonic.
|
||||
for (let i = 1; i < events.length; i++) {
|
||||
expect(events[i].outputRows).toBeGreaterThanOrEqual(
|
||||
events[i - 1].outputRows,
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
it("should swallow errors thrown from the progress callback", async () => {
|
||||
const warn = jest
|
||||
.spyOn(console, "warn")
|
||||
.mockImplementation(() => undefined);
|
||||
try {
|
||||
const res = await table.add([{ id: 1 }, { id: 2 }], {
|
||||
progress: () => {
|
||||
throw new Error("callback bomb");
|
||||
},
|
||||
});
|
||||
expect(res.version).toBeGreaterThan(0);
|
||||
expect(warn).toHaveBeenCalled();
|
||||
} finally {
|
||||
warn.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
it("should let me close the table", async () => {
|
||||
expect(table.isOpen()).toBe(true);
|
||||
table.close();
|
||||
@@ -738,6 +781,113 @@ describe("When creating an index", () => {
|
||||
expect(indices2.length).toBe(0);
|
||||
});
|
||||
|
||||
it("should create and search a nested vector index", async () => {
|
||||
const db = await connect(tmpDir.name);
|
||||
const nestedSchema = new Schema([
|
||||
new Field("id", new Int32(), true),
|
||||
new Field(
|
||||
"image",
|
||||
new Struct([
|
||||
new Field(
|
||||
"embedding",
|
||||
new FixedSizeList(2, new Field("item", new Float32(), true)),
|
||||
true,
|
||||
),
|
||||
]),
|
||||
true,
|
||||
),
|
||||
]);
|
||||
const nestedTable = await db.createTable(
|
||||
"nested_vector",
|
||||
makeArrowTable(
|
||||
Array.from({ length: 300 }, (_, id) => ({
|
||||
id,
|
||||
image: { embedding: [id, id + 1] },
|
||||
})),
|
||||
{ schema: nestedSchema },
|
||||
),
|
||||
);
|
||||
|
||||
await nestedTable.createIndex("image.embedding", {
|
||||
name: "image_embedding_idx",
|
||||
});
|
||||
const indices = await nestedTable.listIndices();
|
||||
expect(indices).toContainEqual({
|
||||
name: "image_embedding_idx",
|
||||
indexType: "IvfPq",
|
||||
columns: ["image.embedding"],
|
||||
});
|
||||
|
||||
const explicit = await nestedTable
|
||||
.query()
|
||||
.nearestTo([0.0, 1.0])
|
||||
.column("image.embedding")
|
||||
.limit(1)
|
||||
.toArray();
|
||||
const inferred = await nestedTable
|
||||
.query()
|
||||
.nearestTo([0.0, 1.0])
|
||||
.limit(1)
|
||||
.toArray();
|
||||
expect(inferred[0].id).toEqual(explicit[0].id);
|
||||
});
|
||||
|
||||
it("should report multiple nested vector candidates", async () => {
|
||||
const db = await connect(tmpDir.name);
|
||||
const nestedSchema = new Schema([
|
||||
new Field(
|
||||
"image",
|
||||
new Struct([
|
||||
new Field(
|
||||
"embedding",
|
||||
new FixedSizeList(2, new Field("item", new Float32(), true)),
|
||||
true,
|
||||
),
|
||||
]),
|
||||
true,
|
||||
),
|
||||
new Field(
|
||||
"text",
|
||||
new Struct([
|
||||
new Field(
|
||||
"embedding",
|
||||
new FixedSizeList(2, new Field("item", new Float32(), true)),
|
||||
true,
|
||||
),
|
||||
]),
|
||||
true,
|
||||
),
|
||||
]);
|
||||
const nestedTable = await db.createTable(
|
||||
"multiple_nested_vectors",
|
||||
makeArrowTable(
|
||||
[
|
||||
{
|
||||
image: { embedding: [0.0, 1.0] },
|
||||
text: { embedding: [2.0, 3.0] },
|
||||
},
|
||||
],
|
||||
{ schema: nestedSchema },
|
||||
),
|
||||
);
|
||||
|
||||
await expect(
|
||||
nestedTable.query().nearestTo([0.0, 1.0]).limit(1).toArray(),
|
||||
).rejects.toThrow(/image\.embedding.*text\.embedding/);
|
||||
});
|
||||
|
||||
it("should report when no default vector column exists", async () => {
|
||||
const db = await connect(tmpDir.name);
|
||||
const noVectorTable = await db.createTable(
|
||||
"no_vector",
|
||||
makeArrowTable([{ id: 0, label: "cat" }]),
|
||||
);
|
||||
|
||||
await expect(
|
||||
noVectorTable.query().nearestTo([0.0, 1.0]).limit(1).toArray(),
|
||||
).rejects.toThrow(/No vector column/);
|
||||
});
|
||||
|
||||
it("should wait for index readiness", async () => {
|
||||
// Create an index and then wait for it to be ready
|
||||
await tbl.createIndex("vec");
|
||||
@@ -2348,3 +2498,130 @@ describe("when creating a table with Float32Array vectors", () => {
|
||||
expect((fsl.children[0].type as Float32).precision).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe("setUnenforcedPrimaryKey", () => {
|
||||
let tmpDir: tmp.DirResult;
|
||||
|
||||
beforeEach(() => {
|
||||
tmpDir = tmp.dirSync({ unsafeCleanup: true });
|
||||
});
|
||||
afterEach(() => tmpDir.removeCallback());
|
||||
|
||||
it("sets a single-column primary key (string or one-element array)", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const schema = new arrow.Schema([
|
||||
new arrow.Field("id", new arrow.Int64(), false),
|
||||
]);
|
||||
const t1 = await conn.createEmptyTable("t1", schema);
|
||||
await t1.setUnenforcedPrimaryKey("id");
|
||||
|
||||
const t2 = await conn.createEmptyTable("t2", schema);
|
||||
await t2.setUnenforcedPrimaryKey(["id"]);
|
||||
});
|
||||
|
||||
it("rejects a compound primary key", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const table = await conn.createEmptyTable(
|
||||
"t",
|
||||
new arrow.Schema([
|
||||
new arrow.Field("id", new arrow.Int64(), false),
|
||||
new arrow.Field("name", new arrow.Utf8(), false),
|
||||
]),
|
||||
);
|
||||
await expect(
|
||||
table.setUnenforcedPrimaryKey(["id", "name"]),
|
||||
).rejects.toThrow();
|
||||
});
|
||||
|
||||
it("rejects changing the primary key once set", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const table = await conn.createEmptyTable(
|
||||
"t",
|
||||
new arrow.Schema([
|
||||
new arrow.Field("id", new arrow.Int64(), false),
|
||||
new arrow.Field("name", new arrow.Utf8(), false),
|
||||
]),
|
||||
);
|
||||
await table.setUnenforcedPrimaryKey("id");
|
||||
await expect(table.setUnenforcedPrimaryKey("name")).rejects.toThrow();
|
||||
await expect(table.setUnenforcedPrimaryKey("id")).rejects.toThrow();
|
||||
});
|
||||
});
|
||||
|
||||
describe("setLsmWriteSpec / unsetLsmWriteSpec", () => {
|
||||
let tmpDir: tmp.DirResult;
|
||||
|
||||
beforeEach(() => {
|
||||
tmpDir = tmp.dirSync({ unsafeCleanup: true });
|
||||
});
|
||||
afterEach(() => tmpDir.removeCallback());
|
||||
|
||||
async function makeTable(conn: Connection): Promise<Table> {
|
||||
return await conn.createEmptyTable(
|
||||
"t",
|
||||
new arrow.Schema([new arrow.Field("id", new arrow.Int64(), false)]),
|
||||
);
|
||||
}
|
||||
|
||||
it("installs and removes a bucket spec", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const table = await makeTable(conn);
|
||||
|
||||
await table.setUnenforcedPrimaryKey("id");
|
||||
await table.setLsmWriteSpec({
|
||||
specType: "bucket",
|
||||
column: "id",
|
||||
numBuckets: 4,
|
||||
});
|
||||
await table.unsetLsmWriteSpec();
|
||||
// A second unset errors — there is no spec left to remove.
|
||||
await expect(table.unsetLsmWriteSpec()).rejects.toThrow();
|
||||
// A fresh spec can be installed after unset.
|
||||
await table.setLsmWriteSpec({
|
||||
specType: "bucket",
|
||||
column: "id",
|
||||
numBuckets: 8,
|
||||
});
|
||||
});
|
||||
|
||||
it("installs an unsharded spec", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const table = await makeTable(conn);
|
||||
|
||||
await table.setUnenforcedPrimaryKey("id");
|
||||
await table.setLsmWriteSpec({ specType: "unsharded" });
|
||||
await table.unsetLsmWriteSpec();
|
||||
});
|
||||
|
||||
it("installs an identity spec", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const table = await makeTable(conn);
|
||||
|
||||
await table.setUnenforcedPrimaryKey("id");
|
||||
await table.setLsmWriteSpec({ specType: "identity", column: "id" });
|
||||
await table.unsetLsmWriteSpec();
|
||||
});
|
||||
|
||||
it("rejects an invalid spec", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const table = await makeTable(conn);
|
||||
|
||||
await table.setUnenforcedPrimaryKey("id");
|
||||
// num_buckets out of range.
|
||||
await expect(
|
||||
table.setLsmWriteSpec({
|
||||
specType: "bucket",
|
||||
column: "id",
|
||||
numBuckets: 0,
|
||||
}),
|
||||
).rejects.toThrow();
|
||||
// Column mismatch.
|
||||
await expect(
|
||||
table.setLsmWriteSpec({
|
||||
specType: "bucket",
|
||||
column: "missing",
|
||||
numBuckets: 4,
|
||||
}),
|
||||
).rejects.toThrow();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -38,5 +38,14 @@ test("filtering examples", async () => {
|
||||
// --8<-- [start:sql_search]
|
||||
await tbl.query().where("id = 10").limit(10).toArray();
|
||||
// --8<-- [end:sql_search]
|
||||
|
||||
// --8<-- [start:orderby_search]
|
||||
await tbl
|
||||
.query()
|
||||
.where("id > 10")
|
||||
.orderBy({ columnName: "id", ascending: false })
|
||||
.limit(5)
|
||||
.toArray();
|
||||
// --8<-- [end:orderby_search]
|
||||
});
|
||||
});
|
||||
|
||||
@@ -144,6 +144,19 @@ export interface DropNamespaceOptions {
|
||||
behavior?: "restrict" | "cascade";
|
||||
}
|
||||
|
||||
export interface RenameTableOptions {
|
||||
/**
|
||||
* The namespace path of the table being renamed. Defaults to the root
|
||||
* namespace (`[]`) when omitted.
|
||||
*/
|
||||
namespacePath?: string[];
|
||||
/**
|
||||
* The namespace path to move the table to as part of the rename. When
|
||||
* omitted the table stays in `namespacePath`.
|
||||
*/
|
||||
newNamespacePath?: string[];
|
||||
}
|
||||
|
||||
/**
|
||||
* A LanceDB Connection that allows you to open tables and create new ones.
|
||||
*
|
||||
@@ -296,12 +309,6 @@ export abstract class Connection {
|
||||
*/
|
||||
abstract dropTable(name: string, namespacePath?: string[]): Promise<void>;
|
||||
|
||||
abstract renameTable(
|
||||
oldName: string,
|
||||
newName: string,
|
||||
namespacePath?: string[],
|
||||
): Promise<void>;
|
||||
|
||||
/**
|
||||
* Drop all tables in the database.
|
||||
* @param {string[]} namespacePath The namespace path to drop tables from (defaults to root namespace).
|
||||
@@ -397,6 +404,24 @@ export abstract class Connection {
|
||||
isShallow?: boolean;
|
||||
},
|
||||
): Promise<Table>;
|
||||
|
||||
/**
|
||||
* Rename a table.
|
||||
*
|
||||
* Currently only supported by LanceDB Cloud. Local OSS connections and
|
||||
* namespace-backed connections (via {@link connectNamespace}) reject with
|
||||
* a "not supported" error.
|
||||
*
|
||||
* @param {string} currentName - The current name of the table.
|
||||
* @param {string} newName - The new name for the table.
|
||||
* @param {RenameTableOptions} options - Optional namespace paths. When
|
||||
* `newNamespacePath` is omitted the table stays in `namespacePath`.
|
||||
*/
|
||||
abstract renameTable(
|
||||
currentName: string,
|
||||
newName: string,
|
||||
options?: RenameTableOptions,
|
||||
): Promise<void>;
|
||||
}
|
||||
|
||||
/** @hideconstructor */
|
||||
@@ -615,14 +640,6 @@ export class LocalConnection extends Connection {
|
||||
return this.inner.dropTable(name, namespacePath ?? []);
|
||||
}
|
||||
|
||||
async renameTable(
|
||||
oldName: string,
|
||||
newName: string,
|
||||
namespacePath?: string[],
|
||||
): Promise<void> {
|
||||
return this.inner.renameTable(oldName, newName, namespacePath ?? []);
|
||||
}
|
||||
|
||||
async dropAllTables(namespacePath?: string[]): Promise<void> {
|
||||
return this.inner.dropAllTables(namespacePath ?? []);
|
||||
}
|
||||
@@ -665,6 +682,19 @@ export class LocalConnection extends Connection {
|
||||
options?.behavior,
|
||||
);
|
||||
}
|
||||
|
||||
async renameTable(
|
||||
currentName: string,
|
||||
newName: string,
|
||||
options?: RenameTableOptions,
|
||||
): Promise<void> {
|
||||
return this.inner.renameTable(
|
||||
currentName,
|
||||
newName,
|
||||
options?.namespacePath ?? [],
|
||||
options?.newNamespacePath,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -71,6 +71,7 @@ export {
|
||||
CreateNamespaceResponse,
|
||||
DropNamespaceResponse,
|
||||
DescribeNamespaceResponse,
|
||||
RenameTableOptions,
|
||||
} from "./connection";
|
||||
|
||||
export { Session } from "./native.js";
|
||||
@@ -82,6 +83,7 @@ export {
|
||||
VectorQuery,
|
||||
TakeQuery,
|
||||
QueryExecutionOptions,
|
||||
ColumnOrdering,
|
||||
FullTextSearchOptions,
|
||||
RecordBatchIterator,
|
||||
FullTextQuery,
|
||||
@@ -112,6 +114,8 @@ export {
|
||||
UpdateOptions,
|
||||
OptimizeOptions,
|
||||
Version,
|
||||
WriteProgress,
|
||||
LsmWriteSpec,
|
||||
ColumnAlteration,
|
||||
} from "./table";
|
||||
|
||||
|
||||
@@ -79,6 +79,12 @@ export interface QueryExecutionOptions {
|
||||
timeoutMs?: number;
|
||||
}
|
||||
|
||||
export interface ColumnOrdering {
|
||||
columnName: string;
|
||||
ascending?: boolean;
|
||||
nullsFirst?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Options that control the behavior of a full text search
|
||||
*/
|
||||
@@ -417,6 +423,21 @@ export class StandardQueryBase<
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sort the results by the specified column(s).
|
||||
* @returns This query builder.
|
||||
*/
|
||||
orderBy(ordering: ColumnOrdering | ColumnOrdering[]): this {
|
||||
const orderings = Array.isArray(ordering) ? ordering : [ordering];
|
||||
const normalized = orderings.map((o) => ({
|
||||
columnName: o.columnName,
|
||||
ascending: o.ascending ?? true,
|
||||
nullsFirst: o.nullsFirst ?? false,
|
||||
}));
|
||||
this.doCall((inner) => inner.orderBy(normalized));
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Skip searching un-indexed data. This can make search faster, but will miss
|
||||
* any data that is not yet indexed.
|
||||
|
||||
@@ -46,6 +46,33 @@ import { sanitizeType } from "./sanitize";
|
||||
import { IntoSql, toSQL } from "./util";
|
||||
export { IndexConfig } from "./native";
|
||||
|
||||
/**
|
||||
* Progress snapshot for a write operation, delivered to the `progress`
|
||||
* callback passed to {@link Table.add}.
|
||||
*/
|
||||
export interface WriteProgress {
|
||||
/** Number of rows written so far. */
|
||||
outputRows: number;
|
||||
/** Number of bytes written so far. */
|
||||
outputBytes: number;
|
||||
/**
|
||||
* Total rows expected, when the input source reports it.
|
||||
*
|
||||
* Always set on the final callback (the one with `done: true`), falling
|
||||
* back to the actual number of rows written when the source could not
|
||||
* report a row count up front.
|
||||
*/
|
||||
totalRows?: number;
|
||||
/** Wall-clock seconds since the write started. */
|
||||
elapsedSeconds: number;
|
||||
/** Number of parallel write tasks currently in flight. */
|
||||
activeTasks: number;
|
||||
/** Total number of parallel write tasks (the write parallelism). */
|
||||
totalTasks: number;
|
||||
/** `true` for the final callback; `false` otherwise. */
|
||||
done: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Options for adding data to a table.
|
||||
*/
|
||||
@@ -56,6 +83,28 @@ export interface AddDataOptions {
|
||||
* If "overwrite" then the new data will replace the existing data in the table.
|
||||
*/
|
||||
mode: "append" | "overwrite";
|
||||
|
||||
/**
|
||||
* Optional callback invoked periodically with write progress.
|
||||
*
|
||||
* The callback is fired once per batch written and once more with
|
||||
* `done: true` when the write completes. Calls are dispatched
|
||||
* asynchronously to the JS event loop and never block the write — a slow
|
||||
* callback will queue events rather than back-pressure the writer.
|
||||
*
|
||||
* Errors thrown from the callback are logged with `console.warn` and
|
||||
* swallowed — they do not abort the write.
|
||||
*
|
||||
* @example
|
||||
* ```ts
|
||||
* await table.add(data, {
|
||||
* progress: (p) => {
|
||||
* console.log(`${p.outputRows}/${p.totalRows ?? "?"} rows`);
|
||||
* },
|
||||
* });
|
||||
* ```
|
||||
*/
|
||||
progress: (progress: WriteProgress) => void;
|
||||
}
|
||||
|
||||
export interface UpdateOptions {
|
||||
@@ -106,6 +155,27 @@ export interface Version {
|
||||
metadata: Record<string, string>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specification selecting Lance's MemWAL LSM-style write path for
|
||||
* `mergeInsert`.
|
||||
*
|
||||
* `specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`,
|
||||
* `column` and `numBuckets` are required; for `"identity"`, `column` is
|
||||
* required.
|
||||
*/
|
||||
export interface LsmWriteSpec {
|
||||
/** One of `"bucket"`, `"identity"`, or `"unsharded"`. */
|
||||
specType: "bucket" | "identity" | "unsharded";
|
||||
/** Bucket and identity variants: the sharding column. */
|
||||
column?: string;
|
||||
/** Bucket variant: the number of buckets, in `[1, 1024]`. */
|
||||
numBuckets?: number;
|
||||
/** Names of indexes the MemWAL should keep up to date during writes. */
|
||||
maintainedIndexes?: string[];
|
||||
/** Default `ShardWriter` configuration recorded in the MemWAL index. */
|
||||
writerConfigDefaults?: Record<string, string>;
|
||||
}
|
||||
|
||||
/**
|
||||
* A Table is a collection of Records in a LanceDB Database.
|
||||
*
|
||||
@@ -449,6 +519,54 @@ export abstract class Table {
|
||||
* containing the new version number of the table after dropping the columns.
|
||||
*/
|
||||
abstract dropColumns(columnNames: string[]): Promise<DropColumnsResult>;
|
||||
/**
|
||||
* Set the unenforced primary key for this table to a single column.
|
||||
*
|
||||
* "Unenforced" means LanceDB does not check uniqueness on writes; the
|
||||
* column is recorded in the schema as the primary key for use by features
|
||||
* such as `merge_insert`. Only single-column primary keys are supported,
|
||||
* and the key cannot be changed once set.
|
||||
* @param {string | string[]} columns The primary key column. A one-element
|
||||
* array is also accepted; passing more than one column is rejected.
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
abstract setUnenforcedPrimaryKey(columns: string | string[]): Promise<void>;
|
||||
/**
|
||||
* Install an {@link LsmWriteSpec} on this table, selecting Lance's MemWAL
|
||||
* LSM-style write path for future `mergeInsert` calls.
|
||||
*
|
||||
* `LsmWriteSpec` chooses one of three sharding strategies via `specType`:
|
||||
*
|
||||
* - `"bucket"` — hash-bucket writes by the single-column unenforced primary
|
||||
* key (`column` and `numBuckets` required).
|
||||
* - `"identity"` — shard by the raw value of a scalar `column`.
|
||||
* - `"unsharded"` — route every write to a single shard.
|
||||
*
|
||||
* All variants require the table to have an unenforced primary key
|
||||
* ({@link Table#setUnenforcedPrimaryKey}); bucket sharding additionally
|
||||
* requires it to be the single column being bucketed.
|
||||
* @param {LsmWriteSpec} spec The sharding spec to install.
|
||||
* @returns {Promise<void>}
|
||||
* @example
|
||||
* ```ts
|
||||
* await table.setUnenforcedPrimaryKey("id");
|
||||
* await table.setLsmWriteSpec({
|
||||
* specType: "bucket",
|
||||
* column: "id",
|
||||
* numBuckets: 16,
|
||||
* maintainedIndexes: ["id_idx"],
|
||||
* });
|
||||
* ```
|
||||
*/
|
||||
abstract setLsmWriteSpec(spec: LsmWriteSpec): Promise<void>;
|
||||
/**
|
||||
* Remove the {@link LsmWriteSpec} from this table, reverting to the standard
|
||||
* `mergeInsert` write path.
|
||||
*
|
||||
* Errors if no spec is currently set.
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
abstract unsetLsmWriteSpec(): Promise<void>;
|
||||
/** Retrieve the version of the table */
|
||||
|
||||
abstract version(): Promise<number>;
|
||||
@@ -636,7 +754,20 @@ export class LocalTable extends Table {
|
||||
const schema = await this.schema();
|
||||
|
||||
const buffer = await fromDataToBuffer(data, undefined, schema);
|
||||
return await this.inner.add(buffer, mode);
|
||||
// Wrap the user callback so a thrown error doesn't surface as an
|
||||
// unhandled exception (the callback fires from a napi threadsafe
|
||||
// function — exceptions there crash the process).
|
||||
const userProgress = options?.progress;
|
||||
const progress = userProgress
|
||||
? (p: WriteProgress) => {
|
||||
try {
|
||||
userProgress(p);
|
||||
} catch (e) {
|
||||
console.warn("Table.add progress callback threw:", e);
|
||||
}
|
||||
}
|
||||
: undefined;
|
||||
return await this.inner.add(buffer, mode, progress);
|
||||
}
|
||||
|
||||
async update(
|
||||
@@ -897,6 +1028,19 @@ export class LocalTable extends Table {
|
||||
return await this.inner.dropColumns(columnNames);
|
||||
}
|
||||
|
||||
async setUnenforcedPrimaryKey(columns: string | string[]): Promise<void> {
|
||||
const cols = typeof columns === "string" ? [columns] : columns;
|
||||
return await this.inner.setUnenforcedPrimaryKey(cols);
|
||||
}
|
||||
|
||||
async setLsmWriteSpec(spec: LsmWriteSpec): Promise<void> {
|
||||
return await this.inner.setLsmWriteSpec(spec);
|
||||
}
|
||||
|
||||
async unsetLsmWriteSpec(): Promise<void> {
|
||||
return await this.inner.unsetLsmWriteSpec();
|
||||
}
|
||||
|
||||
async version(): Promise<number> {
|
||||
return await this.inner.version();
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-darwin-arm64",
|
||||
"version": "0.28.0-beta.11",
|
||||
"version": "0.30.0-beta.1",
|
||||
"os": ["darwin"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.darwin-arm64.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-arm64-gnu",
|
||||
"version": "0.28.0-beta.11",
|
||||
"version": "0.30.0-beta.1",
|
||||
"os": ["linux"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.linux-arm64-gnu.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-arm64-musl",
|
||||
"version": "0.28.0-beta.11",
|
||||
"version": "0.30.0-beta.1",
|
||||
"os": ["linux"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.linux-arm64-musl.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-x64-gnu",
|
||||
"version": "0.28.0-beta.11",
|
||||
"version": "0.30.0-beta.1",
|
||||
"os": ["linux"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.linux-x64-gnu.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-x64-musl",
|
||||
"version": "0.28.0-beta.11",
|
||||
"version": "0.30.0-beta.1",
|
||||
"os": ["linux"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.linux-x64-musl.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-win32-arm64-msvc",
|
||||
"version": "0.28.0-beta.11",
|
||||
"version": "0.30.0-beta.1",
|
||||
"os": [
|
||||
"win32"
|
||||
],
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-win32-x64-msvc",
|
||||
"version": "0.28.0-beta.11",
|
||||
"version": "0.30.0-beta.1",
|
||||
"os": ["win32"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.win32-x64-msvc.node",
|
||||
|
||||
11029
nodejs/package-lock.json
generated
Normal file
11029
nodejs/package-lock.json
generated
Normal file
File diff suppressed because it is too large
Load Diff
@@ -11,7 +11,7 @@
|
||||
"ann"
|
||||
],
|
||||
"private": false,
|
||||
"version": "0.28.0-beta.11",
|
||||
"version": "0.30.0-beta.1",
|
||||
"main": "dist/index.js",
|
||||
"exports": {
|
||||
".": "./dist/index.js",
|
||||
|
||||
@@ -328,20 +328,6 @@ impl Connection {
|
||||
.default_error()
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn rename_table(
|
||||
&self,
|
||||
old_name: String,
|
||||
new_name: String,
|
||||
namespace_path: Option<Vec<String>>,
|
||||
) -> napi::Result<()> {
|
||||
let ns = namespace_path.unwrap_or_default();
|
||||
self.get_inner()?
|
||||
.rename_table(&old_name, &new_name, &ns, &ns)
|
||||
.await
|
||||
.default_error()
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn drop_all_tables(&self, namespace_path: Option<Vec<String>>) -> napi::Result<()> {
|
||||
let ns = namespace_path.unwrap_or_default();
|
||||
@@ -473,4 +459,23 @@ impl Connection {
|
||||
transaction_id: resp.transaction_id,
|
||||
})
|
||||
}
|
||||
|
||||
/// Rename a table. `current_namespace_path` and `new_namespace_path` default to
|
||||
/// the root namespace when omitted; the caller is expected to either pass both
|
||||
/// or pass neither.
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn rename_table(
|
||||
&self,
|
||||
current_name: String,
|
||||
new_name: String,
|
||||
current_namespace_path: Option<Vec<String>>,
|
||||
new_namespace_path: Option<Vec<String>>,
|
||||
) -> napi::Result<()> {
|
||||
let cur_ns = current_namespace_path.unwrap_or_default();
|
||||
let new_ns = new_namespace_path.unwrap_or_default();
|
||||
self.get_inner()?
|
||||
.rename_table(¤t_name, &new_name, &cur_ns, &new_ns)
|
||||
.await
|
||||
.default_error()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,12 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::error::NapiErrorExt;
|
||||
use crate::error::convert_error;
|
||||
use crate::iterator::RecordBatchIterator;
|
||||
use crate::rerankers::RerankHybridCallbackArgs;
|
||||
use crate::rerankers::Reranker;
|
||||
use crate::util::{parse_distance_type, schema_to_buffer};
|
||||
use arrow_array::{
|
||||
Array, Float16Array as ArrowFloat16Array, Float32Array as ArrowFloat32Array,
|
||||
Float64Array as ArrowFloat64Array, UInt8Array as ArrowUInt8Array,
|
||||
@@ -19,16 +25,27 @@ use lancedb::query::QueryBase;
|
||||
use lancedb::query::QueryExecutionOptions;
|
||||
use lancedb::query::Select;
|
||||
use lancedb::query::TakeQuery as LanceDbTakeQuery;
|
||||
use lancedb::query::VectorQuery as LanceDbVectorQuery;
|
||||
use lancedb::query::{ColumnOrdering as LanceDbColumnOrdering, VectorQuery as LanceDbVectorQuery};
|
||||
use napi::bindgen_prelude::*;
|
||||
use napi_derive::napi;
|
||||
|
||||
use crate::error::NapiErrorExt;
|
||||
use crate::error::convert_error;
|
||||
use crate::iterator::RecordBatchIterator;
|
||||
use crate::rerankers::RerankHybridCallbackArgs;
|
||||
use crate::rerankers::Reranker;
|
||||
use crate::util::{parse_distance_type, schema_to_buffer};
|
||||
#[napi(object)]
|
||||
pub struct ColumnOrdering {
|
||||
pub ascending: bool,
|
||||
pub nulls_first: bool,
|
||||
pub column_name: String,
|
||||
}
|
||||
|
||||
impl From<ColumnOrdering> for LanceDbColumnOrdering {
|
||||
fn from(value: ColumnOrdering) -> Self {
|
||||
match (value.ascending, value.nulls_first) {
|
||||
(true, true) => Self::asc_nulls_first(value.column_name),
|
||||
(true, false) => Self::asc_nulls_last(value.column_name),
|
||||
(false, true) => Self::desc_nulls_first(value.column_name),
|
||||
(false, false) => Self::desc_nulls_last(value.column_name),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn bytes_to_arrow_array(data: Uint8Array, dtype: String) -> napi::Result<Arc<dyn Array>> {
|
||||
let buf = arrow_buffer::Buffer::from(data.to_vec());
|
||||
@@ -128,6 +145,18 @@ impl Query {
|
||||
self.inner = self.inner.clone().with_row_id();
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn order_by(&mut self, ordering: Option<Vec<ColumnOrdering>>) -> napi::Result<()> {
|
||||
let ordering = ordering.map(|ordering| {
|
||||
ordering
|
||||
.into_iter()
|
||||
.map(LanceDbColumnOrdering::from)
|
||||
.collect()
|
||||
});
|
||||
self.inner = self.inner.clone().order_by(ordering);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn output_schema(&self) -> napi::Result<Buffer> {
|
||||
let schema = self.inner.output_schema().await.default_error()?;
|
||||
@@ -328,6 +357,18 @@ impl VectorQuery {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn order_by(&mut self, ordering: Option<Vec<ColumnOrdering>>) -> napi::Result<()> {
|
||||
let ordering = ordering.map(|ordering| {
|
||||
ordering
|
||||
.into_iter()
|
||||
.map(LanceDbColumnOrdering::from)
|
||||
.collect()
|
||||
});
|
||||
self.inner = self.inner.clone().order_by(ordering);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn output_schema(&self) -> napi::Result<Buffer> {
|
||||
let schema = self.inner.output_schema().await.default_error()?;
|
||||
|
||||
@@ -9,6 +9,7 @@ use lancedb::table::{
|
||||
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
|
||||
};
|
||||
use napi::bindgen_prelude::*;
|
||||
use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
|
||||
use napi_derive::napi;
|
||||
|
||||
use crate::error::NapiErrorExt;
|
||||
@@ -67,8 +68,16 @@ impl Table {
|
||||
schema_to_buffer(&schema)
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn add(&self, buf: Buffer, mode: String) -> napi::Result<AddResult> {
|
||||
#[napi(
|
||||
catch_unwind,
|
||||
ts_args_type = "buf: Buffer, mode: string, progressCallback?: (progress: WriteProgressInfo) => void"
|
||||
)]
|
||||
pub async fn add(
|
||||
&self,
|
||||
buf: Buffer,
|
||||
mode: String,
|
||||
progress_callback: Option<ProgressFn>,
|
||||
) -> napi::Result<AddResult> {
|
||||
let batches = ipc_file_to_batches(buf.to_vec())
|
||||
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
|
||||
let batches = batches
|
||||
@@ -92,6 +101,19 @@ impl Table {
|
||||
return Err(napi::Error::from_reason(format!("Invalid mode: {}", mode)));
|
||||
};
|
||||
|
||||
if let Some(tsfn) = progress_callback {
|
||||
op = op.progress(move |p| {
|
||||
// NonBlocking: dispatch onto the JS event loop without
|
||||
// blocking the writer thread. With napi-rs's default
|
||||
// unbounded queue, events are not dropped — a slow JS
|
||||
// callback will just queue them.
|
||||
tsfn.call(
|
||||
WriteProgressInfo::from(p),
|
||||
ThreadsafeFunctionCallMode::NonBlocking,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
let res = op.execute().await.default_error()?;
|
||||
Ok(res.into())
|
||||
}
|
||||
@@ -344,6 +366,31 @@ impl Table {
|
||||
Ok(res.into())
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn set_unenforced_primary_key(&self, columns: Vec<String>) -> napi::Result<()> {
|
||||
self.inner_ref()?
|
||||
.set_unenforced_primary_key(columns)
|
||||
.await
|
||||
.default_error()
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> napi::Result<()> {
|
||||
let native_spec = lancedb::table::LsmWriteSpec::try_from(spec)?;
|
||||
self.inner_ref()?
|
||||
.set_lsm_write_spec(native_spec)
|
||||
.await
|
||||
.default_error()
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn unset_lsm_write_spec(&self) -> napi::Result<()> {
|
||||
self.inner_ref()?
|
||||
.unset_lsm_write_spec()
|
||||
.await
|
||||
.default_error()
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn version(&self) -> napi::Result<i64> {
|
||||
self.inner_ref()?
|
||||
@@ -538,6 +585,63 @@ impl From<lancedb::index::IndexConfig> for IndexConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Specification selecting Lance's MemWAL LSM-style write path for
|
||||
/// `mergeInsert`.
|
||||
///
|
||||
/// `specType` must be `"bucket"`, `"identity"`, or `"unsharded"`. For
|
||||
/// `"bucket"`, `column` and `numBuckets` are required; for `"identity"`,
|
||||
/// `column` is required.
|
||||
#[napi(object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LsmWriteSpec {
|
||||
/// One of `"bucket"`, `"identity"`, or `"unsharded"`.
|
||||
pub spec_type: String,
|
||||
/// Bucket and identity variants: the sharding column.
|
||||
pub column: Option<String>,
|
||||
/// Bucket variant: the number of buckets, in `[1, 1024]`.
|
||||
pub num_buckets: Option<u32>,
|
||||
/// Names of indexes the MemWAL should keep up to date during writes.
|
||||
pub maintained_indexes: Option<Vec<String>>,
|
||||
/// Default `ShardWriter` configuration recorded in the MemWAL index.
|
||||
pub writer_config_defaults: Option<HashMap<String, String>>,
|
||||
}
|
||||
|
||||
impl TryFrom<LsmWriteSpec> for lancedb::table::LsmWriteSpec {
|
||||
type Error = napi::Error;
|
||||
|
||||
fn try_from(value: LsmWriteSpec) -> napi::Result<Self> {
|
||||
let maintained = value.maintained_indexes.unwrap_or_default();
|
||||
let writer_config_defaults = value.writer_config_defaults.unwrap_or_default();
|
||||
let spec = match value.spec_type.as_str() {
|
||||
"bucket" => {
|
||||
let column = value.column.ok_or_else(|| {
|
||||
napi::Error::from_reason("LsmWriteSpec bucket requires `column`")
|
||||
})?;
|
||||
let num_buckets = value.num_buckets.ok_or_else(|| {
|
||||
napi::Error::from_reason("LsmWriteSpec bucket requires `numBuckets`")
|
||||
})?;
|
||||
Self::bucket(column, num_buckets)
|
||||
}
|
||||
"identity" => {
|
||||
let column = value.column.ok_or_else(|| {
|
||||
napi::Error::from_reason("LsmWriteSpec identity requires `column`")
|
||||
})?;
|
||||
Self::identity(column)
|
||||
}
|
||||
"unsharded" => Self::unsharded(),
|
||||
other => {
|
||||
return Err(napi::Error::from_reason(format!(
|
||||
"LsmWriteSpec `specType` must be 'bucket', 'identity', or 'unsharded', got '{}'",
|
||||
other
|
||||
)));
|
||||
}
|
||||
};
|
||||
Ok(spec
|
||||
.with_maintained_indexes(maintained)
|
||||
.with_writer_config_defaults(writer_config_defaults))
|
||||
}
|
||||
}
|
||||
|
||||
/// Statistics about a compaction operation.
|
||||
#[napi(object)]
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -572,6 +676,44 @@ pub struct OptimizeStats {
|
||||
pub prune: RemovalStats,
|
||||
}
|
||||
|
||||
/// Progress snapshot for a write operation, delivered to the JS callback
|
||||
/// passed to `Table.add`.
|
||||
#[napi(object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct WriteProgressInfo {
|
||||
/// Number of rows written so far.
|
||||
pub output_rows: i64,
|
||||
/// Number of bytes written so far.
|
||||
pub output_bytes: i64,
|
||||
/// Total rows expected, if the input source reports it.
|
||||
/// Always set on the final callback (where `done` is `true`).
|
||||
pub total_rows: Option<i64>,
|
||||
/// Wall-clock seconds since monitoring started.
|
||||
pub elapsed_seconds: f64,
|
||||
/// Number of parallel write tasks currently in flight.
|
||||
pub active_tasks: i64,
|
||||
/// Total number of parallel write tasks (the write parallelism).
|
||||
pub total_tasks: i64,
|
||||
/// `true` for the final callback; `false` otherwise.
|
||||
pub done: bool,
|
||||
}
|
||||
|
||||
impl From<&lancedb::table::write_progress::WriteProgress> for WriteProgressInfo {
|
||||
fn from(p: &lancedb::table::write_progress::WriteProgress) -> Self {
|
||||
Self {
|
||||
output_rows: p.output_rows() as i64,
|
||||
output_bytes: p.output_bytes() as i64,
|
||||
total_rows: p.total_rows().map(|n| n as i64),
|
||||
elapsed_seconds: p.elapsed().as_secs_f64(),
|
||||
active_tasks: p.active_tasks() as i64,
|
||||
total_tasks: p.total_tasks() as i64,
|
||||
done: p.done(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type ProgressFn = ThreadsafeFunction<WriteProgressInfo, (), WriteProgressInfo, Status, false>;
|
||||
|
||||
/// A definition of a column alteration. The alteration changes the column at
|
||||
/// `path` to have the new name `name`, to be nullable if `nullable` is true,
|
||||
/// and to have the data type `data_type`. At least one of `rename` or `nullable`
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[tool.bumpversion]
|
||||
current_version = "0.31.0-beta.11"
|
||||
current_version = "0.33.0-beta.1"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
|
||||
@@ -4,16 +4,26 @@ code is in the `src/` directory and the Python bindings are in the `lancedb/` di
|
||||
|
||||
Common commands:
|
||||
|
||||
* Bootstrap dev env: `uv run --extra tests --extra dev maturin develop --extras tests,dev`
|
||||
* Build: `make develop`
|
||||
* Format: `make format`
|
||||
* Lint: `make check`
|
||||
* Fix lints: `make fix`
|
||||
* Test: `make test`
|
||||
* Doc test: `make doctest`
|
||||
* Test: `uv run --extra tests pytest python/tests -vv --durations=10 -m "not slow and not s3_test"`
|
||||
* Run specific test: `uv run --extra tests pytest python/tests/<test_file>.py::<test_name> -q`
|
||||
* Doc test: `uv run --extra tests pytest --doctest-modules python/lancedb`
|
||||
|
||||
Use the uv-managed environment declared by `uv.lock` for Python validation. Do
|
||||
not treat system `python`, global `pytest`, or missing editable-install errors
|
||||
as final blockers; bootstrap or enter the uv environment instead. `make test`
|
||||
and `make doctest` assume the development environment is already prepared.
|
||||
|
||||
Before committing changes, run lints and then formatting.
|
||||
|
||||
When you change the Rust code, you will need to recompile the Python bindings: `make develop`.
|
||||
When you change the Rust code, PyO3 binding code, or see a missing/stale
|
||||
`lancedb._lancedb`, recompile the Python bindings with
|
||||
`uv run --extra tests --extra dev maturin develop --extras tests,dev` before
|
||||
running tests.
|
||||
|
||||
When you export new types from Rust to Python, you must manually update `python/lancedb/_lancedb.pyi`
|
||||
with the corresponding type hints. You can run `pyright` to check for type errors in the Python code.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb-python"
|
||||
version = "0.31.0-beta.11"
|
||||
version = "0.33.0-beta.1"
|
||||
publish = false
|
||||
edition.workspace = true
|
||||
description = "Python bindings for LanceDB"
|
||||
|
||||
@@ -147,6 +147,13 @@ def connect(
|
||||
>>> db = lancedb.connect("s3://my-bucket/lancedb",
|
||||
... storage_options={"aws_access_key_id": "***"})
|
||||
|
||||
For tests and temporary data, use an in-memory database:
|
||||
|
||||
>>> db = lancedb.connect("memory://")
|
||||
|
||||
In-memory databases are not persisted. Tables are dropped when the last
|
||||
connection or table handle referencing them is closed.
|
||||
|
||||
Connect to LanceDB cloud:
|
||||
|
||||
>>> db = lancedb.connect("db://my_database", api_key="ldb_...",
|
||||
@@ -378,6 +385,8 @@ async def connect_async(
|
||||
... db = await lancedb.connect_async("s3://my-bucket/lancedb",
|
||||
... storage_options={
|
||||
... "aws_access_key_id": "***"})
|
||||
... # For tests and temporary data, use an in-memory database
|
||||
... db = await lancedb.connect_async("memory://")
|
||||
... # Connect to LanceDB cloud
|
||||
... db = await lancedb.connect_async("db://my_database", api_key="ldb_...",
|
||||
... client_config={
|
||||
|
||||
@@ -217,6 +217,9 @@ class Table:
|
||||
async def uri(self) -> str: ...
|
||||
async def initial_storage_options(self) -> Optional[Dict[str, str]]: ...
|
||||
async def latest_storage_options(self) -> Optional[Dict[str, str]]: ...
|
||||
async def set_unenforced_primary_key(self, columns: List[str]) -> None: ...
|
||||
async def set_lsm_write_spec(self, spec: LsmWriteSpec) -> None: ...
|
||||
async def unset_lsm_write_spec(self) -> None: ...
|
||||
@property
|
||||
def tags(self) -> Tags: ...
|
||||
def query(self) -> Query: ...
|
||||
@@ -255,6 +258,11 @@ class RecordBatchStream:
|
||||
def __aiter__(self) -> "RecordBatchStream": ...
|
||||
async def __anext__(self) -> pa.RecordBatch: ...
|
||||
|
||||
class ColumnOrdering(TypedDict):
|
||||
column_name: str
|
||||
ascending: bool
|
||||
nulls_first: bool
|
||||
|
||||
class Query:
|
||||
def where(self, filter: str): ...
|
||||
def where_expr(self, expr: PyExpr): ...
|
||||
@@ -268,6 +276,7 @@ class Query:
|
||||
def postfilter(self): ...
|
||||
def nearest_to(self, query_vec: pa.Array) -> VectorQuery: ...
|
||||
def nearest_to_text(self, query: dict) -> FTSQuery: ...
|
||||
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
|
||||
async def output_schema(self) -> pa.Schema: ...
|
||||
async def execute(
|
||||
self, max_batch_length: Optional[int], timeout: Optional[timedelta]
|
||||
@@ -296,6 +305,7 @@ class FTSQuery:
|
||||
def get_query(self) -> str: ...
|
||||
def add_query_vector(self, query_vec: pa.Array) -> None: ...
|
||||
def nearest_to(self, query_vec: pa.Array) -> HybridQuery: ...
|
||||
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
|
||||
async def output_schema(self) -> pa.Schema: ...
|
||||
async def execute(
|
||||
self, max_batch_length: Optional[int], timeout: Optional[timedelta]
|
||||
@@ -321,6 +331,7 @@ class VectorQuery:
|
||||
def maximum_nprobes(self, maximum_nprobes: int): ...
|
||||
def bypass_vector_index(self): ...
|
||||
def nearest_to_text(self, query: dict) -> HybridQuery: ...
|
||||
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
|
||||
def to_query_request(self) -> PyQueryRequest: ...
|
||||
|
||||
class HybridQuery:
|
||||
@@ -339,6 +350,7 @@ class HybridQuery:
|
||||
def minimum_nprobes(self, minimum_nprobes: int): ...
|
||||
def maximum_nprobes(self, maximum_nprobes: int): ...
|
||||
def bypass_vector_index(self): ...
|
||||
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
|
||||
def to_vector_query(self) -> VectorQuery: ...
|
||||
def to_fts_query(self) -> FTSQuery: ...
|
||||
def get_limit(self) -> int: ...
|
||||
@@ -368,6 +380,7 @@ class PyQueryRequest:
|
||||
bypass_vector_index: Optional[bool]
|
||||
postfilter: Optional[bool]
|
||||
norm: Optional[str]
|
||||
order_by: Optional[List[ColumnOrdering]]
|
||||
|
||||
class CompactionStats:
|
||||
fragments_removed: int
|
||||
@@ -408,6 +421,37 @@ class MergeResult:
|
||||
num_deleted_rows: int
|
||||
num_attempts: int
|
||||
|
||||
class LsmWriteSpec:
|
||||
"""Specification selecting Lance's MemWAL LSM-style write path for
|
||||
`merge_insert`."""
|
||||
|
||||
@staticmethod
|
||||
def bucket(column: str, num_buckets: int) -> "LsmWriteSpec": ...
|
||||
@staticmethod
|
||||
def identity(column: str) -> "LsmWriteSpec": ...
|
||||
@staticmethod
|
||||
def unsharded() -> "LsmWriteSpec": ...
|
||||
def with_maintained_indexes(self, indexes: List[str]) -> "LsmWriteSpec":
|
||||
"""Return a copy of this spec asking the MemWAL to keep the named
|
||||
indexes up to date as rows are appended."""
|
||||
...
|
||||
def with_writer_config_defaults(self, defaults: Dict[str, str]) -> "LsmWriteSpec":
|
||||
"""Return a copy of this spec recording the given default
|
||||
`ShardWriter` configuration in the MemWAL index."""
|
||||
...
|
||||
@property
|
||||
def spec_type(self) -> str:
|
||||
"""One of 'bucket', 'identity', or 'unsharded'."""
|
||||
...
|
||||
@property
|
||||
def column(self) -> Optional[str]: ...
|
||||
@property
|
||||
def num_buckets(self) -> Optional[int]: ...
|
||||
@property
|
||||
def maintained_indexes(self) -> List[str]: ...
|
||||
@property
|
||||
def writer_config_defaults(self) -> Dict[str, str]: ...
|
||||
|
||||
class AddColumnsResult:
|
||||
version: int
|
||||
|
||||
|
||||
@@ -8,7 +8,17 @@ from abc import abstractmethod
|
||||
from datetime import timedelta
|
||||
from pathlib import Path
|
||||
import sys
|
||||
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Literal, Optional, Union
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Dict,
|
||||
Generator,
|
||||
Iterable,
|
||||
List,
|
||||
Literal,
|
||||
Optional,
|
||||
Union,
|
||||
)
|
||||
|
||||
if sys.version_info >= (3, 12):
|
||||
from typing import override
|
||||
@@ -313,7 +323,7 @@ class DBConnection(EnforceOverrides):
|
||||
>>> data = [{"vector": [1.1, 1.2], "lat": 45.5, "long": -122.7},
|
||||
... {"vector": [0.2, 1.8], "lat": 40.1, "long": -74.1}]
|
||||
>>> db.create_table("my_table", data)
|
||||
LanceTable(name='my_table', version=1, ...)
|
||||
LanceTable(name='my_table', ...)
|
||||
>>> db["my_table"].head()
|
||||
pyarrow.Table
|
||||
vector: fixed_size_list<item: float>[2]
|
||||
@@ -334,7 +344,7 @@ class DBConnection(EnforceOverrides):
|
||||
... "long": [-122.7, -74.1]
|
||||
... })
|
||||
>>> db.create_table("table2", data)
|
||||
LanceTable(name='table2', version=1, ...)
|
||||
LanceTable(name='table2', ...)
|
||||
>>> db["table2"].head()
|
||||
pyarrow.Table
|
||||
vector: fixed_size_list<item: float>[2]
|
||||
@@ -357,7 +367,7 @@ class DBConnection(EnforceOverrides):
|
||||
... pa.field("long", pa.float32())
|
||||
... ])
|
||||
>>> db.create_table("table3", data, schema = custom_schema)
|
||||
LanceTable(name='table3', version=1, ...)
|
||||
LanceTable(name='table3', ...)
|
||||
>>> db["table3"].head()
|
||||
pyarrow.Table
|
||||
vector: fixed_size_list<item: float>[2]
|
||||
@@ -391,7 +401,7 @@ class DBConnection(EnforceOverrides):
|
||||
... pa.field("price", pa.float32()),
|
||||
... ])
|
||||
>>> db.create_table("table4", make_batches(), schema=schema)
|
||||
LanceTable(name='table4', version=1, ...)
|
||||
LanceTable(name='table4', ...)
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
@@ -568,15 +578,15 @@ class LanceDBConnection(DBConnection):
|
||||
>>> db = lancedb.connect("./.lancedb")
|
||||
>>> db.create_table("my_table", data=[{"vector": [1.1, 1.2], "b": 2},
|
||||
... {"vector": [0.5, 1.3], "b": 4}])
|
||||
LanceTable(name='my_table', version=1, ...)
|
||||
LanceTable(name='my_table', ...)
|
||||
>>> db.create_table("another_table", data=[{"vector": [0.4, 0.4], "b": 6}])
|
||||
LanceTable(name='another_table', version=1, ...)
|
||||
LanceTable(name='another_table', ...)
|
||||
>>> sorted(db.table_names())
|
||||
['another_table', 'my_table']
|
||||
>>> len(db)
|
||||
2
|
||||
>>> db["my_table"]
|
||||
LanceTable(name='my_table', version=1, ...)
|
||||
LanceTable(name='my_table', ...)
|
||||
>>> "my_table" in db
|
||||
True
|
||||
>>> db.drop_table("my_table")
|
||||
@@ -847,11 +857,20 @@ class LanceDBConnection(DBConnection):
|
||||
)
|
||||
)
|
||||
|
||||
def _all_table_names(self) -> Generator[str, None, None]:
|
||||
page_token = None
|
||||
while True:
|
||||
response = self.list_tables(page_token=page_token)
|
||||
yield from response.tables
|
||||
page_token = response.page_token
|
||||
if not page_token:
|
||||
return
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self.table_names())
|
||||
return sum(1 for _ in self._all_table_names())
|
||||
|
||||
def __contains__(self, name: str) -> bool:
|
||||
return name in self.table_names()
|
||||
return name in self._all_table_names()
|
||||
|
||||
@override
|
||||
def create_table(
|
||||
|
||||
@@ -968,22 +968,32 @@ class Permutation:
|
||||
new.transform_fn = transform
|
||||
return new
|
||||
|
||||
def take_offsets(self, offsets: list[int]) -> Any:
|
||||
"""
|
||||
Take rows from the permutation by offset
|
||||
|
||||
The returned value is passed through the permutation's current transform,
|
||||
so `with_format` and `with_transform` affect this method in the same way
|
||||
they affect iteration.
|
||||
"""
|
||||
|
||||
async def do_take_offsets():
|
||||
return await self.reader.take_offsets(offsets, selection=self.selection)
|
||||
|
||||
batch = LOOP.run(do_take_offsets())
|
||||
return self.transform_fn(batch)
|
||||
|
||||
def __getitem__(self, index: int) -> Any:
|
||||
"""
|
||||
Returns a single row from the permutation by offset
|
||||
"""
|
||||
return self.__getitems__([index])
|
||||
return self.take_offsets([index])
|
||||
|
||||
def __getitems__(self, indices: list[int]) -> Any:
|
||||
"""
|
||||
Returns rows from the permutation by offset
|
||||
"""
|
||||
|
||||
async def do_getitems():
|
||||
return await self.reader.take_offsets(indices, selection=self.selection)
|
||||
|
||||
batch = LOOP.run(do_getitems())
|
||||
return self.transform_fn(batch)
|
||||
return self.take_offsets(indices)
|
||||
|
||||
@deprecated(details="Use with_skip instead")
|
||||
def skip(self, skip: int) -> "Permutation":
|
||||
|
||||
@@ -3,12 +3,14 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from abc import ABC, abstractmethod
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from enum import Enum
|
||||
from datetime import timedelta
|
||||
from enum import Enum
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Dict,
|
||||
List,
|
||||
Literal,
|
||||
@@ -17,41 +19,40 @@ from typing import (
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
Any,
|
||||
)
|
||||
|
||||
import asyncio
|
||||
import deprecation
|
||||
import numpy as np
|
||||
import pyarrow as pa
|
||||
import pyarrow.compute as pc
|
||||
import pydantic
|
||||
from typing_extensions import Annotated
|
||||
|
||||
from lancedb.pydantic import PYDANTIC_VERSION
|
||||
from lancedb._lancedb import fts_query_to_json
|
||||
from lancedb.background_loop import LOOP
|
||||
from lancedb.pydantic import PYDANTIC_VERSION
|
||||
|
||||
from . import __version__
|
||||
from .arrow import AsyncRecordBatchReader
|
||||
from .dependencies import pandas as pd
|
||||
from .expr import Expr
|
||||
from .rerankers.base import Reranker
|
||||
from .rerankers.rrf import RRFReranker
|
||||
from .rerankers.util import check_reranker_result
|
||||
from .util import flatten_columns
|
||||
from .expr import Expr
|
||||
from lancedb._lancedb import fts_query_to_json
|
||||
from typing_extensions import Annotated
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import sys
|
||||
|
||||
import PIL
|
||||
import polars as pl
|
||||
|
||||
from ._lancedb import Query as LanceQuery
|
||||
from ._lancedb import FTSQuery as LanceFTSQuery
|
||||
from ._lancedb import HybridQuery as LanceHybridQuery
|
||||
from ._lancedb import VectorQuery as LanceVectorQuery
|
||||
from ._lancedb import TakeQuery as LanceTakeQuery
|
||||
from ._lancedb import PyQueryRequest
|
||||
from ._lancedb import Query as LanceQuery
|
||||
from ._lancedb import TakeQuery as LanceTakeQuery
|
||||
from ._lancedb import VectorQuery as LanceVectorQuery
|
||||
from .common import VEC
|
||||
from .pydantic import LanceModel
|
||||
from .table import Table
|
||||
@@ -92,6 +93,12 @@ def ensure_vector_query(
|
||||
return val
|
||||
|
||||
|
||||
class ColumnOrdering(pydantic.BaseModel):
|
||||
column_name: str
|
||||
ascending: bool = True
|
||||
nulls_first: bool = False
|
||||
|
||||
|
||||
class FullTextQueryType(str, Enum):
|
||||
MATCH = "match"
|
||||
MATCH_PHRASE = "match_phrase"
|
||||
@@ -504,6 +511,8 @@ class Query(pydantic.BaseModel):
|
||||
# Bypass the vector index and use a brute force search
|
||||
bypass_vector_index: Optional[bool] = None
|
||||
|
||||
order_by: Optional[List[ColumnOrdering]] = None
|
||||
|
||||
@classmethod
|
||||
def from_inner(cls, req: PyQueryRequest) -> Self:
|
||||
query = cls()
|
||||
@@ -524,6 +533,8 @@ class Query(pydantic.BaseModel):
|
||||
query.refine_factor = req.refine_factor
|
||||
query.bypass_vector_index = req.bypass_vector_index
|
||||
query.postfilter = req.postfilter
|
||||
if req.order_by is not None:
|
||||
query.order_by = [ColumnOrdering(**o) for o in req.order_by]
|
||||
if req.full_text_search is not None:
|
||||
query.full_text_query = FullTextSearchQuery(
|
||||
columns=None,
|
||||
@@ -572,9 +583,22 @@ class LanceQueryBuilder(ABC):
|
||||
If "auto", the query type is inferred based on the query.
|
||||
vector_column_name: str
|
||||
The name of the vector column to use for vector search.
|
||||
ordering_field_name: Optional[str]
|
||||
.. deprecated:: 0.27.0
|
||||
Use ``order_by()`` method instead.
|
||||
fts_columns: Optional[Union[str, List[str]]]
|
||||
The columns to search in for full text search.
|
||||
fast_search: bool
|
||||
Skip flat search of unindexed data.
|
||||
"""
|
||||
if ordering_field_name is not None:
|
||||
import warnings
|
||||
|
||||
warnings.warn(
|
||||
"ordering_field_name is deprecated, use .order_by() method instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
# Check hybrid search first as it supports empty query pattern
|
||||
if query_type == "hybrid":
|
||||
# hybrid fts and vector query
|
||||
@@ -671,6 +695,7 @@ class LanceQueryBuilder(ABC):
|
||||
self._text = None
|
||||
self._ef = None
|
||||
self._bypass_vector_index = None
|
||||
self._order_by = None
|
||||
|
||||
@deprecation.deprecated(
|
||||
deprecated_in="0.3.1",
|
||||
@@ -694,6 +719,7 @@ class LanceQueryBuilder(ABC):
|
||||
flatten: Optional[Union[int, bool]] = None,
|
||||
*,
|
||||
timeout: Optional[timedelta] = None,
|
||||
**kwargs,
|
||||
) -> "pd.DataFrame":
|
||||
"""
|
||||
Execute the query and return the results as a pandas DataFrame.
|
||||
@@ -711,9 +737,12 @@ class LanceQueryBuilder(ABC):
|
||||
timeout: Optional[timedelta]
|
||||
The maximum time to wait for the query to complete.
|
||||
If None, wait indefinitely.
|
||||
**kwargs
|
||||
Forwarded to pyarrow.Table.to_pandas after query execution and
|
||||
optional flattening.
|
||||
"""
|
||||
tbl = flatten_columns(self.to_arrow(timeout=timeout), flatten)
|
||||
return tbl.to_pandas()
|
||||
return tbl.to_pandas(**kwargs)
|
||||
|
||||
@abstractmethod
|
||||
def to_arrow(self, *, timeout: Optional[timedelta] = None) -> pa.Table:
|
||||
@@ -947,6 +976,24 @@ class LanceQueryBuilder(ABC):
|
||||
""" # noqa: E501
|
||||
return self._table._explain_plan(self.to_query_object(), verbose=verbose)
|
||||
|
||||
def order_by(self, ordering: Optional[List[ColumnOrdering]]) -> Self:
|
||||
"""
|
||||
Set the ordering for the results.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ordering: Optional[List[ColumnOrdering]]
|
||||
The ordering to use for the results. If None, then the default ordering
|
||||
will be used.
|
||||
|
||||
Returns
|
||||
-------
|
||||
LanceQueryBuilder
|
||||
The LanceQueryBuilder object.
|
||||
"""
|
||||
self._order_by = ordering
|
||||
return self
|
||||
|
||||
def analyze_plan(self) -> str:
|
||||
"""
|
||||
Run the query and return its execution plan with runtime metrics.
|
||||
@@ -1314,6 +1361,7 @@ class LanceVectorQueryBuilder(LanceQueryBuilder):
|
||||
fast_search=self._fast_search,
|
||||
ef=self._ef,
|
||||
bypass_vector_index=self._bypass_vector_index,
|
||||
order_by=self._order_by,
|
||||
)
|
||||
|
||||
def to_batches(
|
||||
@@ -1465,7 +1513,9 @@ class LanceFtsQueryBuilder(LanceQueryBuilder):
|
||||
super().__init__(table)
|
||||
self._query = query
|
||||
self._phrase_query = False
|
||||
self.ordering_field_name = ordering_field_name
|
||||
# Deprecated compatibility parameter. Native FTS ordering is now
|
||||
# configured through order_by(); LanceQueryBuilder.create emits the warning.
|
||||
_ = ordering_field_name
|
||||
self._reranker = None
|
||||
self._fast_search = fast_search
|
||||
if isinstance(fts_columns, str):
|
||||
@@ -1514,6 +1564,7 @@ class LanceFtsQueryBuilder(LanceQueryBuilder):
|
||||
),
|
||||
offset=self._offset,
|
||||
fast_search=self._fast_search,
|
||||
order_by=self._order_by,
|
||||
)
|
||||
|
||||
def output_schema(self) -> pa.Schema:
|
||||
@@ -1579,6 +1630,7 @@ class LanceEmptyQueryBuilder(LanceQueryBuilder):
|
||||
limit=self._limit,
|
||||
with_row_id=self._with_row_id,
|
||||
offset=self._offset,
|
||||
order_by=self._order_by,
|
||||
)
|
||||
|
||||
def output_schema(self) -> pa.Schema:
|
||||
@@ -2305,6 +2357,7 @@ class AsyncQueryBase(object):
|
||||
self,
|
||||
flatten: Optional[Union[int, bool]] = None,
|
||||
timeout: Optional[timedelta] = None,
|
||||
**kwargs,
|
||||
) -> "pd.DataFrame":
|
||||
"""
|
||||
Execute the query and collect the results into a pandas DataFrame.
|
||||
@@ -2337,10 +2390,13 @@ class AsyncQueryBase(object):
|
||||
The maximum time to wait for the query to complete.
|
||||
If not specified, no timeout is applied. If the query does not
|
||||
complete within the specified time, an error will be raised.
|
||||
**kwargs
|
||||
Forwarded to pyarrow.Table.to_pandas after query execution and
|
||||
optional flattening.
|
||||
"""
|
||||
return (
|
||||
flatten_columns(await self.to_arrow(timeout=timeout), flatten)
|
||||
).to_pandas()
|
||||
).to_pandas(**kwargs)
|
||||
|
||||
async def to_polars(
|
||||
self,
|
||||
@@ -2502,6 +2558,27 @@ class AsyncStandardQuery(AsyncQueryBase):
|
||||
self._inner.offset(offset)
|
||||
return self
|
||||
|
||||
def order_by(self, ordering: Optional[List[ColumnOrdering]]) -> Self:
|
||||
"""
|
||||
Set the ordering for the results.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ordering: Optional[List[ColumnOrdering]]
|
||||
The ordering to use for the results. If None, then the default ordering
|
||||
will be used.
|
||||
"""
|
||||
if ordering is None:
|
||||
self._inner.order_by(None)
|
||||
else:
|
||||
self._inner.order_by(
|
||||
[
|
||||
o.model_dump() if hasattr(o, "model_dump") else o.dict()
|
||||
for o in ordering
|
||||
]
|
||||
)
|
||||
return self
|
||||
|
||||
def fast_search(self) -> Self:
|
||||
"""
|
||||
Skip searching un-indexed data.
|
||||
@@ -3272,16 +3349,18 @@ class BaseQueryBuilder(object):
|
||||
If not specified, no timeout is applied. If the query does not
|
||||
complete within the specified time, an error will be raised.
|
||||
"""
|
||||
async_iter = LOOP.run(self._inner.execute(max_batch_length, timeout))
|
||||
async_reader = LOOP.run(
|
||||
self._inner.to_batches(max_batch_length=max_batch_length, timeout=timeout)
|
||||
)
|
||||
|
||||
def iter_sync():
|
||||
try:
|
||||
while True:
|
||||
yield LOOP.run(async_iter.__anext__())
|
||||
yield LOOP.run(async_reader.__anext__())
|
||||
except StopAsyncIteration:
|
||||
return
|
||||
|
||||
return pa.RecordBatchReader.from_batches(async_iter.schema, iter_sync())
|
||||
return pa.RecordBatchReader.from_batches(async_reader.schema, iter_sync())
|
||||
|
||||
def to_arrow(self, timeout: Optional[timedelta] = None) -> pa.Table:
|
||||
"""
|
||||
@@ -3321,6 +3400,7 @@ class BaseQueryBuilder(object):
|
||||
self,
|
||||
flatten: Optional[Union[int, bool]] = None,
|
||||
timeout: Optional[timedelta] = None,
|
||||
**kwargs,
|
||||
) -> "pd.DataFrame":
|
||||
"""
|
||||
Execute the query and collect the results into a pandas DataFrame.
|
||||
@@ -3353,8 +3433,11 @@ class BaseQueryBuilder(object):
|
||||
The maximum time to wait for the query to complete.
|
||||
If not specified, no timeout is applied. If the query does not
|
||||
complete within the specified time, an error will be raised.
|
||||
**kwargs
|
||||
Forwarded to pyarrow.Table.to_pandas after query execution and
|
||||
optional flattening.
|
||||
"""
|
||||
return LOOP.run(self._inner.to_pandas(flatten, timeout))
|
||||
return LOOP.run(self._inner.to_pandas(flatten, timeout, **kwargs))
|
||||
|
||||
def to_polars(
|
||||
self,
|
||||
|
||||
@@ -14,6 +14,7 @@ from lancedb._lancedb import (
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
IndexConfig,
|
||||
LsmWriteSpec,
|
||||
MergeResult,
|
||||
UpdateResult,
|
||||
)
|
||||
@@ -39,7 +40,7 @@ from lancedb.embeddings import EmbeddingFunctionRegistry
|
||||
from lancedb.table import _normalize_progress
|
||||
|
||||
from ..query import LanceVectorQueryBuilder, LanceQueryBuilder, LanceTakeQueryBuilder
|
||||
from ..table import AsyncTable, IndexStatistics, Query, Table, Tags
|
||||
from ..table import AsyncTable, BlobMode, IndexStatistics, Query, Table, Tags
|
||||
from ..types import BaseTokenizerType
|
||||
|
||||
|
||||
@@ -100,7 +101,7 @@ class RemoteTable(Table):
|
||||
"""to_arrow() is not yet supported on LanceDB cloud."""
|
||||
raise NotImplementedError("to_arrow() is not yet supported on LanceDB cloud.")
|
||||
|
||||
def to_pandas(self):
|
||||
def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs):
|
||||
"""to_pandas() is not yet supported on LanceDB cloud."""
|
||||
raise NotImplementedError("to_pandas() is not yet supported on LanceDB cloud.")
|
||||
|
||||
@@ -655,6 +656,18 @@ class RemoteTable(Table):
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
|
||||
return LOOP.run(self._table.drop_columns(columns))
|
||||
|
||||
def set_unenforced_primary_key(self, columns: Union[str, Iterable[str]]) -> None:
|
||||
"""Not supported on LanceDB Cloud."""
|
||||
return LOOP.run(self._table.set_unenforced_primary_key(columns))
|
||||
|
||||
def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
|
||||
"""Not supported on LanceDB Cloud."""
|
||||
return LOOP.run(self._table.set_lsm_write_spec(spec))
|
||||
|
||||
def unset_lsm_write_spec(self) -> None:
|
||||
"""Not supported on LanceDB Cloud."""
|
||||
return LOOP.run(self._table.unset_lsm_write_spec())
|
||||
|
||||
def drop_index(self, index_name: str):
|
||||
return LOOP.run(self._table.drop_index(index_name))
|
||||
|
||||
|
||||
@@ -87,6 +87,8 @@ from .util import (
|
||||
)
|
||||
from .index import lang_mapping
|
||||
|
||||
BlobMode = Literal["lazy", "bytes", "descriptions"]
|
||||
|
||||
_MODEL_BACKED_TOKENIZER_PREFIXES = ("jieba", "lindera")
|
||||
_MODEL_BACKED_TOKENIZER_ERRORS = (
|
||||
"unknown base tokenizer",
|
||||
@@ -154,6 +156,7 @@ if TYPE_CHECKING:
|
||||
AlterColumnsResult,
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
LsmWriteSpec,
|
||||
MergeResult,
|
||||
UpdateResult,
|
||||
)
|
||||
@@ -759,14 +762,22 @@ class Table(ABC):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def to_pandas(self) -> "pandas.DataFrame":
|
||||
def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs) -> "pandas.DataFrame":
|
||||
"""Return the table as a pandas DataFrame.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
blob_mode: str, default "lazy"
|
||||
Controls how blob columns are returned for backends that support
|
||||
Lance blob-aware pandas conversion.
|
||||
**kwargs
|
||||
Forwarded to PyArrow / Lance pandas conversion.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
"""
|
||||
return self.to_arrow().to_pandas()
|
||||
return self.to_arrow().to_pandas(**kwargs)
|
||||
|
||||
@abstractmethod
|
||||
def to_arrow(self) -> pa.Table:
|
||||
@@ -2167,7 +2178,7 @@ class LanceTable(Table):
|
||||
return LOOP.run(self._table.count_rows(filter))
|
||||
|
||||
def __repr__(self) -> str:
|
||||
val = f"{self.__class__.__name__}(name={self.name!r}, version={self.version}"
|
||||
val = f"{self.__class__.__name__}(name={self.name!r}"
|
||||
if self._conn.read_consistency_interval is not None:
|
||||
val += ", read_consistency_interval={!r}".format(
|
||||
self._conn.read_consistency_interval
|
||||
@@ -2182,14 +2193,27 @@ class LanceTable(Table):
|
||||
"""Return the first n rows of the table."""
|
||||
return LOOP.run(self._table.head(n))
|
||||
|
||||
def to_pandas(self) -> "pd.DataFrame":
|
||||
def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs) -> "pd.DataFrame":
|
||||
"""Return the table as a pandas DataFrame.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
blob_mode: str, default "lazy"
|
||||
Controls how Lance blob columns are returned.
|
||||
**kwargs
|
||||
Forwarded to Lance pandas conversion.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
"""
|
||||
return self.to_arrow().to_pandas()
|
||||
if blob_mode == "lazy" and (
|
||||
self._namespace_client is not None
|
||||
or get_uri_scheme(self._dataset_path) == "memory"
|
||||
):
|
||||
return self.to_arrow().to_pandas(**kwargs)
|
||||
|
||||
return self.to_lance().to_pandas(blob_mode=blob_mode, **kwargs)
|
||||
|
||||
def to_arrow(self) -> pa.Table:
|
||||
"""Return the table as a pyarrow Table.
|
||||
@@ -2518,11 +2542,6 @@ class LanceTable(Table):
|
||||
"at a time. To search over multiple text fields, create a "
|
||||
"separate FTS index for each field."
|
||||
)
|
||||
if "." in field_names:
|
||||
raise ValueError(
|
||||
"Native FTS indexes can only be created on top-level fields. "
|
||||
f"Received nested field path: {field_names!r}."
|
||||
)
|
||||
|
||||
if tokenizer_name is None:
|
||||
tokenizer_configs = {
|
||||
@@ -3263,6 +3282,21 @@ class LanceTable(Table):
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
|
||||
return LOOP.run(self._table.drop_columns(columns))
|
||||
|
||||
def set_unenforced_primary_key(self, columns: Union[str, Iterable[str]]) -> None:
|
||||
"""Set the unenforced primary key. See
|
||||
[`AsyncTable.set_unenforced_primary_key`][lancedb.AsyncTable.set_unenforced_primary_key]."""
|
||||
return LOOP.run(self._table.set_unenforced_primary_key(columns))
|
||||
|
||||
def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
|
||||
"""Install an LsmWriteSpec. See
|
||||
[`AsyncTable.set_lsm_write_spec`][lancedb.AsyncTable.set_lsm_write_spec]."""
|
||||
return LOOP.run(self._table.set_lsm_write_spec(spec))
|
||||
|
||||
def unset_lsm_write_spec(self) -> None:
|
||||
"""Remove the LsmWriteSpec. See
|
||||
[`AsyncTable.unset_lsm_write_spec`][lancedb.AsyncTable.unset_lsm_write_spec]."""
|
||||
return LOOP.run(self._table.unset_lsm_write_spec())
|
||||
|
||||
def uses_v2_manifest_paths(self) -> bool:
|
||||
"""
|
||||
Check if the table is using the new v2 manifest paths.
|
||||
@@ -3808,6 +3842,69 @@ class AsyncTable:
|
||||
Any attempt to use the table after it has been closed will raise an error."""
|
||||
return self._inner.close()
|
||||
|
||||
async def set_unenforced_primary_key(
|
||||
self, columns: Union[str, Iterable[str]]
|
||||
) -> None:
|
||||
"""Set the unenforced primary key for this table to the given
|
||||
ordered list of columns.
|
||||
|
||||
"Unenforced" means LanceDB does not check uniqueness on writes; the
|
||||
columns are recorded in the schema as the primary key so that
|
||||
features such as `merge_insert` can use them. Calling this again
|
||||
replaces any previously-set primary key.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
columns : str or Iterable[str]
|
||||
Either a single column name (single-column key) or an ordered
|
||||
iterable of column names (composite key). Each column dtype
|
||||
must be one of: int32, int64, utf8, large_utf8, binary,
|
||||
large_binary, fixed_size_binary.
|
||||
"""
|
||||
if isinstance(columns, str):
|
||||
columns = [columns]
|
||||
else:
|
||||
columns = list(columns)
|
||||
await self._inner.set_unenforced_primary_key(columns)
|
||||
|
||||
async def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
|
||||
"""Install an LsmWriteSpec on this table.
|
||||
|
||||
The spec selects Lance's MemWAL LSM-style write path for future
|
||||
`merge_insert` calls. ``LsmWriteSpec`` chooses one of three sharding
|
||||
strategies:
|
||||
|
||||
- ``LsmWriteSpec.bucket(column, num_buckets)`` — hash-bucket writes by
|
||||
the single-column unenforced primary key.
|
||||
- ``LsmWriteSpec.identity(column)`` — shard by the raw value of a
|
||||
scalar column.
|
||||
- ``LsmWriteSpec.unsharded()`` — route every write to a single shard.
|
||||
|
||||
All variants require the table to have an unenforced primary key set
|
||||
via [`set_unenforced_primary_key`]; bucket sharding additionally
|
||||
requires it to be the single column being bucketed.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
spec : LsmWriteSpec
|
||||
The sharding spec to install.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> from lancedb._lancedb import LsmWriteSpec
|
||||
>>> # table.set_unenforced_primary_key("id")
|
||||
>>> # table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 16))
|
||||
"""
|
||||
await self._inner.set_lsm_write_spec(spec)
|
||||
|
||||
async def unset_lsm_write_spec(self) -> None:
|
||||
"""Remove the LsmWriteSpec from this table.
|
||||
|
||||
Reverts to the standard `merge_insert` write path. Errors if no spec
|
||||
is currently set.
|
||||
"""
|
||||
await self._inner.unset_lsm_write_spec()
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""The name of the table."""
|
||||
@@ -3866,14 +3963,39 @@ class AsyncTable:
|
||||
"""
|
||||
return AsyncQuery(self._inner.query())
|
||||
|
||||
async def to_pandas(self) -> "pd.DataFrame":
|
||||
async def _to_lance(self, **kwargs) -> lance.LanceDataset:
|
||||
try:
|
||||
import lance
|
||||
except ImportError:
|
||||
raise ImportError(
|
||||
"The lance library is required to use this function. "
|
||||
"Please install with `pip install pylance`."
|
||||
)
|
||||
|
||||
return lance.dataset(
|
||||
await self.uri(),
|
||||
version=await self.version(),
|
||||
storage_options=await self.latest_storage_options(),
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
async def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs) -> "pd.DataFrame":
|
||||
"""Return the table as a pandas DataFrame.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
blob_mode: str, default "lazy"
|
||||
Controls how Lance blob columns are returned.
|
||||
**kwargs
|
||||
Forwarded to PyArrow / Lance pandas conversion.
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame
|
||||
"""
|
||||
return (await self.to_arrow()).to_pandas()
|
||||
if blob_mode == "lazy":
|
||||
return (await self.to_arrow()).to_pandas(**kwargs)
|
||||
return (await self._to_lance()).to_pandas(blob_mode=blob_mode, **kwargs)
|
||||
|
||||
async def to_arrow(self) -> pa.Table:
|
||||
"""Return the table as a pyarrow Table.
|
||||
@@ -4512,6 +4634,8 @@ class AsyncTable:
|
||||
async_query = async_query.fast_search()
|
||||
if query.with_row_id:
|
||||
async_query = async_query.with_row_id()
|
||||
if query.order_by:
|
||||
async_query = async_query.order_by(query.order_by)
|
||||
|
||||
if query.vector:
|
||||
async_query = async_query.nearest_to(query.vector).distance_range(
|
||||
|
||||
@@ -10,7 +10,7 @@ import pathlib
|
||||
import warnings
|
||||
from datetime import date, datetime
|
||||
from functools import singledispatch
|
||||
from typing import Tuple, Union, Optional, Any
|
||||
from typing import Tuple, Union, Optional, Any, List
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import numpy as np
|
||||
@@ -189,7 +189,33 @@ def flatten_columns(tbl: pa.Table, flatten: Optional[Union[int, bool]] = None):
|
||||
return tbl
|
||||
|
||||
|
||||
def inf_vector_column_query(schema: pa.Schema) -> str:
|
||||
def _format_field_path(path: List[str]) -> str:
|
||||
def format_segment(segment: str) -> str:
|
||||
if all(char.isalnum() or char == "_" for char in segment):
|
||||
return segment
|
||||
return f"`{segment.replace('`', '``')}`"
|
||||
|
||||
return ".".join(format_segment(segment) for segment in path)
|
||||
|
||||
|
||||
def _iter_vector_columns(
|
||||
field: pa.Field, path: List[str], dim: Optional[int] = None
|
||||
) -> List[str]:
|
||||
field_path = [*path, field.name]
|
||||
if is_vector_column(field.type):
|
||||
vector_dim = infer_vector_column_dim(field.type)
|
||||
if dim is None or vector_dim == dim:
|
||||
return [_format_field_path(field_path)]
|
||||
return []
|
||||
if pa.types.is_struct(field.type):
|
||||
columns = []
|
||||
for idx in range(field.type.num_fields):
|
||||
columns.extend(_iter_vector_columns(field.type.field(idx), field_path, dim))
|
||||
return columns
|
||||
return []
|
||||
|
||||
|
||||
def inf_vector_column_query(schema: pa.Schema, dim: Optional[int] = None) -> str:
|
||||
"""
|
||||
Get the vector column name
|
||||
|
||||
@@ -202,26 +228,21 @@ def inf_vector_column_query(schema: pa.Schema) -> str:
|
||||
-------
|
||||
str: the vector column name.
|
||||
"""
|
||||
vector_col_name = ""
|
||||
vector_col_count = 0
|
||||
for field_name in schema.names:
|
||||
field = schema.field(field_name)
|
||||
if is_vector_column(field.type):
|
||||
vector_col_count += 1
|
||||
if vector_col_count > 1:
|
||||
raise ValueError(
|
||||
"Schema has more than one vector column. "
|
||||
"Please specify the vector column name "
|
||||
"for vector search"
|
||||
)
|
||||
elif vector_col_count == 1:
|
||||
vector_col_name = field_name
|
||||
if vector_col_count == 0:
|
||||
vector_col_names = []
|
||||
for field in schema:
|
||||
vector_col_names.extend(_iter_vector_columns(field, [], dim))
|
||||
if len(vector_col_names) > 1:
|
||||
raise ValueError(
|
||||
"Schema has more than one vector column. "
|
||||
"Please specify the vector column name "
|
||||
f"for vector search. Candidates: {vector_col_names}"
|
||||
)
|
||||
if len(vector_col_names) == 0:
|
||||
raise ValueError(
|
||||
"There is no vector column in the data. "
|
||||
"Please specify the vector column name for vector search"
|
||||
)
|
||||
return vector_col_name
|
||||
return vector_col_names[0]
|
||||
|
||||
|
||||
def is_vector_column(data_type: pa.DataType) -> bool:
|
||||
@@ -247,6 +268,29 @@ def is_vector_column(data_type: pa.DataType) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def infer_vector_column_dim(data_type: pa.DataType) -> Optional[int]:
|
||||
if pa.types.is_fixed_size_list(data_type):
|
||||
return data_type.list_size
|
||||
if pa.types.is_list(data_type):
|
||||
return infer_vector_column_dim(data_type.value_type)
|
||||
return None
|
||||
|
||||
|
||||
def _query_vector_dim(query: Optional[Any]) -> Optional[int]:
|
||||
if query is None:
|
||||
return None
|
||||
if isinstance(query, np.ndarray):
|
||||
if query.ndim == 0:
|
||||
return None
|
||||
return query.shape[-1]
|
||||
if isinstance(query, list) and query:
|
||||
first = query[0]
|
||||
if isinstance(first, (list, tuple, np.ndarray)):
|
||||
return len(first)
|
||||
return len(query)
|
||||
return None
|
||||
|
||||
|
||||
def infer_vector_column_name(
|
||||
schema: pa.Schema,
|
||||
query_type: str,
|
||||
@@ -262,7 +306,9 @@ def infer_vector_column_name(
|
||||
|
||||
if query is not None or query_type == "hybrid":
|
||||
try:
|
||||
vector_column_name = inf_vector_column_query(schema)
|
||||
vector_column_name = inf_vector_column_query(
|
||||
schema, dim=_query_vector_dim(query)
|
||||
)
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
segmenter:
|
||||
mode: "normal"
|
||||
dictionary:
|
||||
path: "./python/tests/models/lindera/ipadic/main"
|
||||
dictionary: "./python/tests/models/lindera/ipadic/main"
|
||||
|
||||
Binary file not shown.
@@ -6,6 +6,7 @@ import re
|
||||
import sys
|
||||
from datetime import timedelta
|
||||
import os
|
||||
from types import SimpleNamespace
|
||||
|
||||
import lancedb
|
||||
import numpy as np
|
||||
@@ -188,6 +189,43 @@ def test_table_names(tmp_db: lancedb.DBConnection):
|
||||
assert len(result) == 3
|
||||
|
||||
|
||||
def test_db_contains_and_len_include_all_table_name_pages(tmp_db: lancedb.DBConnection):
|
||||
for idx in range(20):
|
||||
tmp_db.create_table(f"table_{idx}", data=[{"id": idx}])
|
||||
|
||||
assert len(tmp_db) == 20
|
||||
for idx in range(20):
|
||||
assert f"table_{idx}" in tmp_db
|
||||
assert "does_not_exist" not in tmp_db
|
||||
|
||||
|
||||
def test_db_contains_stops_after_matching_table_page(
|
||||
tmp_db: lancedb.DBConnection, monkeypatch
|
||||
):
|
||||
calls = []
|
||||
pages = {
|
||||
None: SimpleNamespace(tables=["table_0", "table_1"], page_token="next"),
|
||||
"next": SimpleNamespace(tables=["table_2"], page_token=None),
|
||||
}
|
||||
|
||||
def list_tables(*, page_token=None, **_kwargs):
|
||||
calls.append(page_token)
|
||||
return pages[page_token]
|
||||
|
||||
monkeypatch.setattr(tmp_db, "list_tables", list_tables)
|
||||
|
||||
assert "table_1" in tmp_db
|
||||
assert calls == [None]
|
||||
|
||||
calls.clear()
|
||||
assert "table_2" in tmp_db
|
||||
assert calls == [None, "next"]
|
||||
|
||||
calls.clear()
|
||||
assert len(tmp_db) == 3
|
||||
assert calls == [None, "next"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_table_names_async(tmp_path):
|
||||
db = lancedb.connect(tmp_path)
|
||||
|
||||
@@ -29,6 +29,7 @@ from lancedb.query import (
|
||||
MultiMatchQuery,
|
||||
PhraseQuery,
|
||||
BooleanQuery,
|
||||
ColumnOrdering,
|
||||
Occur,
|
||||
LanceFtsQueryBuilder,
|
||||
)
|
||||
@@ -116,8 +117,7 @@ def lindera_ipadic(language_model_home):
|
||||
config_path.write_text(
|
||||
"segmenter:\n"
|
||||
' mode: "normal"\n'
|
||||
" dictionary:\n"
|
||||
f' path: "{extracted_model.resolve().as_posix()}"\n',
|
||||
f' dictionary: "{extracted_model.resolve().as_posix()}"\n',
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
@@ -500,6 +500,36 @@ async def test_search_fts_specify_column_async(async_table):
|
||||
pass
|
||||
|
||||
|
||||
def test_search_order_by_descending(table):
|
||||
table.create_fts_index("text")
|
||||
rows = (
|
||||
table.search("puppy")
|
||||
.order_by([ColumnOrdering(column_name="count", ascending=False)])
|
||||
.limit(20)
|
||||
.select(["text", "count"])
|
||||
.to_list()
|
||||
)
|
||||
|
||||
for r in rows:
|
||||
assert "puppy" in r["text"]
|
||||
assert sorted(rows, key=lambda x: x["count"], reverse=True) == rows
|
||||
|
||||
|
||||
def test_search_order_by_ascending(table):
|
||||
table.create_fts_index("text")
|
||||
rows = (
|
||||
table.search("puppy")
|
||||
.order_by([ColumnOrdering(column_name="count", ascending=True)])
|
||||
.limit(20)
|
||||
.select(["text", "count"])
|
||||
.to_list()
|
||||
)
|
||||
|
||||
for r in rows:
|
||||
assert "puppy" in r["text"]
|
||||
assert sorted(rows, key=lambda x: x["count"]) == rows
|
||||
|
||||
|
||||
def test_create_index_from_table(tmp_path, table):
|
||||
table.create_fts_index("text")
|
||||
df = table.search("puppy").limit(5).select(["text"]).to_pandas()
|
||||
@@ -533,8 +563,111 @@ def test_create_index_multiple_columns(tmp_path, table):
|
||||
|
||||
|
||||
def test_nested_schema(tmp_path, table):
|
||||
with pytest.raises(ValueError, match="top-level fields"):
|
||||
table.create_fts_index("nested.text")
|
||||
table.create_fts_index("nested.text", with_position=True)
|
||||
indices = table.list_indices()
|
||||
assert len(indices) == 1
|
||||
assert indices[0].index_type == "FTS"
|
||||
assert indices[0].columns == ["nested.text"]
|
||||
|
||||
results = (
|
||||
table.search("puppy", query_type="fts", fts_columns="nested.text")
|
||||
.limit(5)
|
||||
.to_list()
|
||||
)
|
||||
assert len(results) > 0
|
||||
assert all("puppy" in row["nested"]["text"] for row in results)
|
||||
|
||||
results = table.search(MatchQuery("puppy", "nested.text")).limit(5).to_list()
|
||||
assert len(results) > 0
|
||||
assert all("puppy" in row["nested"]["text"] for row in results)
|
||||
|
||||
phrase_results = (
|
||||
table.search(PhraseQuery("puppy runs", "nested.text")).limit(5).to_list()
|
||||
)
|
||||
assert len(phrase_results) > 0
|
||||
assert all("puppy runs" in row["nested"]["text"] for row in phrase_results)
|
||||
|
||||
hybrid_results = (
|
||||
table.search(query_type="hybrid", fts_columns="nested.text")
|
||||
.vector([0 for _ in range(128)])
|
||||
.text("puppy")
|
||||
.limit(5)
|
||||
.to_list()
|
||||
)
|
||||
assert len(hybrid_results) > 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_nested_schema_async(async_table):
|
||||
await async_table.create_index("nested.text", config=FTS(with_position=True))
|
||||
indices = await async_table.list_indices()
|
||||
assert len(indices) == 1
|
||||
assert indices[0].index_type == "FTS"
|
||||
assert indices[0].columns == ["nested.text"]
|
||||
|
||||
results = await (
|
||||
async_table.query()
|
||||
.nearest_to_text("puppy", columns="nested.text")
|
||||
.limit(5)
|
||||
.to_list()
|
||||
)
|
||||
assert len(results) > 0
|
||||
assert all("puppy" in row["nested"]["text"] for row in results)
|
||||
|
||||
results = await (
|
||||
async_table.query()
|
||||
.nearest_to_text(MatchQuery("puppy", "nested.text"))
|
||||
.limit(5)
|
||||
.to_list()
|
||||
)
|
||||
assert len(results) > 0
|
||||
assert all("puppy" in row["nested"]["text"] for row in results)
|
||||
|
||||
phrase_results = await (
|
||||
async_table.query()
|
||||
.nearest_to_text(PhraseQuery("puppy runs", "nested.text"))
|
||||
.limit(5)
|
||||
.to_list()
|
||||
)
|
||||
assert len(phrase_results) > 0
|
||||
assert all("puppy runs" in row["nested"]["text"] for row in phrase_results)
|
||||
|
||||
hybrid_results = await (
|
||||
async_table.query()
|
||||
.nearest_to([0 for _ in range(128)])
|
||||
.nearest_to_text("puppy", columns="nested.text")
|
||||
.limit(5)
|
||||
.to_list()
|
||||
)
|
||||
assert len(hybrid_results) > 0
|
||||
|
||||
|
||||
def test_nested_schema_rejects_invalid_fts_fields(tmp_path):
|
||||
db = ldb.connect(tmp_path)
|
||||
data = pa.table(
|
||||
{
|
||||
"payload": pa.array(
|
||||
[
|
||||
{"text": "puppy runs", "count": 1},
|
||||
{"text": "car drives", "count": 2},
|
||||
]
|
||||
),
|
||||
"vector": pa.array(
|
||||
[[0.1, 0.1], [0.2, 0.2]],
|
||||
type=pa.list_(pa.float32(), list_size=2),
|
||||
),
|
||||
}
|
||||
)
|
||||
table = db.create_table("test", data=data)
|
||||
|
||||
with pytest.raises(ValueError, match="FTS index cannot be created.*payload"):
|
||||
table.create_fts_index("payload")
|
||||
|
||||
with pytest.raises(ValueError, match="FTS index cannot be created.*count"):
|
||||
table.create_fts_index("payload.count")
|
||||
|
||||
with pytest.raises(ValueError, match="Field path `payload.missing` not found"):
|
||||
table.create_fts_index("payload.missing")
|
||||
|
||||
|
||||
def test_search_index_with_filter(table):
|
||||
|
||||
@@ -105,6 +105,46 @@ async def test_create_scalar_index(some_table: AsyncTable):
|
||||
assert len(indices) == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_nested_scalar_index_lists_canonical_paths(db_async):
|
||||
metadata_type = pa.struct(
|
||||
[
|
||||
pa.field("user_id", pa.int32()),
|
||||
pa.field("user.id", pa.int32()),
|
||||
]
|
||||
)
|
||||
data = pa.Table.from_arrays(
|
||||
[
|
||||
pa.array([1, 2, 3], type=pa.int32()),
|
||||
pa.array(
|
||||
[
|
||||
{"user_id": 10, "user.id": 100},
|
||||
{"user_id": 20, "user.id": 200},
|
||||
{"user_id": 30, "user.id": 300},
|
||||
],
|
||||
type=metadata_type,
|
||||
),
|
||||
],
|
||||
names=["user_id", "metadata"],
|
||||
)
|
||||
table = await db_async.create_table("nested_scalar_index", data)
|
||||
|
||||
await table.create_index("user_id", config=BTree(), name="top_user_id_idx")
|
||||
await table.create_index(
|
||||
"metadata.user_id", config=BTree(), name="nested_user_id_idx"
|
||||
)
|
||||
await table.create_index(
|
||||
"metadata.`user.id`", config=BTree(), name="escaped_user_id_idx"
|
||||
)
|
||||
|
||||
columns_by_name = {
|
||||
index.name: index.columns for index in await table.list_indices()
|
||||
}
|
||||
assert columns_by_name["top_user_id_idx"] == ["user_id"]
|
||||
assert columns_by_name["nested_user_id_idx"] == ["metadata.user_id"]
|
||||
assert columns_by_name["escaped_user_id_idx"] == ["metadata.`user.id`"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_fixed_size_binary_index(some_table: AsyncTable):
|
||||
await some_table.create_index("fsb", config=BTree())
|
||||
|
||||
149
python/python/tests/test_lsm_write_spec.py
Normal file
149
python/python/tests/test_lsm_write_spec.py
Normal file
@@ -0,0 +1,149 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
"""Tests for installing and clearing an LsmWriteSpec via
|
||||
`Table.set_lsm_write_spec` / `Table.unset_lsm_write_spec`.
|
||||
"""
|
||||
|
||||
from datetime import timedelta
|
||||
|
||||
import lancedb
|
||||
import pyarrow as pa
|
||||
import pytest
|
||||
from lancedb._lancedb import LsmWriteSpec
|
||||
|
||||
SCHEMA = pa.schema(
|
||||
[
|
||||
pa.field("id", pa.utf8(), nullable=False),
|
||||
pa.field("v", pa.int32(), nullable=False),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def _batch(ids, vs):
|
||||
return pa.RecordBatch.from_arrays(
|
||||
[pa.array(ids, type=pa.utf8()), pa.array(vs, type=pa.int32())],
|
||||
schema=SCHEMA,
|
||||
)
|
||||
|
||||
|
||||
def _reader(ids, vs):
|
||||
return pa.RecordBatchReader.from_batches(SCHEMA, [_batch(ids, vs)])
|
||||
|
||||
|
||||
def _make_table(tmp_path):
|
||||
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0))
|
||||
table = db.create_table("t", _reader(["seed"], [0]))
|
||||
return db, table
|
||||
|
||||
|
||||
def test_set_lsm_write_spec_validates(tmp_path):
|
||||
_db, table = _make_table(tmp_path)
|
||||
|
||||
# No PK set yet.
|
||||
with pytest.raises(Exception, match="primary key"):
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
|
||||
|
||||
table.set_unenforced_primary_key("id")
|
||||
|
||||
# Column mismatch.
|
||||
with pytest.raises(Exception, match="match"):
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("v", 4))
|
||||
|
||||
# Out-of-range num_buckets.
|
||||
with pytest.raises(Exception, match="num_buckets"):
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 0))
|
||||
with pytest.raises(Exception, match="num_buckets"):
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 1025))
|
||||
|
||||
# Happy path then mutation rejected.
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
|
||||
with pytest.raises(Exception, match="mutation"):
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 8))
|
||||
|
||||
|
||||
def test_unset_lsm_write_spec(tmp_path):
|
||||
_db, table = _make_table(tmp_path)
|
||||
|
||||
# unset errors when no spec is set.
|
||||
with pytest.raises(Exception, match="no LSM write spec"):
|
||||
table.unset_lsm_write_spec()
|
||||
|
||||
# Install a spec, then remove it; afterwards a fresh spec can be set.
|
||||
table.set_unenforced_primary_key("id")
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
|
||||
table.unset_lsm_write_spec()
|
||||
# A second unset errors — there is no spec left to remove.
|
||||
with pytest.raises(Exception, match="no LSM write spec"):
|
||||
table.unset_lsm_write_spec()
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 8))
|
||||
|
||||
|
||||
def test_set_unsharded_spec(tmp_path):
|
||||
_db, table = _make_table(tmp_path)
|
||||
# Lance MemWAL still requires a primary key on the dataset; Unsharded
|
||||
# just skips per-row hashing.
|
||||
table.set_unenforced_primary_key("id")
|
||||
table.set_lsm_write_spec(LsmWriteSpec.unsharded())
|
||||
table.unset_lsm_write_spec()
|
||||
|
||||
|
||||
def test_lsm_write_spec_repr():
|
||||
s = LsmWriteSpec.bucket("id", 4)
|
||||
assert s.spec_type == "bucket"
|
||||
assert s.column == "id"
|
||||
assert s.num_buckets == 4
|
||||
assert s.maintained_indexes == []
|
||||
assert "bucket" in repr(s)
|
||||
assert "id" in repr(s)
|
||||
assert "4" in repr(s)
|
||||
|
||||
u = LsmWriteSpec.unsharded()
|
||||
assert u.spec_type == "unsharded"
|
||||
assert u.column is None
|
||||
assert u.num_buckets is None
|
||||
assert "unsharded" in repr(u)
|
||||
|
||||
|
||||
def test_lsm_write_spec_with_maintained_indexes():
|
||||
s = LsmWriteSpec.bucket("id", 4).with_maintained_indexes(["idx_a", "idx_b"])
|
||||
assert s.maintained_indexes == ["idx_a", "idx_b"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_set_unset_lsm_write_spec(tmp_path):
|
||||
db = await lancedb.connect_async(
|
||||
tmp_path, read_consistency_interval=timedelta(seconds=0)
|
||||
)
|
||||
table = await db.create_table(
|
||||
"t",
|
||||
pa.RecordBatchReader.from_batches(SCHEMA, [_batch(["seed"], [0])]),
|
||||
)
|
||||
|
||||
await table.set_unenforced_primary_key("id")
|
||||
await table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
|
||||
await table.unset_lsm_write_spec()
|
||||
# A second unset errors.
|
||||
with pytest.raises(Exception, match="no LSM write spec"):
|
||||
await table.unset_lsm_write_spec()
|
||||
|
||||
|
||||
def test_set_identity_spec(tmp_path):
|
||||
_db, table = _make_table(tmp_path)
|
||||
# Identity sharding still requires an unenforced primary key on the
|
||||
# table; it shards by the raw value of the given column.
|
||||
table.set_unenforced_primary_key("id")
|
||||
table.set_lsm_write_spec(LsmWriteSpec.identity("v"))
|
||||
table.unset_lsm_write_spec()
|
||||
|
||||
|
||||
def test_lsm_write_spec_identity_and_writer_config_defaults():
|
||||
s = LsmWriteSpec.identity("v")
|
||||
assert s.spec_type == "identity"
|
||||
assert s.column == "v"
|
||||
assert s.num_buckets is None
|
||||
assert "identity" in repr(s)
|
||||
|
||||
s = s.with_writer_config_defaults({"durable_write": "false"})
|
||||
assert s.writer_config_defaults == {"durable_write": "false"}
|
||||
assert "durable_write" in repr(s)
|
||||
@@ -1080,3 +1080,29 @@ def test_getitems_invalid_offset(some_permutation: Permutation):
|
||||
"""Test __getitems__ with an out-of-range offset raises an error."""
|
||||
with pytest.raises(Exception):
|
||||
some_permutation.__getitems__([999999])
|
||||
|
||||
|
||||
def test_take_offsets(some_permutation: Permutation):
|
||||
result = some_permutation.take_offsets([0, 1, 2])
|
||||
|
||||
assert isinstance(result, list)
|
||||
assert "id" in result[0]
|
||||
assert "value" in result[0]
|
||||
assert len(result) == 3
|
||||
|
||||
|
||||
def test_take_offsets_empty_identity_permutation(mem_db):
|
||||
tbl = mem_db.create_table(
|
||||
"test_table", pa.table({"id": range(10), "value": range(10)})
|
||||
)
|
||||
permutation = Permutation.identity(tbl)
|
||||
|
||||
result = permutation.take_offsets([])
|
||||
|
||||
assert result == []
|
||||
|
||||
|
||||
def test_take_offsets_empty_permutation(some_permutation: Permutation):
|
||||
result = some_permutation.take_offsets([])
|
||||
|
||||
assert result == []
|
||||
|
||||
79
python/python/tests/test_primary_key.py
Normal file
79
python/python/tests/test_primary_key.py
Normal file
@@ -0,0 +1,79 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
"""Tests for Table.set_unenforced_primary_key."""
|
||||
|
||||
from datetime import timedelta
|
||||
|
||||
import lancedb
|
||||
import pyarrow as pa
|
||||
import pytest
|
||||
|
||||
|
||||
def _empty_table(path, schema):
|
||||
db = lancedb.connect(path, read_consistency_interval=timedelta(seconds=0))
|
||||
return db.create_table("t", schema=schema)
|
||||
|
||||
|
||||
def test_set_unenforced_primary_key_accepts_string_or_one_element_list(tmp_path):
|
||||
schema = pa.schema([pa.field("id", pa.int64(), nullable=False)])
|
||||
|
||||
# Bare string.
|
||||
table = _empty_table(tmp_path / "s", schema)
|
||||
table.set_unenforced_primary_key("id")
|
||||
|
||||
# One-element list.
|
||||
table = _empty_table(tmp_path / "l", schema)
|
||||
table.set_unenforced_primary_key(["id"])
|
||||
|
||||
|
||||
def test_set_unenforced_primary_key_rejects_compound_and_empty(tmp_path):
|
||||
table = _empty_table(
|
||||
tmp_path,
|
||||
pa.schema(
|
||||
[
|
||||
pa.field("a", pa.utf8(), nullable=False),
|
||||
pa.field("b", pa.int64(), nullable=False),
|
||||
]
|
||||
),
|
||||
)
|
||||
# Compound keys are not supported.
|
||||
with pytest.raises(Exception, match="compound"):
|
||||
table.set_unenforced_primary_key(["a", "b"])
|
||||
# Empty input.
|
||||
with pytest.raises(Exception, match="required"):
|
||||
table.set_unenforced_primary_key([])
|
||||
|
||||
|
||||
def test_set_unenforced_primary_key_is_immutable(tmp_path):
|
||||
table = _empty_table(
|
||||
tmp_path,
|
||||
pa.schema(
|
||||
[
|
||||
pa.field("a", pa.utf8(), nullable=False),
|
||||
pa.field("b", pa.int64(), nullable=False),
|
||||
]
|
||||
),
|
||||
)
|
||||
table.set_unenforced_primary_key("a")
|
||||
# The primary key cannot be changed or re-set once installed.
|
||||
with pytest.raises(Exception, match="already set"):
|
||||
table.set_unenforced_primary_key("b")
|
||||
with pytest.raises(Exception, match="already set"):
|
||||
table.set_unenforced_primary_key("a")
|
||||
|
||||
|
||||
def test_set_unenforced_primary_key_validates(tmp_path):
|
||||
table = _empty_table(
|
||||
tmp_path / "t", pa.schema([pa.field("id", pa.utf8(), nullable=False)])
|
||||
)
|
||||
# Unknown column.
|
||||
with pytest.raises(Exception, match="not found"):
|
||||
table.set_unenforced_primary_key("nonexistent")
|
||||
|
||||
# Unsupported dtype (Float32 not in the supported set).
|
||||
bad = _empty_table(
|
||||
tmp_path / "bad", pa.schema([pa.field("id", pa.float32(), nullable=False)])
|
||||
)
|
||||
with pytest.raises(Exception, match="not supported"):
|
||||
bad.set_unenforced_primary_key("id")
|
||||
@@ -25,6 +25,7 @@ from lancedb.query import (
|
||||
AsyncHybridQuery,
|
||||
AsyncQueryBase,
|
||||
AsyncVectorQuery,
|
||||
ColumnOrdering,
|
||||
LanceVectorQueryBuilder,
|
||||
MatchQuery,
|
||||
PhraseQuery,
|
||||
@@ -164,6 +165,87 @@ def test_offset(table):
|
||||
assert len(results_with_offset.to_pandas()) == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_query_to_pandas_kwargs(table, table_async):
|
||||
sync_df = (
|
||||
LanceVectorQueryBuilder(table, [0, 0], "vector")
|
||||
.select(["id"])
|
||||
.limit(1)
|
||||
.to_pandas(split_blocks=True)
|
||||
)
|
||||
assert sync_df["id"].tolist() == [1]
|
||||
|
||||
async_df = await (
|
||||
table_async.query().select(["id"]).limit(2).to_pandas(split_blocks=True)
|
||||
)
|
||||
assert async_df["id"].tolist() == [1, 2]
|
||||
|
||||
|
||||
def test_order_by_plain_query(mem_db):
|
||||
table = mem_db.create_table(
|
||||
"test_order_by",
|
||||
pa.table(
|
||||
{
|
||||
"group": [1, 1, 1, 2],
|
||||
"score": [None, 1.0, 1.0, 0.5],
|
||||
"name": ["z", "b", "a", "c"],
|
||||
}
|
||||
),
|
||||
)
|
||||
|
||||
res = (
|
||||
table.search()
|
||||
.order_by(
|
||||
[
|
||||
ColumnOrdering(column_name="group", ascending=True, nulls_first=False),
|
||||
ColumnOrdering(column_name="score", ascending=True, nulls_first=True),
|
||||
ColumnOrdering(column_name="name", ascending=True, nulls_first=False),
|
||||
]
|
||||
)
|
||||
.to_arrow()
|
||||
)
|
||||
|
||||
assert res.select(["group", "score", "name"]).to_pylist() == [
|
||||
{"group": 1, "score": None, "name": "z"},
|
||||
{"group": 1, "score": 1.0, "name": "a"},
|
||||
{"group": 1, "score": 1.0, "name": "b"},
|
||||
{"group": 2, "score": 0.5, "name": "c"},
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_order_by_async_query(mem_db_async: AsyncConnection):
|
||||
table = await mem_db_async.create_table(
|
||||
"test_order_by_async",
|
||||
pa.table(
|
||||
{
|
||||
"group": [1, 1, 1, 2],
|
||||
"score": [None, 1.0, 1.0, 0.5],
|
||||
"name": ["z", "b", "a", "c"],
|
||||
}
|
||||
),
|
||||
)
|
||||
|
||||
res = await (
|
||||
table.query()
|
||||
.order_by(
|
||||
[
|
||||
ColumnOrdering(column_name="group", ascending=True, nulls_first=False),
|
||||
ColumnOrdering(column_name="score", ascending=True, nulls_first=True),
|
||||
ColumnOrdering(column_name="name", ascending=True, nulls_first=False),
|
||||
]
|
||||
)
|
||||
.to_arrow()
|
||||
)
|
||||
|
||||
assert res.select(["group", "score", "name"]).to_pylist() == [
|
||||
{"group": 1, "score": None, "name": "z"},
|
||||
{"group": 1, "score": 1.0, "name": "a"},
|
||||
{"group": 1, "score": 1.0, "name": "b"},
|
||||
{"group": 2, "score": 0.5, "name": "c"},
|
||||
]
|
||||
|
||||
|
||||
def test_query_builder(table):
|
||||
rs = (
|
||||
LanceVectorQueryBuilder(table, [0, 0], "vector")
|
||||
@@ -1430,6 +1512,37 @@ def test_take_queries(tmp_path):
|
||||
]
|
||||
|
||||
|
||||
def test_take_queries_to_batches(tmp_path):
|
||||
# Regression test for the sync take-query path: `to_batches` previously
|
||||
# raised ``AttributeError: 'AsyncTakeQuery' object has no attribute
|
||||
# 'execute'`` because the inherited ``BaseQueryBuilder.to_batches`` called
|
||||
# ``execute`` on the async wrapper instead of the native query.
|
||||
db = lancedb.connect(tmp_path)
|
||||
data = pa.table({"idx": list(range(100)), "label": [str(i) for i in range(100)]})
|
||||
table = db.create_table("test", data)
|
||||
|
||||
# Take by offset → to_batches
|
||||
rs = list(table.take_offsets([5, 2, 17]).to_batches())
|
||||
assert all(isinstance(b, pa.RecordBatch) for b in rs)
|
||||
assert sum(b.num_rows for b in rs) == 3
|
||||
assert sorted(v for b in rs for v in b.column("idx").to_pylist()) == [2, 5, 17]
|
||||
|
||||
# Take by row id → to_batches
|
||||
rs = list(table.take_row_ids([5, 2, 17]).to_batches())
|
||||
assert all(isinstance(b, pa.RecordBatch) for b in rs)
|
||||
assert sum(b.num_rows for b in rs) == 3
|
||||
assert sorted(v for b in rs for v in b.column("idx").to_pylist()) == [2, 5, 17]
|
||||
|
||||
# Take with select projection → to_batches preserves the projection
|
||||
rs = list(table.take_row_ids([5, 2, 17]).select(["label"]).to_batches())
|
||||
assert all(b.schema.names == ["label"] for b in rs)
|
||||
assert sorted(v for b in rs for v in b.column("label").to_pylist()) == [
|
||||
"17",
|
||||
"2",
|
||||
"5",
|
||||
]
|
||||
|
||||
|
||||
def test_getitems(tmp_path):
|
||||
db = lancedb.connect(tmp_path)
|
||||
data = pa.table(
|
||||
|
||||
@@ -16,6 +16,7 @@ from packaging.version import Version
|
||||
|
||||
import lancedb
|
||||
from lancedb.conftest import MockTextEmbeddingFunction
|
||||
from lancedb.query import ColumnOrdering
|
||||
from lancedb.remote import ClientConfig
|
||||
from lancedb.remote.errors import HttpError, RetryError
|
||||
import pytest
|
||||
@@ -268,6 +269,25 @@ def test_table_unimplemented_functions():
|
||||
table.to_pandas()
|
||||
|
||||
|
||||
def test_table_to_pandas_not_supported():
|
||||
def handler(request):
|
||||
if request.path == "/v1/table/test/create/?mode=create":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b"{}")
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
with mock_lancedb_connection(handler) as db:
|
||||
table = db.create_table("test", [{"id": 1}])
|
||||
with pytest.raises(NotImplementedError):
|
||||
table.to_pandas()
|
||||
with pytest.raises(NotImplementedError):
|
||||
table.to_pandas(blob_mode="bytes", split_blocks=True)
|
||||
|
||||
|
||||
def test_table_add_in_threadpool():
|
||||
def handler(request):
|
||||
if request.path == "/v1/table/test/insert/":
|
||||
@@ -342,6 +362,22 @@ def test_table_create_indices():
|
||||
schema=dict(
|
||||
fields=[
|
||||
dict(name="id", type={"type": "int64"}, nullable=False),
|
||||
dict(name="text", type={"type": "string"}, nullable=False),
|
||||
dict(
|
||||
name="vector",
|
||||
type={
|
||||
"type": "fixed_size_list",
|
||||
"fields": [
|
||||
dict(
|
||||
name="item",
|
||||
type={"type": "float"},
|
||||
nullable=True,
|
||||
)
|
||||
],
|
||||
"length": 2,
|
||||
},
|
||||
nullable=False,
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
@@ -660,6 +696,18 @@ def test_query_sync_maximal():
|
||||
"ef": None,
|
||||
"filter": "id > 0",
|
||||
"columns": ["id", "name"],
|
||||
"order_by": [
|
||||
{
|
||||
"column_name": "score",
|
||||
"ascending": False,
|
||||
"nulls_first": True,
|
||||
},
|
||||
{
|
||||
"column_name": "id",
|
||||
"ascending": True,
|
||||
"nulls_first": False,
|
||||
},
|
||||
],
|
||||
"vector_column": "vector2",
|
||||
"fast_search": True,
|
||||
"with_row_id": True,
|
||||
@@ -677,6 +725,14 @@ def test_query_sync_maximal():
|
||||
.refine_factor(10)
|
||||
.nprobes(5)
|
||||
.where("id > 0", prefilter=True)
|
||||
.order_by(
|
||||
[
|
||||
ColumnOrdering(
|
||||
column_name="score", ascending=False, nulls_first=True
|
||||
),
|
||||
ColumnOrdering(column_name="id", ascending=True, nulls_first=False),
|
||||
]
|
||||
)
|
||||
.with_row_id(True)
|
||||
.select(["id", "name"])
|
||||
.to_list()
|
||||
|
||||
@@ -33,7 +33,7 @@ def test_basic(mem_db: DBConnection):
|
||||
table = mem_db.create_table("test", data=data)
|
||||
|
||||
assert table.name == "test"
|
||||
assert "LanceTable(name='test', version=1, _conn=LanceDBConnection(" in repr(table)
|
||||
assert "LanceTable(name='test', _conn=LanceDBConnection(" in repr(table)
|
||||
expected_schema = pa.schema(
|
||||
{
|
||||
"vector": pa.list_(pa.float32(), 2),
|
||||
@@ -47,6 +47,85 @@ def test_basic(mem_db: DBConnection):
|
||||
assert table.to_arrow() == expected_data
|
||||
|
||||
|
||||
def test_table_to_pandas_default_matches_arrow(tmp_db: DBConnection):
|
||||
pd = pytest.importorskip("pandas")
|
||||
data = pa.table({"id": [1, 2], "text": ["one", "two"]})
|
||||
table = tmp_db.create_table("test_to_pandas_old_call", data=data)
|
||||
|
||||
expected = data.to_pandas()
|
||||
pd.testing.assert_frame_equal(table.to_pandas(), expected)
|
||||
|
||||
|
||||
def test_table_to_pandas_blob_bytes(tmp_db: DBConnection):
|
||||
pytest.importorskip("lance")
|
||||
data = pa.table(
|
||||
{
|
||||
"id": pa.array([1, 2], pa.int64()),
|
||||
"blob": pa.array([b"hello", b"world"], pa.large_binary()),
|
||||
},
|
||||
schema=pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int64()),
|
||||
pa.field(
|
||||
"blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"}
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
table = tmp_db.create_table("test_to_pandas_blob_bytes", data=data)
|
||||
|
||||
df = table.to_pandas(blob_mode="bytes")
|
||||
|
||||
assert df["blob"].tolist() == [b"hello", b"world"]
|
||||
|
||||
|
||||
def test_table_to_pandas_kwargs(tmp_db: DBConnection):
|
||||
pd = pytest.importorskip("pandas")
|
||||
data = pa.table({"id": pa.array([1, 2], pa.int64())})
|
||||
table = tmp_db.create_table("test_to_pandas_kwargs", data=data)
|
||||
|
||||
df = table.to_pandas(types_mapper=pd.ArrowDtype)
|
||||
|
||||
assert str(df["id"].dtype) == "int64[pyarrow]"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_table_to_pandas_blob_bytes(tmp_db_async: AsyncConnection):
|
||||
pytest.importorskip("lance")
|
||||
data = pa.table(
|
||||
{
|
||||
"id": pa.array([1, 2], pa.int64()),
|
||||
"blob": pa.array([b"hello", b"world"], pa.large_binary()),
|
||||
},
|
||||
schema=pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int64()),
|
||||
pa.field(
|
||||
"blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"}
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
table = await tmp_db_async.create_table(
|
||||
"test_async_to_pandas_blob_bytes", data=data
|
||||
)
|
||||
|
||||
df = await table.to_pandas(blob_mode="bytes")
|
||||
|
||||
assert df["blob"].tolist() == [b"hello", b"world"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_table_to_pandas_kwargs(tmp_db_async: AsyncConnection):
|
||||
pd = pytest.importorskip("pandas")
|
||||
data = pa.table({"id": pa.array([1, 2], pa.int64())})
|
||||
table = await tmp_db_async.create_table("test_async_to_pandas_kwargs", data=data)
|
||||
|
||||
df = await table.to_pandas(types_mapper=pd.ArrowDtype)
|
||||
|
||||
assert str(df["id"].dtype) == "int64[pyarrow]"
|
||||
|
||||
|
||||
def test_create_table_infers_large_int_vectors(mem_db: DBConnection):
|
||||
data = [{"vector": [0, 300]}]
|
||||
|
||||
@@ -1811,6 +1890,59 @@ def test_create_scalar_index(mem_db: DBConnection):
|
||||
assert scalar_index.name == "custom_y_index"
|
||||
|
||||
|
||||
def test_create_index_nested_field_paths(mem_db: DBConnection):
|
||||
schema = pa.schema(
|
||||
[
|
||||
pa.field("metadata", pa.struct([pa.field("user_id", pa.int32())])),
|
||||
pa.field(
|
||||
"image",
|
||||
pa.struct([pa.field("embedding", pa.list_(pa.float32(), 2))]),
|
||||
),
|
||||
]
|
||||
)
|
||||
data = pa.Table.from_pylist(
|
||||
[
|
||||
{
|
||||
"metadata": {"user_id": i},
|
||||
"image": {"embedding": [float(i), float(i + 1)]},
|
||||
}
|
||||
for i in range(256)
|
||||
],
|
||||
schema=schema,
|
||||
)
|
||||
table = mem_db.create_table("nested_index_paths", data=data)
|
||||
|
||||
table.create_scalar_index("metadata.user_id", name="metadata_user_id_idx")
|
||||
table.create_index(
|
||||
vector_column_name="image.embedding",
|
||||
num_partitions=1,
|
||||
num_sub_vectors=1,
|
||||
name="image_embedding_idx",
|
||||
)
|
||||
|
||||
indices = sorted(table.list_indices(), key=lambda idx: idx.name)
|
||||
assert [(idx.name, idx.index_type, idx.columns) for idx in indices] == [
|
||||
("image_embedding_idx", "IvfPq", ["image.embedding"]),
|
||||
("metadata_user_id_idx", "BTree", ["metadata.user_id"]),
|
||||
]
|
||||
|
||||
vector_results = (
|
||||
table.search([0.0, 1.0], vector_column_name="image.embedding")
|
||||
.limit(1)
|
||||
.to_list()
|
||||
)
|
||||
assert len(vector_results) == 1
|
||||
assert vector_results[0]["metadata"]["user_id"] == 0
|
||||
|
||||
default_vector_results = table.search([0.0, 1.0]).limit(1).to_list()
|
||||
assert len(default_vector_results) == 1
|
||||
assert default_vector_results[0]["metadata"]["user_id"] == 0
|
||||
|
||||
filtered_results = table.search().where("metadata.user_id = 42").limit(1).to_list()
|
||||
assert len(filtered_results) == 1
|
||||
assert filtered_results[0]["metadata"]["user_id"] == 42
|
||||
|
||||
|
||||
def test_empty_query(mem_db: DBConnection):
|
||||
table = mem_db.create_table(
|
||||
"my_table",
|
||||
@@ -1885,6 +2017,74 @@ def test_search_with_schema_inf_multiple_vector(mem_db: DBConnection):
|
||||
table.search(q).limit(1).to_arrow()
|
||||
|
||||
|
||||
def test_search_infers_single_nested_vector(mem_db: DBConnection):
|
||||
schema = pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int32()),
|
||||
pa.field(
|
||||
"image",
|
||||
pa.struct([pa.field("embedding", pa.list_(pa.float32(), 2))]),
|
||||
),
|
||||
]
|
||||
)
|
||||
data = pa.Table.from_pylist(
|
||||
[
|
||||
{"id": 0, "image": {"embedding": [0.0, 1.0]}},
|
||||
{"id": 1, "image": {"embedding": [10.0, 11.0]}},
|
||||
],
|
||||
schema=schema,
|
||||
)
|
||||
table = mem_db.create_table("nested_vector_default_search", data=data)
|
||||
|
||||
result = table.search([0.0, 1.0]).limit(1).to_list()
|
||||
assert result[0]["id"] == 0
|
||||
|
||||
|
||||
def test_search_nested_vector_multiple_candidates(mem_db: DBConnection):
|
||||
schema = pa.schema(
|
||||
[
|
||||
pa.field(
|
||||
"image",
|
||||
pa.struct([pa.field("embedding", pa.list_(pa.float32(), 2))]),
|
||||
),
|
||||
pa.field(
|
||||
"text",
|
||||
pa.struct([pa.field("embedding", pa.list_(pa.float32(), 2))]),
|
||||
),
|
||||
]
|
||||
)
|
||||
data = pa.Table.from_pylist(
|
||||
[
|
||||
{
|
||||
"image": {"embedding": [0.0, 1.0]},
|
||||
"text": {"embedding": [2.0, 3.0]},
|
||||
}
|
||||
],
|
||||
schema=schema,
|
||||
)
|
||||
table = mem_db.create_table("nested_vector_multiple_candidates", data=data)
|
||||
|
||||
with pytest.raises(ValueError, match="image.embedding.*text.embedding"):
|
||||
table.search([0.0, 1.0]).limit(1).to_arrow()
|
||||
|
||||
|
||||
def test_search_nested_vector_no_candidates(mem_db: DBConnection):
|
||||
schema = pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int32()),
|
||||
pa.field("metadata", pa.struct([pa.field("label", pa.string())])),
|
||||
]
|
||||
)
|
||||
data = pa.Table.from_pylist(
|
||||
[{"id": 0, "metadata": {"label": "cat"}}],
|
||||
schema=schema,
|
||||
)
|
||||
table = mem_db.create_table("nested_vector_no_candidates", data=data)
|
||||
|
||||
with pytest.raises(ValueError, match="no vector column"):
|
||||
table.search([0.0, 1.0]).limit(1).to_arrow()
|
||||
|
||||
|
||||
def test_compact_cleanup(tmp_db: DBConnection):
|
||||
pytest.importorskip("lance")
|
||||
table = tmp_db.create_table(
|
||||
|
||||
@@ -15,8 +15,8 @@ use pyo3::{
|
||||
use query::{FTSQuery, HybridQuery, Query, VectorQuery};
|
||||
use session::Session;
|
||||
use table::{
|
||||
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, MergeResult,
|
||||
Table, UpdateResult,
|
||||
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, LsmWriteSpec,
|
||||
MergeResult, Table, UpdateResult,
|
||||
};
|
||||
|
||||
pub mod arrow;
|
||||
@@ -52,6 +52,7 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
|
||||
m.add_class::<AlterColumnsResult>()?;
|
||||
m.add_class::<AddResult>()?;
|
||||
m.add_class::<MergeResult>()?;
|
||||
m.add_class::<LsmWriteSpec>()?;
|
||||
m.add_class::<DeleteResult>()?;
|
||||
m.add_class::<DropColumnsResult>()?;
|
||||
m.add_class::<UpdateResult>()?;
|
||||
|
||||
@@ -23,7 +23,7 @@ use lancedb::query::QueryBase;
|
||||
use lancedb::query::QueryExecutionOptions;
|
||||
use lancedb::query::QueryFilter;
|
||||
use lancedb::query::{
|
||||
ExecutableQuery, Query as LanceDbQuery, Select, TakeQuery as LanceDbTakeQuery,
|
||||
ColumnOrdering, ExecutableQuery, Query as LanceDbQuery, Select, TakeQuery as LanceDbTakeQuery,
|
||||
VectorQuery as LanceDbVectorQuery,
|
||||
};
|
||||
use lancedb::table::AnyQuery;
|
||||
@@ -207,6 +207,48 @@ impl<'py> IntoPyObject<'py> for PyLanceDB<FtsQuery> {
|
||||
#[derive(Clone)]
|
||||
pub struct PyQueryVectors(Vec<Arc<dyn Array>>);
|
||||
|
||||
#[derive(Clone, FromPyObject)]
|
||||
#[pyo3(from_item_all)]
|
||||
pub struct PyColumnOrdering {
|
||||
pub column_name: String,
|
||||
pub ascending: bool,
|
||||
pub nulls_first: bool,
|
||||
}
|
||||
|
||||
impl From<ColumnOrdering> for PyColumnOrdering {
|
||||
fn from(ordering: ColumnOrdering) -> Self {
|
||||
Self {
|
||||
column_name: ordering.column_name,
|
||||
ascending: ordering.ascending,
|
||||
nulls_first: ordering.nulls_first,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PyColumnOrdering> for ColumnOrdering {
|
||||
fn from(ordering: PyColumnOrdering) -> Self {
|
||||
Self {
|
||||
column_name: ordering.column_name,
|
||||
ascending: ordering.ascending,
|
||||
nulls_first: ordering.nulls_first,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'py> IntoPyObject<'py> for PyColumnOrdering {
|
||||
type Target = PyDict;
|
||||
type Output = Bound<'py, Self::Target>;
|
||||
type Error = PyErr;
|
||||
|
||||
fn into_pyobject(self, py: pyo3::Python<'py>) -> PyResult<Self::Output> {
|
||||
let dict = PyDict::new(py);
|
||||
dict.set_item("column_name", self.column_name)?;
|
||||
dict.set_item("ascending", self.ascending)?;
|
||||
dict.set_item("nulls_first", self.nulls_first)?;
|
||||
Ok(dict)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'py> IntoPyObject<'py> for PyQueryVectors {
|
||||
type Target = PyList;
|
||||
type Output = Bound<'py, Self::Target>;
|
||||
@@ -246,6 +288,7 @@ pub struct PyQueryRequest {
|
||||
pub bypass_vector_index: Option<bool>,
|
||||
pub postfilter: Option<bool>,
|
||||
pub norm: Option<String>,
|
||||
pub order_by: Option<Vec<PyColumnOrdering>>,
|
||||
}
|
||||
|
||||
impl From<AnyQuery> for PyQueryRequest {
|
||||
@@ -273,6 +316,9 @@ impl From<AnyQuery> for PyQueryRequest {
|
||||
bypass_vector_index: None,
|
||||
postfilter: None,
|
||||
norm: None,
|
||||
order_by: query_request
|
||||
.order_by
|
||||
.map(|order_by| order_by.into_iter().map(PyColumnOrdering::from).collect()),
|
||||
},
|
||||
AnyQuery::VectorQuery(vector_query) => Self {
|
||||
limit: vector_query.base.limit,
|
||||
@@ -297,6 +343,10 @@ impl From<AnyQuery> for PyQueryRequest {
|
||||
bypass_vector_index: Some(!vector_query.use_index),
|
||||
postfilter: Some(!vector_query.base.prefilter),
|
||||
norm: vector_query.base.norm.map(|n| n.to_string()),
|
||||
order_by: vector_query
|
||||
.base
|
||||
.order_by
|
||||
.map(|order_by| order_by.into_iter().map(PyColumnOrdering::from).collect()),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -475,6 +525,13 @@ impl Query {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
|
||||
let ordering =
|
||||
ordering.map(|ordering| ordering.into_iter().map(ColumnOrdering::from).collect());
|
||||
self.inner = self.inner.clone().order_by(ordering);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[pyo3(signature = ())]
|
||||
pub fn output_schema(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner.clone();
|
||||
@@ -647,6 +704,13 @@ impl FTSQuery {
|
||||
self.inner = self.inner.clone().offset(offset as usize);
|
||||
}
|
||||
|
||||
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
|
||||
let ordering =
|
||||
ordering.map(|ordering| ordering.into_iter().map(ColumnOrdering::from).collect());
|
||||
self.inner = self.inner.clone().order_by(ordering);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn fast_search(&mut self) {
|
||||
self.inner = self.inner.clone().fast_search();
|
||||
}
|
||||
@@ -782,6 +846,13 @@ impl VectorQuery {
|
||||
self.inner = self.inner.clone().offset(offset as usize);
|
||||
}
|
||||
|
||||
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
|
||||
let ordering =
|
||||
ordering.map(|ordering| ordering.into_iter().map(ColumnOrdering::from).collect());
|
||||
self.inner = self.inner.clone().order_by(ordering);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn fast_search(&mut self) {
|
||||
self.inner = self.inner.clone().fast_search();
|
||||
}
|
||||
@@ -954,6 +1025,12 @@ impl HybridQuery {
|
||||
self.inner_fts.offset(offset);
|
||||
}
|
||||
|
||||
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
|
||||
self.inner_vec.order_by(ordering.clone())?;
|
||||
self.inner_fts.order_by(ordering)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn fast_search(&mut self) {
|
||||
self.inner_vec.fast_search();
|
||||
self.inner_fts.fast_search();
|
||||
|
||||
@@ -171,6 +171,141 @@ impl From<lancedb::table::MergeResult> for MergeResult {
|
||||
}
|
||||
}
|
||||
|
||||
/// Specification selecting Lance's MemWAL LSM-style write path for
|
||||
/// `merge_insert`.
|
||||
///
|
||||
/// Constructed via the `bucket(...)`, `identity(...)`, or `unsharded()`
|
||||
/// classmethods, then optionally chain `with_maintained_indexes(...)` and
|
||||
/// `with_writer_config_defaults(...)`.
|
||||
#[pyclass(from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LsmWriteSpec {
|
||||
inner: lancedb::table::LsmWriteSpec,
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
impl LsmWriteSpec {
|
||||
/// Hash-bucket sharding by the unenforced primary key column.
|
||||
#[staticmethod]
|
||||
pub fn bucket(column: String, num_buckets: u32) -> Self {
|
||||
Self {
|
||||
inner: lancedb::table::LsmWriteSpec::bucket(column, num_buckets),
|
||||
}
|
||||
}
|
||||
|
||||
/// Identity sharding — shard by the raw value of `column`.
|
||||
#[staticmethod]
|
||||
pub fn identity(column: String) -> Self {
|
||||
Self {
|
||||
inner: lancedb::table::LsmWriteSpec::identity(column),
|
||||
}
|
||||
}
|
||||
|
||||
/// No sharding — every `merge_insert` call writes to a single
|
||||
/// MemWAL shard.
|
||||
#[staticmethod]
|
||||
pub fn unsharded() -> Self {
|
||||
Self {
|
||||
inner: lancedb::table::LsmWriteSpec::unsharded(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Replace the list of indexes the MemWAL should keep up to date as
|
||||
/// rows are appended. Each name must reference an index that
|
||||
/// already exists on the table at the time `set_lsm_write_spec`
|
||||
/// is called.
|
||||
pub fn with_maintained_indexes(&self, indexes: Vec<String>) -> Self {
|
||||
Self {
|
||||
inner: self.inner.clone().with_maintained_indexes(indexes),
|
||||
}
|
||||
}
|
||||
|
||||
/// Replace the default `ShardWriter` configuration recorded in the
|
||||
/// MemWAL index, so every writer starts from the same defaults.
|
||||
pub fn with_writer_config_defaults(&self, defaults: HashMap<String, String>) -> Self {
|
||||
Self {
|
||||
inner: self.inner.clone().with_writer_config_defaults(defaults),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn __repr__(&self) -> String {
|
||||
match &self.inner {
|
||||
lancedb::table::LsmWriteSpec::Bucket {
|
||||
column,
|
||||
num_buckets,
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => format!(
|
||||
"LsmWriteSpec.bucket(column={:?}, num_buckets={}, maintained_indexes={:?}, writer_config_defaults={:?})",
|
||||
column, num_buckets, maintained_indexes, writer_config_defaults,
|
||||
),
|
||||
lancedb::table::LsmWriteSpec::Identity {
|
||||
column,
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => format!(
|
||||
"LsmWriteSpec.identity(column={:?}, maintained_indexes={:?}, writer_config_defaults={:?})",
|
||||
column, maintained_indexes, writer_config_defaults,
|
||||
),
|
||||
lancedb::table::LsmWriteSpec::Unsharded {
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => format!(
|
||||
"LsmWriteSpec.unsharded(maintained_indexes={:?}, writer_config_defaults={:?})",
|
||||
maintained_indexes, writer_config_defaults,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// Discriminator string identifying the variant ("bucket", "identity",
|
||||
/// or "unsharded").
|
||||
#[getter]
|
||||
pub fn spec_type(&self) -> &'static str {
|
||||
match &self.inner {
|
||||
lancedb::table::LsmWriteSpec::Bucket { .. } => "bucket",
|
||||
lancedb::table::LsmWriteSpec::Identity { .. } => "identity",
|
||||
lancedb::table::LsmWriteSpec::Unsharded { .. } => "unsharded",
|
||||
}
|
||||
}
|
||||
|
||||
/// Bucket and identity variants: the sharding column. `None` for unsharded.
|
||||
#[getter]
|
||||
pub fn column(&self) -> Option<String> {
|
||||
match &self.inner {
|
||||
lancedb::table::LsmWriteSpec::Bucket { column, .. }
|
||||
| lancedb::table::LsmWriteSpec::Identity { column, .. } => Some(column.clone()),
|
||||
lancedb::table::LsmWriteSpec::Unsharded { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Bucket variant only: the number of buckets.
|
||||
#[getter]
|
||||
pub fn num_buckets(&self) -> Option<u32> {
|
||||
match &self.inner {
|
||||
lancedb::table::LsmWriteSpec::Bucket { num_buckets, .. } => Some(*num_buckets),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Names of indexes the MemWAL should keep up to date during writes.
|
||||
#[getter]
|
||||
pub fn maintained_indexes(&self) -> Vec<String> {
|
||||
self.inner.maintained_indexes().to_vec()
|
||||
}
|
||||
|
||||
/// Default `ShardWriter` configuration recorded by this spec.
|
||||
#[getter]
|
||||
pub fn writer_config_defaults(&self) -> HashMap<String, String> {
|
||||
self.inner.writer_config_defaults().clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LsmWriteSpec> for lancedb::table::LsmWriteSpec {
|
||||
fn from(spec: LsmWriteSpec) -> Self {
|
||||
spec.inner
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AddColumnsResult {
|
||||
@@ -805,6 +940,37 @@ impl Table {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn set_unenforced_primary_key<'a>(
|
||||
self_: PyRef<'a, Self>,
|
||||
columns: Vec<String>,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
inner
|
||||
.set_unenforced_primary_key(columns)
|
||||
.await
|
||||
.infer_error()
|
||||
})
|
||||
}
|
||||
|
||||
pub fn set_lsm_write_spec<'a>(
|
||||
self_: PyRef<'a, Self>,
|
||||
spec: LsmWriteSpec,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
let native_spec = lancedb::table::LsmWriteSpec::from(spec);
|
||||
future_into_py(self_.py(), async move {
|
||||
inner.set_lsm_write_spec(native_spec).await.infer_error()
|
||||
})
|
||||
}
|
||||
|
||||
pub fn unset_lsm_write_spec(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
inner.unset_lsm_write_spec().await.infer_error()
|
||||
})
|
||||
}
|
||||
|
||||
pub fn uses_v2_manifest_paths(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb"
|
||||
version = "0.28.0-beta.11"
|
||||
version = "0.30.0-beta.1"
|
||||
edition.workspace = true
|
||||
description = "LanceDB: A serverless, low-latency vector database for AI applications"
|
||||
license.workspace = true
|
||||
@@ -104,6 +104,7 @@ datafusion.workspace = true
|
||||
http-body = "1" # Matching reqwest
|
||||
rstest = "0.23.0"
|
||||
test-log = "0.2"
|
||||
serial_test = "3"
|
||||
|
||||
|
||||
[features]
|
||||
|
||||
@@ -271,15 +271,26 @@ impl Scannable for WithEmbeddingsScannable {
|
||||
.map_err(|e| Error::Runtime {
|
||||
message: format!("Task panicked during embedding computation: {}", e),
|
||||
})??;
|
||||
// Cast columns to match the declared output schema. The data is
|
||||
// identical but field metadata (e.g. nested nullability) may
|
||||
// differ between the embedding function output and the table.
|
||||
let columns: Vec<ArrayRef> = result
|
||||
.columns()
|
||||
// Look up columns by name (not position) so the result matches
|
||||
// the output schema even when columns appear in a different
|
||||
// order — e.g. `add_columns` placed a new column after the
|
||||
// embedding column, but the computed batch appends embeddings
|
||||
// at the end. Cast per-column because field metadata (e.g.
|
||||
// nested nullability) may also differ between the embedding
|
||||
// function output and the table.
|
||||
let columns: Vec<ArrayRef> = output_schema
|
||||
.fields()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, col)| {
|
||||
let target_type = output_schema.field(i).data_type();
|
||||
.map(|field| {
|
||||
let col = result.column_by_name(field.name()).ok_or_else(|| {
|
||||
Error::InvalidInput {
|
||||
message: format!(
|
||||
"Column '{}' required by the table schema was not present in the input batch",
|
||||
field.name()
|
||||
),
|
||||
}
|
||||
})?;
|
||||
let target_type = field.data_type();
|
||||
if col.data_type() == target_type {
|
||||
Ok(col.clone())
|
||||
} else {
|
||||
@@ -964,5 +975,118 @@ mod tests {
|
||||
"Expected EmbeddingFunctionNotFound"
|
||||
);
|
||||
}
|
||||
|
||||
/// Regression test for https://github.com/lancedb/lancedb/issues/3136.
|
||||
///
|
||||
/// When a column is added to the table after the embedding column via
|
||||
/// schema evolution, the table schema becomes
|
||||
/// `[..., embedding, extra]`. The input batch (without the embedding)
|
||||
/// is `[..., extra]`, and `compute_embeddings_for_batch` appends the
|
||||
/// embedding at the end giving `[..., extra, embedding]`. A positional
|
||||
/// cast to the output schema would map `extra` onto `embedding` and
|
||||
/// fail with a CastError. Columns must be matched by name.
|
||||
#[tokio::test]
|
||||
async fn test_with_embeddings_scannable_column_added_after_embedding() {
|
||||
let input_schema = Arc::new(Schema::new(vec![
|
||||
Field::new("text", DataType::Utf8, false),
|
||||
Field::new("score", DataType::Float64, true),
|
||||
]));
|
||||
let batch = RecordBatch::try_new(
|
||||
input_schema.clone(),
|
||||
vec![
|
||||
Arc::new(StringArray::from(vec!["hello", "world"])) as ArrayRef,
|
||||
Arc::new(arrow_array::Float64Array::from(vec![1.0, 2.0])) as ArrayRef,
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mock_embedding: Arc<dyn EmbeddingFunction> = Arc::new(MockEmbed::new("mock", 4));
|
||||
let embedding_def = EmbeddingDefinition::new("text", "mock", Some("text_vec"));
|
||||
|
||||
// Table schema: embedding column is BEFORE `score`, as would
|
||||
// happen if `score` was added via `add_columns` after creating
|
||||
// the table with an embedding on `text`.
|
||||
let output_schema = Arc::new(Schema::new(vec![
|
||||
Field::new("text", DataType::Utf8, false),
|
||||
Field::new(
|
||||
"text_vec",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, true)),
|
||||
4,
|
||||
),
|
||||
false,
|
||||
),
|
||||
Field::new("score", DataType::Float64, true),
|
||||
]));
|
||||
|
||||
let mut scannable = WithEmbeddingsScannable::with_schema(
|
||||
Box::new(batch),
|
||||
vec![(embedding_def, mock_embedding)],
|
||||
output_schema.clone(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let stream = scannable.scan_as_stream();
|
||||
let results: Vec<RecordBatch> = stream.try_collect().await.unwrap();
|
||||
assert_eq!(results.len(), 1);
|
||||
|
||||
let result_batch = &results[0];
|
||||
assert_eq!(result_batch.schema(), output_schema);
|
||||
assert_eq!(result_batch.num_rows(), 2);
|
||||
// Position 1 must actually hold the FixedSizeList embedding —
|
||||
// not the score column reinterpreted by a permissive cast.
|
||||
let embedding = result_batch
|
||||
.column(1)
|
||||
.as_any()
|
||||
.downcast_ref::<arrow_array::FixedSizeListArray>()
|
||||
.expect("position 1 should be a FixedSizeList embedding");
|
||||
assert_eq!(embedding.value_length(), 4);
|
||||
assert_eq!(embedding.null_count(), 0);
|
||||
}
|
||||
|
||||
/// If the input batch is missing a non-embedding column required by
|
||||
/// the table schema, we should return a clear error rather than
|
||||
/// silently producing a malformed batch.
|
||||
#[tokio::test]
|
||||
async fn test_with_embeddings_scannable_missing_required_column() {
|
||||
let input_schema =
|
||||
Arc::new(Schema::new(vec![Field::new("text", DataType::Utf8, false)]));
|
||||
let batch = RecordBatch::try_new(
|
||||
input_schema,
|
||||
vec![Arc::new(StringArray::from(vec!["hello", "world"])) as ArrayRef],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mock_embedding: Arc<dyn EmbeddingFunction> = Arc::new(MockEmbed::new("mock", 4));
|
||||
let embedding_def = EmbeddingDefinition::new("text", "mock", Some("text_vec"));
|
||||
|
||||
let output_schema = Arc::new(Schema::new(vec![
|
||||
Field::new("text", DataType::Utf8, false),
|
||||
Field::new(
|
||||
"text_vec",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, true)),
|
||||
4,
|
||||
),
|
||||
false,
|
||||
),
|
||||
Field::new("score", DataType::Float64, true),
|
||||
]));
|
||||
|
||||
let mut scannable = WithEmbeddingsScannable::with_schema(
|
||||
Box::new(batch),
|
||||
vec![(embedding_def, mock_embedding)],
|
||||
output_schema,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let stream = scannable.scan_as_stream();
|
||||
let results: Result<Vec<RecordBatch>> = stream.try_collect().await;
|
||||
let err = results.expect_err("expected an error");
|
||||
assert!(
|
||||
matches!(&err, Error::InvalidInput { message } if message.contains("score")),
|
||||
"expected InvalidInput about missing 'score' column, got: {err:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -450,6 +450,10 @@ impl PermutationReader {
|
||||
}
|
||||
|
||||
pub async fn take_offsets(&self, offsets: &[u64], selection: Select) -> Result<RecordBatch> {
|
||||
if offsets.is_empty() {
|
||||
return Ok(RecordBatch::new_empty(self.output_schema(selection).await?));
|
||||
}
|
||||
|
||||
if let Some(permutation_table) = &self.permutation_table {
|
||||
let offset_map = self.get_offset_map(permutation_table).await?;
|
||||
let row_ids = offsets
|
||||
@@ -955,4 +959,62 @@ mod tests {
|
||||
.to_vec();
|
||||
assert_eq!(idx_values, &all_idx_values[4997..5000]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_take_offsets_empty_identity_reader() {
|
||||
let base_table = lance_datagen::gen_batch()
|
||||
.col("idx", lance_datagen::array::step::<Int32Type>())
|
||||
.into_mem_table("tbl", RowCount::from(10), BatchCount::from(1))
|
||||
.await;
|
||||
|
||||
let reader = PermutationReader::identity(base_table.base_table().clone()).await;
|
||||
|
||||
let batch = reader.take_offsets(&[], Select::All).await.unwrap();
|
||||
|
||||
assert_eq!(batch.num_rows(), 0);
|
||||
assert_eq!(batch.num_columns(), 1);
|
||||
assert_eq!(batch.schema().field(0).name(), "idx");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_take_offsets_empty_with_permutation_table() {
|
||||
let (base_table, row_ids_table, _) = setup_permutation_tables(5).await;
|
||||
|
||||
let reader = PermutationReader::try_from_tables(
|
||||
base_table.base_table().clone(),
|
||||
row_ids_table.base_table().clone(),
|
||||
0,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let batch = reader.take_offsets(&[], Select::All).await.unwrap();
|
||||
|
||||
assert_eq!(batch.num_rows(), 0);
|
||||
assert_eq!(batch.schema().fields().len(), 2);
|
||||
assert_eq!(batch.schema().field(0).name(), "idx");
|
||||
assert_eq!(batch.schema().field(1).name(), "other_col");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_take_offsets_empty_with_column_selection() {
|
||||
let (base_table, row_ids_table, _) = setup_permutation_tables(5).await;
|
||||
|
||||
let reader = PermutationReader::try_from_tables(
|
||||
base_table.base_table().clone(),
|
||||
row_ids_table.base_table().clone(),
|
||||
0,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let batch = reader
|
||||
.take_offsets(&[], Select::Columns(vec!["idx".to_string()]))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(batch.num_rows(), 0);
|
||||
assert_eq!(batch.num_columns(), 1);
|
||||
assert_eq!(batch.schema().field(0).name(), "idx");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,17 +23,12 @@ impl VectorIndex {
|
||||
.fields
|
||||
.iter()
|
||||
.map(|field_id| {
|
||||
manifest
|
||||
.schema
|
||||
.field_by_id(*field_id)
|
||||
.unwrap_or_else(|| {
|
||||
panic!(
|
||||
"field {field_id} of index {} must exist in schema",
|
||||
index.name
|
||||
)
|
||||
})
|
||||
.name
|
||||
.clone()
|
||||
manifest.schema.field_path(*field_id).unwrap_or_else(|_| {
|
||||
panic!(
|
||||
"field {field_id} of index {} must exist in schema",
|
||||
index.name
|
||||
)
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
Self {
|
||||
|
||||
@@ -11,6 +11,8 @@ use datafusion_expr::Expr;
|
||||
use datafusion_physical_plan::ExecutionPlan;
|
||||
use futures::{FutureExt, TryFutureExt, TryStreamExt, stream, try_join};
|
||||
use half::f16;
|
||||
/// Re-export Lance ColumnOrdering type for use in query ordering
|
||||
pub use lance::dataset::scanner::ColumnOrdering;
|
||||
use lance::dataset::{ROW_ID, scanner::DatasetRecordBatchStream};
|
||||
use lance_arrow::RecordBatchExt;
|
||||
use lance_datafusion::exec::execute_plan;
|
||||
@@ -510,6 +512,11 @@ pub trait QueryBase {
|
||||
/// the scores are converted to ranks and then normalized. If "Score", the
|
||||
/// scores are normalized directly.
|
||||
fn norm(self, norm: NormalizeMethod) -> Self;
|
||||
|
||||
/// Sort the results by the specified column(s).
|
||||
///
|
||||
/// This allows ordering query results by one or more columns in either ascending or descending order.
|
||||
fn order_by(self, ordering: Option<Vec<ColumnOrdering>>) -> Self;
|
||||
}
|
||||
|
||||
pub trait HasQuery {
|
||||
@@ -574,6 +581,11 @@ impl<T: HasQuery> QueryBase for T {
|
||||
self.mut_query().norm = Some(norm);
|
||||
self
|
||||
}
|
||||
|
||||
fn order_by(mut self, ordering: Option<Vec<ColumnOrdering>>) -> Self {
|
||||
self.mut_query().order_by = ordering;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Options for controlling the execution of a query
|
||||
@@ -750,6 +762,11 @@ pub struct QueryRequest {
|
||||
///
|
||||
/// By default, this is false (scoring columns are auto-projected for backward compatibility).
|
||||
pub disable_scoring_autoprojection: bool,
|
||||
|
||||
/// Sort the results by the specified column(s).
|
||||
///
|
||||
/// This allows ordering query results by one or more columns in either ascending or descending order.
|
||||
pub order_by: Option<Vec<ColumnOrdering>>,
|
||||
}
|
||||
|
||||
impl Default for QueryRequest {
|
||||
@@ -766,6 +783,7 @@ impl Default for QueryRequest {
|
||||
reranker: None,
|
||||
norm: None,
|
||||
disable_scoring_autoprojection: false,
|
||||
order_by: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -888,6 +888,7 @@ pub mod test_utils {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use serial_test::serial;
|
||||
use std::time::Duration;
|
||||
|
||||
#[test]
|
||||
@@ -1143,6 +1144,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial(user_id_env)]
|
||||
fn test_resolve_user_id_none() {
|
||||
let config = ClientConfig::default();
|
||||
// Clear env vars that might be set from other tests
|
||||
@@ -1155,6 +1157,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial(user_id_env)]
|
||||
fn test_resolve_user_id_from_env() {
|
||||
// SAFETY: This is only called in tests
|
||||
unsafe {
|
||||
@@ -1169,6 +1172,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial(user_id_env)]
|
||||
fn test_resolve_user_id_from_env_key() {
|
||||
// SAFETY: This is only called in tests
|
||||
unsafe {
|
||||
@@ -1189,6 +1193,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial(user_id_env)]
|
||||
fn test_resolve_user_id_direct_takes_precedence() {
|
||||
// SAFETY: This is only called in tests
|
||||
unsafe {
|
||||
@@ -1206,6 +1211,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial(user_id_env)]
|
||||
fn test_resolve_user_id_empty_env_ignored() {
|
||||
// SAFETY: This is only called in tests
|
||||
unsafe {
|
||||
|
||||
@@ -27,7 +27,9 @@ use crate::table::UpdateResult;
|
||||
use crate::table::query::create_multi_vector_plan;
|
||||
use crate::table::{AnyQuery, Filter, PreprocessingOutput, TableStatistics};
|
||||
use crate::utils::background_cache::BackgroundCache;
|
||||
use crate::utils::{supported_btree_data_type, supported_vector_data_type};
|
||||
use crate::utils::{
|
||||
resolve_arrow_field_path, supported_btree_data_type, supported_vector_data_type,
|
||||
};
|
||||
use crate::{DistanceType, Error};
|
||||
use crate::{
|
||||
error::Result,
|
||||
@@ -518,6 +520,21 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(order_by) = ¶ms.order_by {
|
||||
body["order_by"] = serde_json::Value::Array(
|
||||
order_by
|
||||
.iter()
|
||||
.map(|o| {
|
||||
serde_json::json!({
|
||||
"column_name": o.column_name,
|
||||
"ascending": o.ascending,
|
||||
"nulls_first": o.nulls_first,
|
||||
})
|
||||
})
|
||||
.collect(),
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1511,8 +1528,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
});
|
||||
}
|
||||
};
|
||||
let schema = self.schema().await?;
|
||||
let (canonical_column, field) = resolve_arrow_field_path(&schema, &column)?;
|
||||
let mut body = serde_json::json!({
|
||||
"column": column
|
||||
"column": canonical_column
|
||||
});
|
||||
|
||||
// Add name parameter if provided (for backwards compatibility, only include if Some)
|
||||
@@ -1547,12 +1566,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
Index::LabelList(p) => ("LABEL_LIST", Some(to_json(p)?)),
|
||||
Index::FTS(p) => ("FTS", Some(to_json(p)?)),
|
||||
Index::Auto => {
|
||||
let schema = self.schema().await?;
|
||||
let field = schema
|
||||
.field_with_name(&column)
|
||||
.map_err(|_| Error::InvalidInput {
|
||||
message: format!("Column {} not found in schema", column),
|
||||
})?;
|
||||
if supported_vector_data_type(field.data_type()) {
|
||||
body[METRIC_TYPE_KEY] =
|
||||
serde_json::Value::String(DistanceType::L2.to_string().to_lowercase());
|
||||
@@ -1652,6 +1665,24 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
Ok(merge_insert_response)
|
||||
}
|
||||
|
||||
async fn set_unenforced_primary_key(&self, _columns: &[&str]) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "set_unenforced_primary_key is not supported on LanceDB cloud.".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn set_lsm_write_spec(&self, _spec: crate::table::LsmWriteSpec) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "set_lsm_write_spec is not supported on LanceDB cloud.".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn unset_lsm_write_spec(&self) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "unset_lsm_write_spec is not supported on LanceDB cloud.".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
|
||||
Ok(Box::new(RemoteTags { inner: self }))
|
||||
}
|
||||
@@ -1831,16 +1862,26 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
let schema = self.schema().await?;
|
||||
|
||||
// Make request to get stats for each index, so we get the index type.
|
||||
// This is a bit inefficient, but it's the only way to get the index type.
|
||||
let mut futures = Vec::with_capacity(body.indexes.len());
|
||||
for index in body.indexes {
|
||||
let columns = index
|
||||
.columns
|
||||
.iter()
|
||||
.map(|column| {
|
||||
resolve_arrow_field_path(&schema, column)
|
||||
.map(|(canonical_column, _)| canonical_column)
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
let future = async move {
|
||||
match self.index_stats(&index.index_name).await {
|
||||
Ok(Some(stats)) => Ok(Some(IndexConfig {
|
||||
name: index.index_name,
|
||||
index_type: stats.index_type,
|
||||
columns: index.columns,
|
||||
columns,
|
||||
})),
|
||||
Ok(None) => Ok(None), // The index must have been deleted since we listed it.
|
||||
Err(e) => Err(e),
|
||||
@@ -2078,7 +2119,7 @@ mod tests {
|
||||
use crate::{
|
||||
DistanceType, Error, Table,
|
||||
index::{Index, IndexStatistics, IndexType, vector::IvfPqIndexBuilder},
|
||||
query::{ExecutableQuery, QueryBase},
|
||||
query::{ColumnOrdering, ExecutableQuery, QueryBase},
|
||||
remote::ARROW_FILE_CONTENT_TYPE,
|
||||
};
|
||||
|
||||
@@ -2282,6 +2323,38 @@ mod tests {
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn nested_index_schema() -> Schema {
|
||||
let vector_type =
|
||||
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 8);
|
||||
Schema::new(vec![
|
||||
Field::new(
|
||||
"metadata",
|
||||
DataType::Struct(vec![Field::new("user_id", DataType::Int32, false)].into()),
|
||||
false,
|
||||
),
|
||||
Field::new(
|
||||
"image",
|
||||
DataType::Struct(vec![Field::new("embedding", vector_type, false)].into()),
|
||||
false,
|
||||
),
|
||||
Field::new(
|
||||
"payload",
|
||||
DataType::Struct(vec![Field::new("text", DataType::Utf8, false)].into()),
|
||||
false,
|
||||
),
|
||||
Field::new(
|
||||
"meta-data",
|
||||
DataType::Struct(vec![Field::new("user-id", DataType::Int32, false)].into()),
|
||||
false,
|
||||
),
|
||||
Field::new(
|
||||
"literal",
|
||||
DataType::Struct(vec![Field::new("a.b", DataType::Int32, false)].into()),
|
||||
false,
|
||||
),
|
||||
])
|
||||
}
|
||||
|
||||
#[rstest]
|
||||
#[case("", 0)]
|
||||
#[case("{}", 0)]
|
||||
@@ -2988,6 +3061,18 @@ mod tests {
|
||||
"distance_type": "cosine",
|
||||
"bypass_vector_index": true,
|
||||
"columns": ["a", "b"],
|
||||
"order_by": [
|
||||
{
|
||||
"column_name": "score",
|
||||
"ascending": false,
|
||||
"nulls_first": true,
|
||||
},
|
||||
{
|
||||
"column_name": "id",
|
||||
"ascending": true,
|
||||
"nulls_first": false,
|
||||
}
|
||||
],
|
||||
"nprobes": 12,
|
||||
"minimum_nprobes": 12,
|
||||
"maximum_nprobes": 12,
|
||||
@@ -3019,6 +3104,10 @@ mod tests {
|
||||
.limit(42)
|
||||
.offset(10)
|
||||
.select(Select::columns(&["a", "b"]))
|
||||
.order_by(Some(vec![
|
||||
ColumnOrdering::desc_nulls_first("score".to_string()),
|
||||
ColumnOrdering::asc_nulls_last("id".to_string()),
|
||||
]))
|
||||
.nearest_to(vec![0.1, 0.2, 0.3])
|
||||
.unwrap()
|
||||
.column("my_vector")
|
||||
@@ -3032,6 +3121,59 @@ mod tests {
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query_vector_nested_field_path() {
|
||||
let expected_data = RecordBatch::try_new(
|
||||
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
|
||||
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
|
||||
)
|
||||
.unwrap();
|
||||
let expected_data_ref = expected_data.clone();
|
||||
|
||||
let table = Table::new_with_handler("my_table", move |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
assert_eq!(request.url().path(), "/v1/table/my_table/query/");
|
||||
assert_eq!(
|
||||
request.headers().get("Content-Type").unwrap(),
|
||||
JSON_CONTENT_TYPE
|
||||
);
|
||||
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
let mut expected_body = serde_json::json!({
|
||||
"vector_column": "image.embedding",
|
||||
"prefilter": true,
|
||||
"k": 10,
|
||||
"nprobes": 20,
|
||||
"minimum_nprobes": 20,
|
||||
"maximum_nprobes": 20,
|
||||
"lower_bound": Option::<f32>::None,
|
||||
"upper_bound": Option::<f32>::None,
|
||||
"ef": Option::<usize>::None,
|
||||
"refine_factor": Option::<u32>::None,
|
||||
"version": null,
|
||||
});
|
||||
expected_body["vector"] = vec![0.1f32, 0.2, 0.3].into();
|
||||
assert_eq!(body, expected_body);
|
||||
|
||||
let response_body = write_ipc_file(&expected_data_ref);
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.header(CONTENT_TYPE, ARROW_FILE_CONTENT_TYPE)
|
||||
.body(response_body)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let _ = table
|
||||
.query()
|
||||
.nearest_to(vec![0.1, 0.2, 0.3])
|
||||
.unwrap()
|
||||
.column("image.embedding")
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_query_fts() {
|
||||
let table = Table::new_with_handler("my_table", |request| {
|
||||
@@ -3113,7 +3255,7 @@ mod tests {
|
||||
"query": {
|
||||
"match": {
|
||||
"terms": "hello world",
|
||||
"column": "a",
|
||||
"column": "payload.text",
|
||||
"boost": 1.0,
|
||||
"fuzziness": 0,
|
||||
"max_expansions": 50,
|
||||
@@ -3147,7 +3289,7 @@ mod tests {
|
||||
.query()
|
||||
.full_text_search(FullTextSearchQuery::new_query(
|
||||
MatchQuery::new("hello world".to_owned())
|
||||
.with_column(Some("a".to_owned()))
|
||||
.with_column(Some("payload.text".to_owned()))
|
||||
.into(),
|
||||
))
|
||||
.with_row_id()
|
||||
@@ -3418,32 +3560,152 @@ mod tests {
|
||||
for (index_type, expected_body, index) in cases {
|
||||
let table = Table::new_with_handler("my_table", move |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
assert_eq!(request.url().path(), "/v1/table/my_table/create_index/");
|
||||
assert_eq!(
|
||||
request.headers().get("Content-Type").unwrap(),
|
||||
JSON_CONTENT_TYPE
|
||||
);
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
let mut expected_body = expected_body.clone();
|
||||
expected_body["column"] = "a".into();
|
||||
expected_body[INDEX_TYPE_KEY] = index_type.into();
|
||||
match request.url().path() {
|
||||
"/v1/table/my_table/describe/" => {
|
||||
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(describe_response(&schema))
|
||||
.unwrap()
|
||||
}
|
||||
"/v1/table/my_table/create_index/" => {
|
||||
assert_eq!(
|
||||
request.headers().get("Content-Type").unwrap(),
|
||||
JSON_CONTENT_TYPE
|
||||
);
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
let mut expected_body = expected_body.clone();
|
||||
expected_body["column"] = "a".into();
|
||||
expected_body[INDEX_TYPE_KEY] = index_type.into();
|
||||
|
||||
assert_eq!(body, expected_body);
|
||||
assert_eq!(body, expected_body);
|
||||
|
||||
http::Response::builder().status(200).body("{}").unwrap()
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body("{}".to_string())
|
||||
.unwrap()
|
||||
}
|
||||
path => panic!("Unexpected path: {}", path),
|
||||
}
|
||||
});
|
||||
|
||||
table.create_index(&["a"], index).execute().await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_index_nested_field_paths() {
|
||||
let schema = nested_index_schema();
|
||||
let expected_requests = Arc::new(vec![
|
||||
json!({
|
||||
"column": "metadata.user_id",
|
||||
"index_type": "BTREE",
|
||||
}),
|
||||
json!({
|
||||
"column": "image.embedding",
|
||||
"index_type": "IVF_PQ",
|
||||
"metric_type": "l2",
|
||||
}),
|
||||
{
|
||||
let mut body = serde_json::to_value(InvertedIndexParams::default()).unwrap();
|
||||
body["column"] = "payload.text".into();
|
||||
body["index_type"] = "FTS".into();
|
||||
body
|
||||
},
|
||||
json!({
|
||||
"column": "`meta-data`.`user-id`",
|
||||
"index_type": "BTREE",
|
||||
}),
|
||||
json!({
|
||||
"column": "literal.`a.b`",
|
||||
"index_type": "BTREE",
|
||||
}),
|
||||
]);
|
||||
let request_idx = Arc::new(AtomicUsize::new(0));
|
||||
let table = Table::new_with_handler("my_table", {
|
||||
let schema = schema.clone();
|
||||
let expected_requests = expected_requests.clone();
|
||||
let request_idx = request_idx.clone();
|
||||
move |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
match request.url().path() {
|
||||
"/v1/table/my_table/describe/" => http::Response::builder()
|
||||
.status(200)
|
||||
.body(describe_response(&schema))
|
||||
.unwrap(),
|
||||
"/v1/table/my_table/create_index/" => {
|
||||
assert_eq!(
|
||||
request.headers().get("Content-Type").unwrap(),
|
||||
JSON_CONTENT_TYPE
|
||||
);
|
||||
let idx = request_idx.fetch_add(1, Ordering::SeqCst);
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
assert_eq!(body, expected_requests[idx]);
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body("{}".to_string())
|
||||
.unwrap()
|
||||
}
|
||||
path => panic!("Unexpected path: {}", path),
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
table
|
||||
.create_index(&["Metadata.USER_ID"], Index::BTree(Default::default()))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
table
|
||||
.create_index(&["Image.Embedding"], Index::Auto)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
table
|
||||
.create_index(&["Payload.Text"], Index::FTS(Default::default()))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
table
|
||||
.create_index(&["`META-DATA`.`USER-ID`"], Index::BTree(Default::default()))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
table
|
||||
.create_index(&["literal.`A.B`"], Index::BTree(Default::default()))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(request_idx.load(Ordering::SeqCst), expected_requests.len());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list_indices() {
|
||||
let table = Table::new_with_handler("my_table", |request| {
|
||||
let schema = Schema::new(vec![
|
||||
Field::new(
|
||||
"vector",
|
||||
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 8),
|
||||
false,
|
||||
),
|
||||
Field::new(
|
||||
"metadata",
|
||||
DataType::Struct(vec![Field::new("my.column", DataType::Utf8, true)].into()),
|
||||
false,
|
||||
),
|
||||
]);
|
||||
let table = Table::new_with_handler("my_table", move |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
|
||||
let response_body = match request.url().path() {
|
||||
"/v1/table/my_table/describe/" => {
|
||||
return http::Response::builder()
|
||||
.status(200)
|
||||
.body(describe_response(&schema))
|
||||
.unwrap();
|
||||
}
|
||||
"/v1/table/my_table/index/list/" => {
|
||||
serde_json::json!({
|
||||
"indexes": [
|
||||
@@ -3456,7 +3718,7 @@ mod tests {
|
||||
{
|
||||
"index_name": "my_idx",
|
||||
"index_uuid": "34255f64-5717-4562-b3fc-2c963f66afa6",
|
||||
"columns": ["my_column"],
|
||||
"columns": ["metadata.`my.column`"],
|
||||
"index_status": "done",
|
||||
},
|
||||
]
|
||||
@@ -3495,7 +3757,7 @@ mod tests {
|
||||
IndexConfig {
|
||||
name: "my_idx".into(),
|
||||
index_type: IndexType::LabelList,
|
||||
columns: vec!["my_column".into()],
|
||||
columns: vec!["metadata.`my.column`".into()],
|
||||
},
|
||||
];
|
||||
assert_eq!(indices, expected);
|
||||
@@ -3963,6 +4225,20 @@ mod tests {
|
||||
assert_eq!(request.method(), "POST");
|
||||
|
||||
let response_body = match request.url().path() {
|
||||
"/v1/table/my_table/describe/" => {
|
||||
let schema = Schema::new(vec![
|
||||
Field::new(
|
||||
"vector",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, true)),
|
||||
8,
|
||||
),
|
||||
false,
|
||||
),
|
||||
Field::new("my_column", DataType::Utf8, false),
|
||||
]);
|
||||
serde_json::from_str::<serde_json::Value>(&describe_response(&schema)).unwrap()
|
||||
}
|
||||
"/v1/table/my_table/index/list/" => {
|
||||
serde_json::json!({
|
||||
"indexes": [
|
||||
@@ -4124,13 +4400,23 @@ mod tests {
|
||||
assert_eq!(value["index_type"], "IVF_PQ");
|
||||
}
|
||||
|
||||
http::Response::builder().status(200).body("").unwrap()
|
||||
}
|
||||
"/v1/table/dev$users/describe/" => {
|
||||
// Needed for schema check in Auto index type
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 1, "schema": {"fields": [{"name": "embedding", "type": {"type": "list", "item": {"type": "float32"}}, "nullable": false}]}}"#)
|
||||
.body("".to_string())
|
||||
.unwrap()
|
||||
}
|
||||
"/v1/table/dev$users/describe/" => {
|
||||
let schema = Schema::new(vec![Field::new(
|
||||
"embedding",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, true)),
|
||||
8,
|
||||
),
|
||||
false,
|
||||
)]);
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(describe_response(&schema))
|
||||
.unwrap()
|
||||
}
|
||||
_ => {
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -268,7 +268,9 @@ mod tests {
|
||||
};
|
||||
use crate::query::{ExecutableQuery, QueryBase, Select};
|
||||
use crate::table::add_data::NaNVectorBehavior;
|
||||
use crate::table::{ColumnDefinition, ColumnKind, Table, TableDefinition, WriteOptions};
|
||||
use crate::table::{
|
||||
ColumnDefinition, ColumnKind, NewColumnTransform, Table, TableDefinition, WriteOptions,
|
||||
};
|
||||
use crate::test_utils::TestCustomError;
|
||||
use crate::test_utils::embeddings::MockEmbed;
|
||||
|
||||
@@ -518,6 +520,225 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
/// Regression test for https://github.com/lancedb/lancedb/issues/3136.
|
||||
///
|
||||
/// When a column is added via `add_columns` AFTER an embedding column,
|
||||
/// the table schema becomes `[..., embedding, extra]`. Subsequent
|
||||
/// `table.add()` calls used to fail with a CastError because columns
|
||||
/// were matched positionally rather than by name.
|
||||
#[tokio::test]
|
||||
async fn test_add_with_embeddings_after_add_columns() {
|
||||
let registry = Arc::new(MemoryRegistry::new());
|
||||
let mock_embedding: Arc<dyn EmbeddingFunction> = Arc::new(MockEmbed::new("mock", 4));
|
||||
registry.register("mock", mock_embedding).unwrap();
|
||||
|
||||
let conn = connect("memory://")
|
||||
.embedding_registry(registry)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("text", DataType::Utf8, false),
|
||||
Field::new(
|
||||
"text_vec",
|
||||
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4),
|
||||
false,
|
||||
),
|
||||
]));
|
||||
|
||||
let embedding_def = EmbeddingDefinition::new("text", "mock", Some("text_vec"));
|
||||
let table_def = TableDefinition::new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
ColumnDefinition {
|
||||
kind: ColumnKind::Physical,
|
||||
},
|
||||
ColumnDefinition {
|
||||
kind: ColumnKind::Embedding(embedding_def),
|
||||
},
|
||||
],
|
||||
);
|
||||
let rich_schema = table_def.into_rich_schema();
|
||||
|
||||
let table = conn
|
||||
.create_empty_table("embed_evol_test", rich_schema)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Seed a row so add_columns has data to compute against.
|
||||
let seed_batch = record_batch!(("text", Utf8, ["hello"])).unwrap();
|
||||
table.add(seed_batch).execute().await.unwrap();
|
||||
|
||||
// Add a new physical column AFTER the embedding column.
|
||||
table
|
||||
.add_columns(
|
||||
NewColumnTransform::SqlExpressions(vec![("score".into(), "42.0".into())]),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Now add data including the new column but WITHOUT the embedding.
|
||||
// The input batch column order is [text, score]; after computing the
|
||||
// embedding it becomes [text, score, text_vec], but the table schema
|
||||
// is [text, text_vec, score]. Columns must be matched by name.
|
||||
let new_schema = Arc::new(Schema::new(vec![
|
||||
Field::new("text", DataType::Utf8, false),
|
||||
Field::new("score", DataType::Float64, true),
|
||||
]));
|
||||
let new_batch = RecordBatch::try_new(
|
||||
new_schema,
|
||||
vec![
|
||||
Arc::new(arrow_array::StringArray::from(vec!["foo", "bar"])),
|
||||
Arc::new(arrow_array::Float64Array::from(vec![1.0, 2.0])),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
table.add(new_batch).execute().await.unwrap();
|
||||
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), 3);
|
||||
|
||||
let results: Vec<RecordBatch> = table
|
||||
.query()
|
||||
.select(Select::columns(&["text", "text_vec", "score"]))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap()
|
||||
.try_collect()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let total_rows: usize = results.iter().map(|b| b.num_rows()).sum();
|
||||
assert_eq!(total_rows, 3);
|
||||
for batch in &results {
|
||||
// text_vec must be populated for the newly added rows too.
|
||||
assert_eq!(batch.column(1).null_count(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
/// Like `test_add_with_embeddings_after_add_columns`, but the column
|
||||
/// added after the embedding is a nested struct rather than a scalar.
|
||||
/// Verifies that name-based column matching also works when the
|
||||
/// post-embedding column has a complex Arrow type.
|
||||
#[tokio::test]
|
||||
async fn test_add_with_embeddings_after_add_nested_columns() {
|
||||
let registry = Arc::new(MemoryRegistry::new());
|
||||
let mock_embedding: Arc<dyn EmbeddingFunction> = Arc::new(MockEmbed::new("mock", 4));
|
||||
registry.register("mock", mock_embedding).unwrap();
|
||||
|
||||
let conn = connect("memory://")
|
||||
.embedding_registry(registry)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("text", DataType::Utf8, false),
|
||||
Field::new(
|
||||
"text_vec",
|
||||
DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4),
|
||||
false,
|
||||
),
|
||||
]));
|
||||
|
||||
let embedding_def = EmbeddingDefinition::new("text", "mock", Some("text_vec"));
|
||||
let table_def = TableDefinition::new(
|
||||
schema,
|
||||
vec![
|
||||
ColumnDefinition {
|
||||
kind: ColumnKind::Physical,
|
||||
},
|
||||
ColumnDefinition {
|
||||
kind: ColumnKind::Embedding(embedding_def),
|
||||
},
|
||||
],
|
||||
);
|
||||
let rich_schema = table_def.into_rich_schema();
|
||||
|
||||
let table = conn
|
||||
.create_empty_table("embed_nested_test", rich_schema)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let seed_batch = record_batch!(("text", Utf8, ["hello"])).unwrap();
|
||||
table.add(seed_batch).execute().await.unwrap();
|
||||
|
||||
// Add a STRUCT column after the embedding column.
|
||||
let meta_struct = DataType::Struct(
|
||||
vec![
|
||||
Field::new("source", DataType::Utf8, true),
|
||||
Field::new("score", DataType::Float64, true),
|
||||
]
|
||||
.into(),
|
||||
);
|
||||
let nested_schema = Arc::new(Schema::new(vec![Field::new(
|
||||
"meta",
|
||||
meta_struct.clone(),
|
||||
true,
|
||||
)]));
|
||||
table
|
||||
.add_columns(NewColumnTransform::AllNulls(nested_schema), None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Insert with the nested struct present but the embedding column
|
||||
// absent. The computed batch is [text, meta, text_vec], but the
|
||||
// table schema is [text, text_vec, meta] — only name-based matching
|
||||
// can put `meta` (a struct) in the right slot.
|
||||
let source = Arc::new(arrow_array::StringArray::from(vec!["foo", "bar"]));
|
||||
let score = Arc::new(arrow_array::Float64Array::from(vec![1.0, 2.0]));
|
||||
let meta = Arc::new(arrow_array::StructArray::from(vec![
|
||||
(
|
||||
Arc::new(Field::new("source", DataType::Utf8, true)),
|
||||
source as Arc<dyn arrow_array::Array>,
|
||||
),
|
||||
(
|
||||
Arc::new(Field::new("score", DataType::Float64, true)),
|
||||
score as Arc<dyn arrow_array::Array>,
|
||||
),
|
||||
]));
|
||||
let new_schema = Arc::new(Schema::new(vec![
|
||||
Field::new("text", DataType::Utf8, false),
|
||||
Field::new("meta", meta_struct, true),
|
||||
]));
|
||||
let new_batch = RecordBatch::try_new(
|
||||
new_schema,
|
||||
vec![
|
||||
Arc::new(arrow_array::StringArray::from(vec!["foo", "bar"])),
|
||||
meta,
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
table.add(new_batch).execute().await.unwrap();
|
||||
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), 3);
|
||||
|
||||
let results: Vec<RecordBatch> = table
|
||||
.query()
|
||||
.select(Select::columns(&["text", "text_vec", "meta"]))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap()
|
||||
.try_collect()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let total_rows: usize = results.iter().map(|b| b.num_rows()).sum();
|
||||
assert_eq!(total_rows, 3);
|
||||
for batch in &results {
|
||||
assert_eq!(batch.schema().field(2).name(), "meta");
|
||||
assert!(matches!(
|
||||
batch.schema().field(2).data_type(),
|
||||
DataType::Struct(_)
|
||||
));
|
||||
// text_vec must be populated for the newly added rows too.
|
||||
assert_eq!(batch.column(1).null_count(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_add_casts_to_table_schema() {
|
||||
let table_schema = Arc::new(Schema::new(vec![
|
||||
|
||||
@@ -16,6 +16,8 @@ use crate::error::{Error, Result};
|
||||
|
||||
use super::{BaseTable, NativeTable};
|
||||
|
||||
pub(crate) mod lsm;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct MergeResult {
|
||||
// The commit version associated with the operation.
|
||||
|
||||
101
rust/lancedb/src/table/merge/lsm.rs
Normal file
101
rust/lancedb/src/table/merge/lsm.rs
Normal file
@@ -0,0 +1,101 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
//! MemWAL LSM write-path spec management.
|
||||
//!
|
||||
//! [`set_lsm_write_spec`] installs a [`super::super::LsmWriteSpec`] on a
|
||||
//! table, which selects Lance's MemWAL LSM-style write path for future
|
||||
//! `merge_insert` calls. [`unset_lsm_write_spec`] removes it. The actual
|
||||
//! `merge_insert` dispatch and writer are a follow-up.
|
||||
|
||||
use lance::dataset::mem_wal::DatasetMemWalExt;
|
||||
use lance::index::DatasetIndexExt;
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
use crate::table::{LsmWriteSpec, NativeTable};
|
||||
|
||||
// =============================================================================
|
||||
// set_lsm_write_spec
|
||||
// =============================================================================
|
||||
|
||||
/// Install an [`LsmWriteSpec`] on the table.
|
||||
///
|
||||
/// The bucket / unsharded sharding spec is constructed and validated by Lance's
|
||||
/// [`InitializeMemWalBuilder`](lance::dataset::mem_wal::InitializeMemWalBuilder).
|
||||
#[allow(clippy::redundant_pub_crate)]
|
||||
pub(crate) async fn set_lsm_write_spec(table: &NativeTable, spec: LsmWriteSpec) -> Result<()> {
|
||||
table.dataset.ensure_mutable()?;
|
||||
|
||||
{
|
||||
let dataset = table.dataset.get().await?;
|
||||
if dataset.mem_wal_index_details().await?.is_some() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "set_lsm_write_spec: an LSM write spec is already set on this table; mutation is not supported".into(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
let mut builder = dataset.initialize_mem_wal();
|
||||
let (maintained_indexes, writer_config_defaults) = match spec {
|
||||
LsmWriteSpec::Bucket {
|
||||
column,
|
||||
num_buckets,
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => {
|
||||
builder = builder.bucket_sharding(column, num_buckets);
|
||||
(maintained_indexes, writer_config_defaults)
|
||||
}
|
||||
LsmWriteSpec::Identity {
|
||||
column,
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => {
|
||||
builder = builder.identity_sharding(column);
|
||||
(maintained_indexes, writer_config_defaults)
|
||||
}
|
||||
LsmWriteSpec::Unsharded {
|
||||
maintained_indexes,
|
||||
writer_config_defaults,
|
||||
} => {
|
||||
builder = builder.unsharded();
|
||||
(maintained_indexes, writer_config_defaults)
|
||||
}
|
||||
};
|
||||
builder = builder.maintained_indexes(maintained_indexes);
|
||||
for (key, value) in writer_config_defaults {
|
||||
builder = builder.add_writer_config_default(key, value);
|
||||
}
|
||||
builder.execute().await?;
|
||||
table.dataset.update(dataset);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// unset_lsm_write_spec
|
||||
// =============================================================================
|
||||
|
||||
/// Remove the [`LsmWriteSpec`] from the table by dropping the MemWAL index.
|
||||
///
|
||||
/// Errors if no spec is currently set.
|
||||
#[allow(clippy::redundant_pub_crate)]
|
||||
pub(crate) async fn unset_lsm_write_spec(table: &NativeTable) -> Result<()> {
|
||||
table.dataset.ensure_mutable()?;
|
||||
|
||||
{
|
||||
let dataset = table.dataset.get().await?;
|
||||
if dataset.mem_wal_index_details().await?.is_none() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "unset_lsm_write_spec: no LSM write spec is set on this table".into(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
dataset
|
||||
.drop_index(lance_index::mem_wal::MEM_WAL_INDEX_NAME)
|
||||
.await?;
|
||||
table.dataset.update(dataset);
|
||||
Ok(())
|
||||
}
|
||||
113
rust/lancedb/src/table/primary_key.rs
Normal file
113
rust/lancedb/src/table/primary_key.rs
Normal file
@@ -0,0 +1,113 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
//! Table-level unenforced primary key support.
|
||||
//!
|
||||
//! [`set_unenforced_primary_key`] records a column as the table's primary key
|
||||
//! by writing Lance schema field metadata. "Unenforced" means LanceDB does not
|
||||
//! check uniqueness on write; the key is metadata for features such as
|
||||
//! `merge_insert` to consume.
|
||||
//!
|
||||
//! Only a single-column primary key is supported, and the key cannot be
|
||||
//! changed once set.
|
||||
|
||||
use arrow_schema::DataType;
|
||||
use lance_core::datatypes::{LANCE_UNENFORCED_PRIMARY_KEY, LANCE_UNENFORCED_PRIMARY_KEY_POSITION};
|
||||
|
||||
use crate::error::{Error, Result};
|
||||
use crate::table::NativeTable;
|
||||
|
||||
/// Set the unenforced primary key on `table` to the single column in `columns`.
|
||||
///
|
||||
/// Fails if `columns` is not exactly one column (compound primary keys are not
|
||||
/// supported), if the column does not exist or has an unsupported dtype, or if
|
||||
/// the table already has an unenforced primary key (changing the primary key
|
||||
/// is not supported).
|
||||
pub(super) async fn set_unenforced_primary_key(
|
||||
table: &NativeTable,
|
||||
columns: &[&str],
|
||||
) -> Result<()> {
|
||||
table.dataset.ensure_mutable()?;
|
||||
|
||||
if columns.is_empty() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "set_unenforced_primary_key: a column is required".into(),
|
||||
});
|
||||
}
|
||||
if columns.len() > 1 {
|
||||
return Err(Error::InvalidInput {
|
||||
message: format!(
|
||||
"set_unenforced_primary_key: compound primary keys are not supported, got {} columns",
|
||||
columns.len()
|
||||
),
|
||||
});
|
||||
}
|
||||
let column = columns[0];
|
||||
|
||||
let updates = {
|
||||
let dataset = table.dataset.get().await?;
|
||||
let schema = dataset.schema();
|
||||
|
||||
// The primary key is immutable once set. The Lance commit layer is the
|
||||
// source of truth for this (it also covers the concurrent-writer race);
|
||||
// this check just fails fast with a clear message.
|
||||
if !schema.unenforced_primary_key().is_empty() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: "set_unenforced_primary_key: an unenforced primary key is already set on this table; changing it is not supported".into(),
|
||||
});
|
||||
}
|
||||
|
||||
let field = schema.field(column).ok_or_else(|| Error::InvalidInput {
|
||||
message: format!(
|
||||
"set_unenforced_primary_key: column '{}' not found on table",
|
||||
column
|
||||
),
|
||||
})?;
|
||||
if !is_supported_pk_dtype(&field.data_type()) {
|
||||
return Err(Error::InvalidInput {
|
||||
message: format!(
|
||||
"set_unenforced_primary_key: column '{}' has dtype {:?} which is not supported as a primary key. Supported: Int32, Int64, Utf8, LargeUtf8, Binary, LargeBinary, FixedSizeBinary",
|
||||
column,
|
||||
field.data_type()
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
// Position metadata is 1-indexed; `Schema::unenforced_primary_key`
|
||||
// treats position 0 as a legacy "no specific position" fallback.
|
||||
let mut metadata = field.metadata.clone();
|
||||
metadata.remove(LANCE_UNENFORCED_PRIMARY_KEY);
|
||||
metadata.insert(
|
||||
LANCE_UNENFORCED_PRIMARY_KEY_POSITION.to_string(),
|
||||
"1".to_string(),
|
||||
);
|
||||
vec![(field_id_to_u32(field.id, &field.name)?, metadata)]
|
||||
};
|
||||
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
dataset.replace_field_metadata(updates).await?;
|
||||
table.dataset.update(dataset);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn field_id_to_u32(id: i32, name: &str) -> Result<u32> {
|
||||
u32::try_from(id).map_err(|_| Error::Runtime {
|
||||
message: format!(
|
||||
"internal: field '{}' has unexpected negative field id {}",
|
||||
name, id
|
||||
),
|
||||
})
|
||||
}
|
||||
|
||||
fn is_supported_pk_dtype(dtype: &DataType) -> bool {
|
||||
matches!(
|
||||
dtype,
|
||||
DataType::Int32
|
||||
| DataType::Int64
|
||||
| DataType::Utf8
|
||||
| DataType::LargeUtf8
|
||||
| DataType::Binary
|
||||
| DataType::LargeBinary
|
||||
| DataType::FixedSizeBinary(_)
|
||||
)
|
||||
}
|
||||
@@ -242,6 +242,10 @@ pub async fn create_plan(
|
||||
scanner.disable_scoring_autoprojection();
|
||||
}
|
||||
|
||||
if let Some(order_by) = &query.base.order_by {
|
||||
scanner.order_by(Some(order_by.clone()))?;
|
||||
}
|
||||
|
||||
Ok(scanner.create_plan().await?)
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ pub(crate) mod background_cache;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_array::RecordBatch;
|
||||
use arrow_schema::{DataType, Schema, SchemaRef};
|
||||
use arrow_schema::{DataType, Field, Schema, SchemaRef};
|
||||
use datafusion_common::{DataFusionError, Result as DataFusionResult};
|
||||
use datafusion_execution::RecordBatchStream;
|
||||
use futures::{FutureExt, Stream};
|
||||
@@ -152,14 +152,10 @@ pub fn validate_namespace(namespace: &[String]) -> Result<()> {
|
||||
/// Find one default column to create index or perform vector query.
|
||||
pub(crate) fn default_vector_column(schema: &Schema, dim: Option<i32>) -> Result<String> {
|
||||
// Try to find a vector column.
|
||||
let candidates = schema
|
||||
.fields()
|
||||
.iter()
|
||||
.filter_map(|field| match infer_vector_dim(field.data_type()) {
|
||||
Ok(d) if dim.is_none() || dim == Some(d as i32) => Some(field.name()),
|
||||
_ => None,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let mut candidates = Vec::new();
|
||||
for field in schema.fields() {
|
||||
collect_vector_columns(field, &mut Vec::new(), dim, &mut candidates);
|
||||
}
|
||||
if candidates.is_empty() {
|
||||
Err(Error::InvalidInput {
|
||||
message: format!(
|
||||
@@ -180,6 +176,57 @@ pub(crate) fn default_vector_column(schema: &Schema, dim: Option<i32>) -> Result
|
||||
}
|
||||
}
|
||||
|
||||
fn collect_vector_columns(
|
||||
field: &Field,
|
||||
path: &mut Vec<String>,
|
||||
dim: Option<i32>,
|
||||
candidates: &mut Vec<String>,
|
||||
) {
|
||||
path.push(field.name().clone());
|
||||
match infer_vector_dim(field.data_type()) {
|
||||
Ok(d) if dim.is_none() || dim == Some(d as i32) => {
|
||||
let path_segments = path.iter().map(String::as_str).collect::<Vec<_>>();
|
||||
candidates.push(lance_core::datatypes::format_field_path(&path_segments));
|
||||
}
|
||||
_ => {
|
||||
if let DataType::Struct(fields) = field.data_type() {
|
||||
for child in fields {
|
||||
collect_vector_columns(child, path, dim, candidates);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
path.pop();
|
||||
}
|
||||
|
||||
pub(crate) fn resolve_arrow_field_path(schema: &Schema, column: &str) -> Result<(String, Field)> {
|
||||
lance_core::datatypes::parse_field_path(column).map_err(|e| Error::InvalidInput {
|
||||
message: format!("Invalid field path `{}`: {}", column, e),
|
||||
})?;
|
||||
|
||||
let lance_schema =
|
||||
lance_core::datatypes::Schema::try_from(schema).map_err(|e| Error::Schema {
|
||||
message: format!("Invalid schema: {}", e),
|
||||
})?;
|
||||
let field_path = lance_schema
|
||||
.resolve_case_insensitive(column)
|
||||
.ok_or_else(|| Error::Schema {
|
||||
message: format!(
|
||||
"Field path `{}` not found in schema. Available field paths: {}",
|
||||
column,
|
||||
lance_schema.field_paths().join(", ")
|
||||
),
|
||||
})?;
|
||||
let field = field_path.last().expect("field path should be non-empty");
|
||||
let path_segments = field_path
|
||||
.iter()
|
||||
.map(|field| field.name.as_str())
|
||||
.collect::<Vec<_>>();
|
||||
let canonical_path = lance_core::datatypes::format_field_path(&path_segments);
|
||||
|
||||
Ok((canonical_path, Field::from(*field)))
|
||||
}
|
||||
|
||||
pub fn supported_btree_data_type(dtype: &DataType) -> bool {
|
||||
dtype.is_integer()
|
||||
|| dtype.is_floating()
|
||||
@@ -450,6 +497,49 @@ mod tests {
|
||||
"vec"
|
||||
);
|
||||
|
||||
let schema_with_nested_vec_col = Schema::new(vec![
|
||||
Field::new("id", DataType::Int16, true),
|
||||
Field::new(
|
||||
"image",
|
||||
DataType::Struct(
|
||||
vec![Field::new(
|
||||
"embedding",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, false)),
|
||||
10,
|
||||
),
|
||||
false,
|
||||
)]
|
||||
.into(),
|
||||
),
|
||||
false,
|
||||
),
|
||||
]);
|
||||
assert_eq!(
|
||||
default_vector_column(&schema_with_nested_vec_col, None).unwrap(),
|
||||
"image.embedding"
|
||||
);
|
||||
|
||||
let schema_with_escaped_nested_vec_col = Schema::new(vec![Field::new(
|
||||
"image-meta",
|
||||
DataType::Struct(
|
||||
vec![Field::new(
|
||||
"embedding.v1",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, false)),
|
||||
10,
|
||||
),
|
||||
false,
|
||||
)]
|
||||
.into(),
|
||||
),
|
||||
false,
|
||||
)]);
|
||||
assert_eq!(
|
||||
default_vector_column(&schema_with_escaped_nested_vec_col, None).unwrap(),
|
||||
"`image-meta`.`embedding.v1`"
|
||||
);
|
||||
|
||||
let multi_vec_col = Schema::new(vec![
|
||||
Field::new("id", DataType::Int16, true),
|
||||
Field::new(
|
||||
@@ -469,6 +559,48 @@ mod tests {
|
||||
.to_string()
|
||||
.contains("More than one")
|
||||
);
|
||||
|
||||
let multi_nested_vec_col = Schema::new(vec![
|
||||
Field::new(
|
||||
"image",
|
||||
DataType::Struct(
|
||||
vec![Field::new(
|
||||
"embedding",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, false)),
|
||||
10,
|
||||
),
|
||||
false,
|
||||
)]
|
||||
.into(),
|
||||
),
|
||||
false,
|
||||
),
|
||||
Field::new(
|
||||
"text",
|
||||
DataType::Struct(
|
||||
vec![Field::new(
|
||||
"embedding",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, false)),
|
||||
50,
|
||||
),
|
||||
false,
|
||||
)]
|
||||
.into(),
|
||||
),
|
||||
false,
|
||||
),
|
||||
]);
|
||||
assert_eq!(
|
||||
default_vector_column(&multi_nested_vec_col, Some(50)).unwrap(),
|
||||
"text.embedding"
|
||||
);
|
||||
let err = default_vector_column(&multi_nested_vec_col, None)
|
||||
.unwrap_err()
|
||||
.to_string();
|
||||
assert!(err.contains("image.embedding"));
|
||||
assert!(err.contains("text.embedding"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user