Compare commits

..

1 Commits

Author SHA1 Message Date
Brendan Clement
32c77879c9 feat(nodejs): surface skip_auto_cleanup on add and merge insert 2026-05-14 12:59:21 -07:00
125 changed files with 1748 additions and 23751 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.30.1-beta.0"
current_version = "0.28.0-beta.11"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -11,11 +11,6 @@ updates:
schedule:
interval: weekly
open-pull-requests-limit: 10
# Only update Cargo.lock, never widen/raise the version requirements in
# Cargo.toml. The goal is keeping the lockfile (and the binaries we ship)
# current on security fixes, not forcing our library's consumers onto
# newer minimum versions.
versioning-strategy: lockfile-only
groups:
rust-minor-patch:
update-types:

View File

@@ -29,3 +29,7 @@ runs:
args: ${{ inputs.args }}
docker-options: "-e PIP_EXTRA_INDEX_URL='https://pypi.fury.io/lance-format/ https://pypi.fury.io/lancedb/'"
working-directory: python
- uses: actions/upload-artifact@v4
with:
name: windows-wheels
path: python\target\wheels

View File

@@ -157,10 +157,7 @@ jobs:
npx jest --testEnvironment jest-environment-node-single-context --verbose
macos:
timeout-minutes: 30
# macos-15 ships a newer linker; the older macos-14 linker fails to insert
# branch islands when the debug cdylib's __text section exceeds the 128 MB
# AArch64 B/BL branch range.
runs-on: "macos-15"
runs-on: "macos-14"
defaults:
run:
shell: bash

View File

@@ -8,9 +8,6 @@ on:
# This should trigger a dry run (we skip the final publish step)
paths:
- .github/workflows/pypi-publish.yml
- .github/workflows/build_linux_wheel/action.yml
- .github/workflows/build_mac_wheel/action.yml
- .github/workflows/build_windows_wheel/action.yml
- Cargo.toml # Change in dependency frequently breaks builds
- Cargo.lock
@@ -24,21 +21,32 @@ jobs:
linux:
name: Python ${{ matrix.config.platform }} manylinux${{ matrix.config.manylinux }}
timeout-minutes: 60
permissions:
id-token: write
contents: read
strategy:
matrix:
config:
- platform: x86_64
manylinux: "2_17"
extra_args: ""
runner: ubuntu-22.04
- platform: x86_64
manylinux: "2_28"
extra_args: "--features fp16kernels"
runner: ubuntu-22.04
# For successful fat LTO builds, we need a large runner to avoid OOM errors.
- platform: aarch64
manylinux: "2_17"
extra_args: ""
# For successful fat LTO builds, we need a large runner to avoid OOM errors.
runner: ubuntu-2404-8x-arm64
- platform: aarch64
manylinux: "2_28"
extra_args: "--features fp16kernels"
runner: ubuntu-2404-8x-arm64
runs-on: ${{ matrix.config.runner }}
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
with:
fetch-depth: 0
lfs: true
@@ -52,14 +60,15 @@ jobs:
args: "--release --strip ${{ matrix.config.extra_args }}"
arm-build: ${{ matrix.config.platform == 'aarch64' }}
manylinux: ${{ matrix.config.manylinux }}
- uses: actions/upload-artifact@v7
- uses: ./.github/workflows/upload_wheel
if: startsWith(github.ref, 'refs/tags/python-v')
with:
name: wheels-linux-${{ matrix.config.platform }}-${{ matrix.config.manylinux }}
path: target/wheels/lancedb-*.whl
if-no-files-found: error
fury_token: ${{ secrets.FURY_TOKEN }}
mac:
timeout-minutes: 90
permissions:
id-token: write
contents: read
runs-on: ${{ matrix.config.runner }}
strategy:
matrix:
@@ -69,7 +78,7 @@ jobs:
env:
MACOSX_DEPLOYMENT_TARGET: 10.15
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
with:
fetch-depth: 0
lfs: true
@@ -81,21 +90,18 @@ jobs:
with:
python-minor-version: 10
args: "--release --strip --target ${{ matrix.config.target }} --features fp16kernels"
- uses: actions/upload-artifact@v7
- uses: ./.github/workflows/upload_wheel
if: startsWith(github.ref, 'refs/tags/python-v')
with:
name: wheels-mac-${{ matrix.config.target }}
path: target/wheels/lancedb-*.whl
if-no-files-found: error
fury_token: ${{ secrets.FURY_TOKEN }}
windows:
timeout-minutes: 90
timeout-minutes: 60
permissions:
id-token: write
contents: read
runs-on: windows-latest
env:
# link.exe is single-threaded and the long pole on Windows builds. Use
# rustc's bundled lld-link instead.
CARGO_TARGET_X86_64_PC_WINDOWS_MSVC_LINKER: rust-lld
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
with:
fetch-depth: 0
lfs: true
@@ -107,70 +113,18 @@ jobs:
with:
python-minor-version: 10
args: "--release --strip"
- uses: actions/upload-artifact@v7
vcpkg_token: ${{ secrets.VCPKG_GITHUB_PACKAGES }}
- uses: ./.github/workflows/upload_wheel
if: startsWith(github.ref, 'refs/tags/python-v')
with:
name: wheels-windows
path: target/wheels/lancedb-*.whl
if-no-files-found: error
publish:
name: Publish wheels
if: startsWith(github.ref, 'refs/tags/python-v')
needs: [linux, mac, windows]
runs-on: ubuntu-latest
permissions:
id-token: write
contents: read
steps:
- uses: actions/checkout@v6
- name: Download wheel artifacts
uses: actions/download-artifact@v8
with:
pattern: wheels-*
path: target/wheels
merge-multiple: true
- name: List wheels
run: ls -la target/wheels
- name: Choose repo
id: choose_repo
run: |
if [[ ${{ github.ref }} == *beta* ]]; then
echo "repo=fury" >> $GITHUB_OUTPUT
else
echo "repo=pypi" >> $GITHUB_OUTPUT
fi
- name: Publish to Fury
if: steps.choose_repo.outputs.repo == 'fury'
env:
FURY_TOKEN: ${{ secrets.FURY_TOKEN }}
run: |
shopt -s nullglob
WHEELS=(target/wheels/lancedb-*.whl)
if [[ ${#WHEELS[@]} -eq 0 ]]; then
echo "No wheels found in target/wheels/" >&2
exit 1
fi
for WHEEL in "${WHEELS[@]}"; do
echo "Uploading $WHEEL to Fury"
curl -f -F package=@"$WHEEL" "https://$FURY_TOKEN@push.fury.io/lancedb/"
done
# NOTE: pypa/gh-action-pypi-publish must be invoked directly from a
# workflow file, not from inside a composite action. When called from a
# composite, `github.action_repository` is empty (actions/runner#2473)
# and the action falls back to `github.repository`, producing a bogus
# `docker://ghcr.io/<repo>:<ref>` image reference that GHA tries to pull.
- name: Publish to PyPI
if: steps.choose_repo.outputs.repo == 'pypi'
uses: pypa/gh-action-pypi-publish@release/v1
with:
packages-dir: target/wheels/
fury_token: ${{ secrets.FURY_TOKEN }}
gh-release:
if: startsWith(github.ref, 'refs/tags/python-v')
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
with:
fetch-depth: 0
lfs: true
@@ -233,13 +187,13 @@ jobs:
report-failure:
name: Report Workflow Failure
runs-on: ubuntu-latest
needs: [linux, mac, windows, publish]
needs: [linux, mac, windows]
permissions:
contents: read
issues: write
if: always() && failure() && startsWith(github.ref, 'refs/tags/python-v')
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
- uses: ./.github/actions/create-failure-issue
with:
job-results: ${{ toJSON(needs) }}

View File

@@ -205,7 +205,7 @@ jobs:
- name: Delete wheels
run: rm -rf target/wheels
pydantic1x:
timeout-minutes: 60
timeout-minutes: 30
runs-on: "ubuntu-24.04"
defaults:
run:

View File

@@ -233,26 +233,6 @@ jobs:
cargo update -p aws-sdk-sso --precise 1.62.0
cargo update -p aws-sdk-ssooidc --precise 1.63.0
cargo update -p aws-sdk-sts --precise 1.63.0
# aws-runtime/sigv4/credential-types/types and the aws-smithy-*
# crates bumped their MSRV to 1.91.1 in late 2026; pin to the last
# 1.91.0-compatible versions. The order matters — each downgrade
# only succeeds once everything that still pins it at a higher
# version has itself been downgraded.
cargo update -p aws-runtime --precise 1.5.12
cargo update -p aws-types --precise 1.3.9
cargo update -p aws-sigv4 --precise 1.3.5
cargo update -p aws-credential-types --precise 1.2.8
cargo update -p aws-smithy-checksums --precise 0.63.9
cargo update -p aws-smithy-runtime --precise 1.9.3
cargo update -p aws-smithy-http --precise 0.62.4
cargo update -p aws-smithy-eventstream --precise 0.60.12
cargo update -p aws-smithy-http-client --precise 1.1.3
cargo update -p aws-smithy-observability --precise 0.1.4
cargo update -p aws-smithy-query --precise 0.60.8
cargo update -p aws-smithy-runtime-api --precise 1.9.1
cargo update -p aws-smithy-async --precise 1.2.6
cargo update -p aws-smithy-types --precise 1.3.5
cargo update -p aws-smithy-xml --precise 0.60.11
cargo update -p home --precise 0.5.9
- name: cargo +${{ matrix.msrv }} check
env:

View File

@@ -0,0 +1,34 @@
name: upload-wheel
description: "Upload wheels to Pypi"
inputs:
fury_token:
required: true
description: "release token for the fury repo"
runs:
using: "composite"
steps:
- name: Choose repo
shell: bash
id: choose_repo
run: |
if [[ ${{ github.ref }} == *beta* ]]; then
echo "repo=fury" >> $GITHUB_OUTPUT
else
echo "repo=pypi" >> $GITHUB_OUTPUT
fi
- name: Publish to Fury
if: steps.choose_repo.outputs.repo == 'fury'
shell: bash
env:
FURY_TOKEN: ${{ inputs.fury_token }}
run: |
WHEEL=$(ls target/wheels/lancedb-*.whl 2> /dev/null | head -n 1)
echo "Uploading $WHEEL to Fury"
curl -f -F package=@$WHEEL https://$FURY_TOKEN@push.fury.io/lancedb/
- name: Publish to PyPI
if: steps.choose_repo.outputs.repo == 'pypi'
uses: pypa/gh-action-pypi-publish@release/v1
with:
packages-dir: target/wheels/

View File

@@ -17,33 +17,9 @@ Common commands:
* Run tests: `cargo test --quiet --features remote --tests`
* Run specific test: `cargo test --quiet --features remote -p <package_name> --test <test_name>`
* Lint: `cargo clippy --quiet --features remote --tests --examples`
* Format Rust: `cargo fmt --all`
* Format Python: `ruff format .`
* Lint Python: `ruff check .`
* Bootstrap Python dev env: `cd python && uv run --extra tests --extra dev maturin develop --extras tests,dev`
* Run Python tests: `cd python && uv run --extra tests pytest python/tests -vv --durations=10 -m "not slow and not s3_test"`
* Run specific Python test: `cd python && uv run --extra tests pytest python/tests/<test_file>.py::<test_name> -q`
* Format: `cargo fmt --all`
For Python validation, prefer the uv-managed environment declared by `python/uv.lock`.
Do not treat system `python`, global `pytest`, or missing editable-install errors as
final blockers; bootstrap or enter the uv environment instead. If `lancedb._lancedb`
is missing or stale, or if Rust/PyO3 binding code changed, rebuild the Python
extension with the bootstrap command above before running tests.
Before committing changes, run formatting for every language you touched. At minimum:
* Rust changes: run `cargo fmt --all`.
* Python changes: run `ruff format .` and `ruff check .` from the repository root,
and run targeted tests through `cd python && uv run ...`.
* TypeScript changes: run the relevant `npm`/`pnpm` lint, format, build, and docs commands in `nodejs`.
Before creating a PR, the exact value passed to `gh pr create --title` must follow
Conventional Commits, such as `fix: support nested field paths in native index creation`
or `feat(python): add dataset multiprocessing support`. Do not use a plain natural
language summary like `Support nested field paths in native index creation` as the PR
title. The semantic-release check uses the PR title and body as the merge commit message,
so a non-conventional PR title will fail CI. After creating a PR, read the remote PR title
back and fix it immediately if it is not conventional.
Before committing changes, run formatting.
## Coding tips

1930
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -13,20 +13,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=7.2.0-beta.3", default-features = false, "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=7.2.0-beta.3", default-features = false, "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=7.2.0-beta.3", default-features = false, "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=7.0.0-beta.7", default-features = false, "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=7.0.0-beta.7", "tag" = "v7.0.0-beta.7", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "58.0.0", optional = false }

View File

@@ -112,25 +112,25 @@ def fetch_remote_tags() -> List[TagInfo]:
"api",
"-X",
"GET",
f"repos/{LANCE_REPO}/releases",
f"repos/{LANCE_REPO}/git/refs/tags",
"--paginate",
"--jq",
".[].tag_name",
"-F",
"per_page=20",
".[].ref",
]
)
tags: List[TagInfo] = []
for line in output.splitlines():
tag = line.strip()
if not tag.startswith("v"):
ref = line.strip()
if not ref.startswith("refs/tags/v"):
continue
tag = ref.split("refs/tags/")[-1]
version = tag.lstrip("v")
try:
tags.append(TagInfo(tag=tag, version=version, semver=parse_semver(version)))
except ValueError:
continue
if not tags:
raise RuntimeError("No Lance releases could be parsed from GitHub API output")
raise RuntimeError("No Lance tags could be parsed from GitHub API output")
return tags

View File

@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
<dependency>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-core</artifactId>
<version>0.30.1-beta.0</version>
<version>0.28.0-beta.11</version>
</dependency>
```

View File

@@ -437,39 +437,6 @@ Open a table in the database.
***
### renameTable()
```ts
abstract renameTable(
currentName,
newName,
options?): Promise<void>
```
Rename a table.
Currently only supported by LanceDB Cloud. Local OSS connections and
namespace-backed connections (via [connectNamespace](../functions/connectNamespace.md)) reject with
a "not supported" error.
#### Parameters
* **currentName**: `string`
The current name of the table.
* **newName**: `string`
The new name for the table.
* **options?**: [`RenameTableOptions`](../interfaces/RenameTableOptions.md)
Optional namespace paths. When
`newNamespacePath` is omitted the table stays in `namespacePath`.
#### Returns
`Promise`&lt;`void`&gt;
***
### tableNames()
#### tableNames(options)

View File

@@ -76,57 +76,6 @@ the query optimizer chooses a suboptimal path.
***
### useLsmWrite()
```ts
useLsmWrite(useLsmWrite): MergeInsertBuilder
```
Controls whether the merge uses the MemWAL LSM write path.
By default (unset), a `mergeInsert` on a table with an LSM write spec is
routed through Lance's MemWAL shard writer, and a table without one uses
the standard path. Pass `false` to force the standard path even when a
spec is set. Pass `true` to require a spec — `mergeInsert` rejects if none
is installed.
#### Parameters
* **useLsmWrite**: `boolean`
Whether to use the LSM write path.
#### Returns
[`MergeInsertBuilder`](MergeInsertBuilder.md)
***
### validateSingleShard()
```ts
validateSingleShard(validateSingleShard): MergeInsertBuilder
```
Controls how an LSM merge checks that its input targets a single shard.
When a table has an LSM write spec, every row in a `mergeInsert` call must
route to the same shard. When `true` (the default), every row is inspected
to verify this. When `false`, only the first row is inspected and the
shard it routes to is used for the whole input — a faster path for callers
that have already pre-sharded their input. Has no effect on tables without
an LSM write spec.
#### Parameters
* **validateSingleShard**: `boolean`
Whether to check every row routes to one shard. Defaults to `true`.
#### Returns
[`MergeInsertBuilder`](MergeInsertBuilder.md)
***
### whenMatchedUpdateAll()
```ts

View File

@@ -343,30 +343,6 @@ This is useful for pagination.
***
### orderBy()
```ts
orderBy(ordering): this
```
Sort the results by the specified column(s).
#### Parameters
* **ordering**: [`ColumnOrdering`](../interfaces/ColumnOrdering.md) \| [`ColumnOrdering`](../interfaces/ColumnOrdering.md)[]
#### Returns
`this`
This query builder.
#### Inherited from
`StandardQueryBase.orderBy`
***
### outputSchema()
```ts

View File

@@ -1,173 +0,0 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / Scannable
# Class: Scannable
A data source that can be scanned as a stream of Arrow `RecordBatch`es.
`Scannable` wraps the schema + optional row count + rescannable flag and
a callback that yields batches one at a time. It is passed to consumers
(e.g. `Table.add`, `createTable`, `mergeInsert` — follow-up work) that
need to pull data without materializing the full dataset in JS memory.
Batches cross the JS↔Rust boundary as Arrow IPC Stream messages; a fresh
writer serializes each batch, and the Rust side decodes it with
`arrow_ipc::reader::StreamReader`. One batch is in flight at a time.
## Properties
### numRows
```ts
readonly numRows: null | number;
```
***
### rescannable
```ts
readonly rescannable: boolean;
```
***
### schema
```ts
readonly schema: Schema<any>;
```
## Methods
### fromFactory()
```ts
static fromFactory(
schema,
factory,
opts): Promise<Scannable>
```
Build a Scannable from an explicit schema and a factory that returns a
fresh batch iterator on each call.
The factory is invoked once per scan. Each iterator yields
`RecordBatch`es matching the declared schema. Use this when you need
direct control over the pull loop — for example, to wrap a streaming
source whose batches are produced lazily.
#### Parameters
* **schema**: `Schema`&lt;`any`&gt;
The Arrow schema of the produced batches.
* **factory**
Called at the start of each scan to produce a batch
iterator. Must be idempotent when `rescannable` is true.
* **opts**: [`ScannableOptions`](../interfaces/ScannableOptions.md) = `{}`
Optional hints. `rescannable` defaults to `true`; set to
`false` if calling `factory()` twice would not reproduce the same data.
#### Returns
`Promise`&lt;[`Scannable`](Scannable.md)&gt;
***
### fromIterable()
```ts
static fromIterable(
schema,
iter,
opts): Promise<Scannable>
```
Build a Scannable from an iterable of `RecordBatch`es. `rescannable`
defaults to `false`. Pass an explicit schema so the consumer can
validate before any batch is pulled.
`opts.rescannable: true` is honest for replayable iterables (Arrays,
Sets, or custom iterables whose `[Symbol.iterator]()` returns a fresh
iterator each call). It is rejected for one-shot iterables (generators,
async generators, or already-an-iterator inputs) because their
`[Symbol.iterator]()` returns the same exhausted object on the second
scan. For replayable sources outside this shape, use
`fromFactory(schema, () => createIter(), { rescannable: true })`.
Note: when `opts.rescannable` is `true`, the constructor calls
`[Symbol.iterator]()` once on the input to perform the structural check.
#### Parameters
* **schema**: `Schema`&lt;`any`&gt;
* **iter**: `Iterable`&lt;`RecordBatch`&lt;`any`&gt;&gt; \| `AsyncIterable`&lt;`RecordBatch`&lt;`any`&gt;&gt;
* **opts**: [`ScannableOptions`](../interfaces/ScannableOptions.md) = `{}`
#### Returns
`Promise`&lt;[`Scannable`](Scannable.md)&gt;
***
### fromRecordBatchReader()
```ts
static fromRecordBatchReader(reader, opts): Promise<Scannable>
```
Build a Scannable from an Arrow `RecordBatchReader`. A reader can only
be consumed once; `rescannable` defaults to `false`.
The reader must already be opened (via `.open()`) so its `.schema` is
populated. `RecordBatchReader.from(...)` returns an unopened reader.
`opts.rescannable: true` is rejected because `RecordBatchReader` is a
self-iterator (its `[Symbol.iterator]()` returns itself), and this
constructor does not call `reader.reset()` between scans, so a second
scan would always see an exhausted reader. For genuinely replayable
sources, use
`fromFactory(schema, () => openReader(), { rescannable: true })`,
which mints a fresh reader on each scan.
#### Parameters
* **reader**: `RecordBatchReader`&lt;`any`&gt;
* **opts**: [`ScannableOptions`](../interfaces/ScannableOptions.md) = `{}`
#### Returns
`Promise`&lt;[`Scannable`](Scannable.md)&gt;
***
### fromTable()
```ts
static fromTable(table, opts): Promise<Scannable>
```
Build a Scannable from an in-memory Arrow `Table`. Always rescannable;
the table's batches are replayed on each scan.
The table's row count is authoritative: `opts.numRows` must either be
omitted or equal to `table.numRows`. `opts.rescannable` of `false` is
rejected because in-memory Tables are always rescannable.
#### Parameters
* **table**: `Table`&lt;`any`&gt;
* **opts**: [`ScannableOptions`](../interfaces/ScannableOptions.md) = `{}`
#### Returns
`Promise`&lt;[`Scannable`](Scannable.md)&gt;

View File

@@ -187,25 +187,6 @@ Any attempt to use the table after it is closed will result in an error.
***
### closeLsmWriters()
```ts
abstract closeLsmWriters(): Promise<void>
```
Drain and close any cached MemWAL shard writers held for this table.
When an [LsmWriteSpec](../interfaces/LsmWriteSpec.md) is installed, `mergeInsert` opens MemWAL
shard writers and caches them for reuse across calls. This closes them,
flushing pending data; writers reopen lazily on the next `mergeInsert`.
It is a no-op when no writers are cached.
#### Returns
`Promise`&lt;`void`&gt;
***
### countRows()
```ts
@@ -709,74 +690,6 @@ of the given query
***
### setLsmWriteSpec()
```ts
abstract setLsmWriteSpec(spec): Promise<void>
```
Install an [LsmWriteSpec](../interfaces/LsmWriteSpec.md) on this table, selecting Lance's MemWAL
LSM-style write path for future `mergeInsert` calls.
`LsmWriteSpec` chooses one of three sharding strategies via `specType`:
- `"bucket"` — hash-bucket writes by the single-column unenforced primary
key (`column` and `numBuckets` required).
- `"identity"` — shard by the raw value of a scalar `column`.
- `"unsharded"` — route every write to a single shard.
All variants require the table to have an unenforced primary key
([Table#setUnenforcedPrimaryKey](Table.md#setunenforcedprimarykey)); bucket sharding additionally
requires it to be the single column being bucketed.
#### Parameters
* **spec**: [`LsmWriteSpec`](../interfaces/LsmWriteSpec.md)
The sharding spec to install.
#### Returns
`Promise`&lt;`void`&gt;
#### Example
```ts
await table.setUnenforcedPrimaryKey("id");
await table.setLsmWriteSpec({
specType: "bucket",
column: "id",
numBuckets: 16,
maintainedIndexes: ["id_idx"],
});
```
***
### setUnenforcedPrimaryKey()
```ts
abstract setUnenforcedPrimaryKey(columns): Promise<void>
```
Set the unenforced primary key for this table to a single column.
"Unenforced" means LanceDB does not check uniqueness on writes; the
column is recorded in the schema as the primary key for use by features
such as `merge_insert`. Only single-column primary keys are supported,
and the key cannot be changed once set.
#### Parameters
* **columns**: `string` \| `string`[]
The primary key column. A one-element
array is also accepted; passing more than one column is rejected.
#### Returns
`Promise`&lt;`void`&gt;
***
### stats()
```ts
@@ -880,23 +793,6 @@ Return the table as an arrow table
***
### unsetLsmWriteSpec()
```ts
abstract unsetLsmWriteSpec(): Promise<void>
```
Remove the [LsmWriteSpec](../interfaces/LsmWriteSpec.md) from this table, reverting to the standard
`mergeInsert` write path.
Errors if no spec is currently set.
#### Returns
`Promise`&lt;`void`&gt;
***
### update()
#### update(opts)

View File

@@ -498,30 +498,6 @@ This is useful for pagination.
***
### orderBy()
```ts
orderBy(ordering): this
```
Sort the results by the specified column(s).
#### Parameters
* **ordering**: [`ColumnOrdering`](../interfaces/ColumnOrdering.md) \| [`ColumnOrdering`](../interfaces/ColumnOrdering.md)[]
#### Returns
`this`
This query builder.
#### Inherited from
`StandardQueryBase.orderBy`
***
### outputSchema()
```ts

View File

@@ -32,7 +32,6 @@
- [PhraseQuery](classes/PhraseQuery.md)
- [Query](classes/Query.md)
- [QueryBase](classes/QueryBase.md)
- [Scannable](classes/Scannable.md)
- [Session](classes/Session.md)
- [StaticHeaderProvider](classes/StaticHeaderProvider.md)
- [Table](classes/Table.md)
@@ -51,7 +50,6 @@
- [AlterColumnsResult](interfaces/AlterColumnsResult.md)
- [ClientConfig](interfaces/ClientConfig.md)
- [ColumnAlteration](interfaces/ColumnAlteration.md)
- [ColumnOrdering](interfaces/ColumnOrdering.md)
- [CompactionStats](interfaces/CompactionStats.md)
- [ConnectNamespaceOptions](interfaces/ConnectNamespaceOptions.md)
- [ConnectionOptions](interfaces/ConnectionOptions.md)
@@ -80,17 +78,14 @@
- [IvfRqOptions](interfaces/IvfRqOptions.md)
- [ListNamespacesOptions](interfaces/ListNamespacesOptions.md)
- [ListNamespacesResponse](interfaces/ListNamespacesResponse.md)
- [LsmWriteSpec](interfaces/LsmWriteSpec.md)
- [MergeResult](interfaces/MergeResult.md)
- [OpenTableOptions](interfaces/OpenTableOptions.md)
- [OptimizeOptions](interfaces/OptimizeOptions.md)
- [OptimizeStats](interfaces/OptimizeStats.md)
- [QueryExecutionOptions](interfaces/QueryExecutionOptions.md)
- [RemovalStats](interfaces/RemovalStats.md)
- [RenameTableOptions](interfaces/RenameTableOptions.md)
- [RestNamespaceConfig](interfaces/RestNamespaceConfig.md)
- [RetryConfig](interfaces/RetryConfig.md)
- [ScannableOptions](interfaces/ScannableOptions.md)
- [ShuffleOptions](interfaces/ShuffleOptions.md)
- [SplitCalculatedOptions](interfaces/SplitCalculatedOptions.md)
- [SplitHashOptions](interfaces/SplitHashOptions.md)
@@ -105,7 +100,6 @@
- [UpdateResult](interfaces/UpdateResult.md)
- [Version](interfaces/Version.md)
- [WriteExecutionOptions](interfaces/WriteExecutionOptions.md)
- [WriteProgress](interfaces/WriteProgress.md)
## Type Aliases

View File

@@ -19,39 +19,3 @@ mode: "append" | "overwrite";
If "append" (the default) then the new data will be added to the table
If "overwrite" then the new data will replace the existing data in the table.
***
### progress()
```ts
progress: (progress) => void;
```
Optional callback invoked periodically with write progress.
The callback is fired once per batch written and once more with
`done: true` when the write completes. Calls are dispatched
asynchronously to the JS event loop and never block the write — a slow
callback will queue events rather than back-pressure the writer.
Errors thrown from the callback are logged with `console.warn` and
swallowed — they do not abort the write.
#### Parameters
* **progress**: [`WriteProgress`](WriteProgress.md)
#### Returns
`void`
#### Example
```ts
await table.add(data, {
progress: (p) => {
console.log(`${p.outputRows}/${p.totalRows ?? "?"} rows`);
},
});
```

View File

@@ -1,31 +0,0 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / ColumnOrdering
# Interface: ColumnOrdering
## Properties
### ascending?
```ts
optional ascending: boolean;
```
***
### columnName
```ts
columnName: string;
```
***
### nullsFirst?
```ts
optional nullsFirst: boolean;
```

View File

@@ -70,20 +70,16 @@ client used by manifest-enabled native connections.
optional readConsistencyInterval: number;
```
The interval, in seconds, at which to check for updates to the table
from other processes. If None, then consistency is not checked. For
performance reasons, this is the default. For strong consistency, set
this to zero seconds. Then every read will check for updates from other
processes. As a compromise, you can set this to a non-zero value for
eventual consistency. If more than that interval has passed since the
last check, then the table will be checked for updates. Note: this
consistency only applies to read operations. Write operations are
(For LanceDB OSS only): The interval, in seconds, at which to check for
updates to the table from other processes. If None, then consistency is not
checked. For performance reasons, this is the default. For strong
consistency, set this to zero seconds. Then every read will check for
updates from other processes. As a compromise, you can set this to a
non-zero value for eventual consistency. If more than that interval
has passed since the last check, then the table will be checked for updates.
Note: this consistency only applies to read operations. Write operations are
always consistent.
Stronger consistency is not free. The smaller the interval, the more
often each read pays the cost of checking for updates against object
storage, raising per-read latency and cost.
***
### region?

View File

@@ -1,67 +0,0 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / LsmWriteSpec
# Interface: LsmWriteSpec
Specification selecting Lance's MemWAL LSM-style write path for
`mergeInsert`.
`specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`,
`column` and `numBuckets` are required; for `"identity"`, `column` is
required and must be a deterministic function of the unenforced primary
key (every row with a given primary key must always produce the same
`column` value, or upserts of that key can land in different shards and a
stale version can win).
## Properties
### column?
```ts
optional column: string;
```
Bucket and identity variants: the sharding column.
***
### maintainedIndexes?
```ts
optional maintainedIndexes: string[];
```
Names of indexes the MemWAL should keep up to date during writes.
***
### numBuckets?
```ts
optional numBuckets: number;
```
Bucket variant: the number of buckets, in `[1, 1024]`.
***
### specType
```ts
specType: "bucket" | "identity" | "unsharded";
```
One of `"bucket"`, `"identity"`, or `"unsharded"`.
***
### writerConfigDefaults?
```ts
optional writerConfigDefaults: Record<string, string>;
```
Default `ShardWriter` configuration recorded in the MemWAL index.

View File

@@ -32,14 +32,6 @@ numInsertedRows: number;
***
### numRows
```ts
numRows: number;
```
***
### numUpdatedRows
```ts

View File

@@ -1,29 +0,0 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / RenameTableOptions
# Interface: RenameTableOptions
## Properties
### namespacePath?
```ts
optional namespacePath: string[];
```
The namespace path of the table being renamed. Defaults to the root
namespace (`[]`) when omitted.
***
### newNamespacePath?
```ts
optional newNamespacePath: string[];
```
The namespace path to move the table to as part of the rename. When
omitted the table stays in `namespacePath`.

View File

@@ -1,29 +0,0 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / ScannableOptions
# Interface: ScannableOptions
## Properties
### numRows?
```ts
optional numRows: number;
```
Hint about the number of rows. Not validated against the stream.
***
### rescannable?
```ts
optional rescannable: boolean;
```
Whether the source can be scanned more than once. Defaults to `true` for
`fromTable` / `fromFactory` and `false` for `fromIterable` /
`fromRecordBatchReader`.

View File

@@ -1,84 +0,0 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / WriteProgress
# Interface: WriteProgress
Progress snapshot for a write operation, delivered to the `progress`
callback passed to [Table.add](../classes/Table.md#add).
## Properties
### activeTasks
```ts
activeTasks: number;
```
Number of parallel write tasks currently in flight.
***
### done
```ts
done: boolean;
```
`true` for the final callback; `false` otherwise.
***
### elapsedSeconds
```ts
elapsedSeconds: number;
```
Wall-clock seconds since the write started.
***
### outputBytes
```ts
outputBytes: number;
```
Number of bytes written so far.
***
### outputRows
```ts
outputRows: number;
```
Number of rows written so far.
***
### totalRows?
```ts
optional totalRows: number;
```
Total rows expected, when the input source reports it.
Always set on the final callback (the one with `done: true`), falling
back to the actual number of rows written when the source could not
report a row count up front.
***
### totalTasks
```ts
totalTasks: number;
```
Total number of parallel write tasks (the write parallelism).

View File

@@ -166,12 +166,6 @@ lists the indices that LanceDb supports.
::: lancedb.index.IvfFlat
::: lancedb.index.IvfSq
::: lancedb.index.IvfRq
::: lancedb.index.HnswFlat
::: lancedb.table.IndexStatistics
## Querying (Asynchronous)

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.30.1-beta.0</version>
<version>0.28.0-beta.11</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.30.1-beta.0</version>
<version>0.28.0-beta.11</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>LanceDB Java SDK Parent POM</description>
@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>7.2.0-beta.1</lance-core.version>
<lance-core.version>7.0.0-beta.7</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.30.1-beta.0"
version = "0.28.0-beta.11"
publish = false
license.workspace = true
description.workspace = true

View File

@@ -47,14 +47,6 @@ describe("given a connection", () => {
await db.close();
expect(db.isOpen()).toBe(false);
await expect(db.tableNames()).rejects.toThrow("Connection is closed");
await expect(db.renameTable("a", "b")).rejects.toThrow(
"Connection is closed",
);
});
it("should report renameTable as unsupported on an OSS connection", async () => {
await db.createTable("a", [{ id: 1 }]);
await expect(db.renameTable("a", "b")).rejects.toThrow(/not supported/);
});
it("should be able to create a table from an object arg `createTable(options)`, or args `createTable(name, data, options)`", async () => {
let tbl = await db.createTable("test", [{ id: 1 }, { id: 2 }]);
@@ -171,22 +163,18 @@ describe("given a connection", () => {
let manifestDir =
tmpDir.name + "/test_manifest_paths_v2_empty.lance/_versions";
readdirSync(manifestDir)
.filter((f) => f.endsWith(".manifest"))
.forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
readdirSync(manifestDir).forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
table = (await db.createTable("test_manifest_paths_v2", [{ id: 1 }], {
enableV2ManifestPaths: true,
})) as LocalTable;
expect(await table.usesV2ManifestPaths()).toBe(true);
manifestDir = tmpDir.name + "/test_manifest_paths_v2.lance/_versions";
readdirSync(manifestDir)
.filter((f) => f.endsWith(".manifest"))
.forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
readdirSync(manifestDir).forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
});
it("should be able to migrate tables to the V2 manifest paths", async () => {
@@ -203,20 +191,16 @@ describe("given a connection", () => {
const manifestDir =
tmpDir.name + "/test_manifest_path_migration.lance/_versions";
readdirSync(manifestDir)
.filter((f) => f.endsWith(".manifest"))
.forEach((file) => {
expect(file).toMatch(/^\d\.manifest$/);
});
readdirSync(manifestDir).forEach((file) => {
expect(file).toMatch(/^\d\.manifest$/);
});
await table.migrateManifestPathsV2();
expect(await table.usesV2ManifestPaths()).toBe(true);
readdirSync(manifestDir)
.filter((f) => f.endsWith(".manifest"))
.forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
readdirSync(manifestDir).forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
});
});

View File

@@ -109,209 +109,3 @@ describe("Query outputSchema", () => {
expect(schema.fields.length).toBe(3);
});
});
describe("Query orderBy", () => {
let tmpDir: tmp.DirResult;
let table: Table;
beforeEach(async () => {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
const db = await connect(tmpDir.name);
// Create table with numeric data for sorting
const schema = new Schema([
new Field("id", new Int64(), true),
new Field("score", new Float32(), true),
new Field("name", new Utf8(), true),
]);
const data = makeArrowTable(
[
{ id: 1n, score: 3.5, name: "charlie" },
{ id: 2n, score: 1.2, name: "alice" },
{ id: 3n, score: 2.8, name: "bob" },
{ id: 4n, score: 0.5, name: "david" },
{ id: 5n, score: 4.1, name: "eve" },
],
{ schema },
);
table = await db.createTable("test", data);
});
afterEach(() => {
tmpDir.removeCallback();
});
it("should sort by single column ascending", async () => {
const results = await table
.query()
.orderBy({ columnName: "score", ascending: true, nullsFirst: false })
.toArray();
expect(results.length).toBe(5);
// Verify ascending order
expect(results[0].score).toBeCloseTo(0.5, 0.001);
expect(results[1].score).toBeCloseTo(1.2, 0.001);
expect(results[2].score).toBeCloseTo(2.8, 0.001);
expect(results[3].score).toBeCloseTo(3.5, 0.001);
expect(results[4].score).toBeCloseTo(4.1, 0.001);
});
it("should sort by single column descending", async () => {
const results = await table
.query()
.orderBy({ columnName: "score", ascending: false, nullsFirst: false })
.toArray();
expect(results.length).toBe(5);
// Verify descending order
expect(results[0].score).toBeCloseTo(4.1, 0.001);
expect(results[1].score).toBeCloseTo(3.5, 0.001);
expect(results[2].score).toBeCloseTo(2.8, 0.001);
expect(results[3].score).toBeCloseTo(1.2, 0.001);
expect(results[4].score).toBeCloseTo(0.5, 0.001);
});
it("should use ascending as default direction", async () => {
const results = await table
.query()
.orderBy({ columnName: "score" })
.toArray();
expect(results.length).toBe(5);
// Verify ascending order (default)
expect(results[0].score).toBeCloseTo(0.5, 0.001);
expect(results[1].score).toBeCloseTo(1.2, 0.001);
expect(results[2].score).toBeCloseTo(2.8, 0.001);
expect(results[3].score).toBeCloseTo(3.5, 0.001);
expect(results[4].score).toBeCloseTo(4.1, 0.001);
});
it("should sort by string column", async () => {
const results = await table
.query()
.orderBy({ columnName: "name" })
.toArray();
expect(results.length).toBe(5);
// Verify alphabetical order
expect(results[0].name).toBe("alice");
expect(results[1].name).toBe("bob");
expect(results[2].name).toBe("charlie");
expect(results[3].name).toBe("david");
expect(results[4].name).toBe("eve");
});
it("should support method chaining with where", async () => {
const results = await table
.query()
.where("score > 2.0")
.orderBy({ columnName: "score" })
.toArray();
expect(results.length).toBe(3);
// Verify filtered and sorted
expect(results[0].score).toBeCloseTo(2.8, 0.001);
expect(results[1].score).toBeCloseTo(3.5, 0.001);
expect(results[2].score).toBeCloseTo(4.1, 0.001);
});
it("should support method chaining with limit", async () => {
const results = await table
.query()
.orderBy({ columnName: "score", ascending: false })
.limit(3)
.toArray();
expect(results.length).toBe(3);
// Verify top 3 in descending order
expect(results[0].score).toBeCloseTo(4.1, 0.001);
expect(results[1].score).toBeCloseTo(3.5, 0.001);
expect(results[2].score).toBeCloseTo(2.8, 0.001);
});
it("should support method chaining with offset", async () => {
const results = await table
.query()
.orderBy({ columnName: "score" })
.offset(2)
.limit(2)
.toArray();
expect(results.length).toBe(2);
// Verify results skip first 2 and take next 2
expect(results[0].score).toBeCloseTo(2.8, 0.001);
expect(results[1].score).toBeCloseTo(3.5, 0.001);
});
it("should support method chaining with select", async () => {
const results = await table
.query()
.orderBy({ columnName: "name" })
.select(["name", "score"])
.toArray();
expect(results.length).toBe(5);
// Verify only selected columns are present
expect(Object.keys(results[0])).toEqual(["name", "score"]);
expect(Object.keys(results[4])).toEqual(["name", "score"]);
// Verify sorted by name
expect(results[0].name).toBe("alice");
expect(results[4].name).toBe("eve");
});
it("should support complex method chaining", async () => {
const results = await table
.query()
.where("score > 1.0")
.orderBy({ columnName: "score", ascending: false })
.limit(3)
.select(["id", "score", "name"])
.toArray();
expect(results.length).toBe(3);
// Verify filtered, sorted, limited, and projected
expect(results[0].score).toBeCloseTo(4.1, 0.001);
expect(results[1].score).toBeCloseTo(3.5, 0.001);
expect(results[2].score).toBeCloseTo(2.8, 0.001);
expect(Object.keys(results[0])).toEqual(["id", "score", "name"]);
});
it("should support multi-column ordering and null placement", async () => {
const schema = new Schema([
new Field("group", new Int64(), true),
new Field("score", new Float32(), true),
new Field("name", new Utf8(), true),
]);
const data = makeArrowTable(
[
{ group: 1n, score: null, name: "z" },
{ group: 1n, score: 1.0, name: "b" },
{ group: 1n, score: 1.0, name: "a" },
{ group: 2n, score: 0.5, name: "c" },
],
{ schema },
);
const nullTable = await (await connect(tmpDir.name)).createTable(
"test_multi_order",
data,
{ mode: "overwrite" },
);
const results = await nullTable
.query()
.orderBy([
{ columnName: "group", ascending: true, nullsFirst: false },
{ columnName: "score", ascending: true, nullsFirst: true },
{ columnName: "name", ascending: true, nullsFirst: false },
])
.toArray();
expect(results.map((r) => [r.group, r.score, r.name])).toEqual([
[1n, null, "z"],
[1n, 1.0, "a"],
[1n, 1.0, "b"],
[2n, 0.5, "c"],
]);
});
});

View File

@@ -617,68 +617,4 @@ describe("remote connection", () => {
);
});
});
describe("renameTable", () => {
async function captureRenameRequest(
call: (db: Connection) => Promise<void>,
): Promise<{ url: string; body: Record<string, unknown> }> {
let captured: { url: string; body: Record<string, unknown> } | undefined;
await withMockDatabase((req, res) => {
let raw = "";
req.on("data", (chunk) => {
raw += chunk;
});
req.on("end", () => {
captured = {
url: req.url ?? "",
body: raw ? JSON.parse(raw) : {},
};
res.writeHead(200, { "Content-Type": "application/json" }).end("");
});
}, call);
if (!captured) {
throw new Error("mock server never saw a request");
}
return captured;
}
it("sends rename request for a table in the root namespace", async () => {
const { url, body } = await captureRenameRequest(async (db) => {
await db.renameTable("table1", "table2");
});
expect(url).toBe("/v1/table/table1/rename/");
// biome-ignore lint/style/useNamingConvention: snake_case mandated by the server wire format
expect(body).toEqual({ new_table_name: "table2" });
});
it("omits new_namespace when only the current namespace is supplied", async () => {
// Safe-default check: passing namespacePath alone must not send
// `new_namespace`, so the server keeps the table in its current
// namespace instead of silently moving it to root.
const { url, body } = await captureRenameRequest(async (db) => {
await db.renameTable("table1", "table2", {
namespacePath: ["ns1"],
});
});
expect(url).toBe("/v1/table/ns1$table1/rename/");
// biome-ignore lint/style/useNamingConvention: snake_case mandated by the server wire format
expect(body).toEqual({ new_table_name: "table2" });
});
it("includes new_namespace in the body for a cross-namespace rename", async () => {
const { url, body } = await captureRenameRequest(async (db) => {
await db.renameTable("table1", "table2", {
namespacePath: ["ns1"],
newNamespacePath: ["ns2"],
});
});
expect(url).toBe("/v1/table/ns1$table1/rename/");
expect(body).toEqual({
// biome-ignore lint/style/useNamingConvention: snake_case mandated by the server wire format
new_table_name: "table2",
// biome-ignore lint/style/useNamingConvention: snake_case mandated by the server wire format
new_namespace: ["ns2"],
});
});
});
});

View File

@@ -1,438 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
import {
Field,
Float16,
Int32,
type RecordBatch,
RecordBatchReader,
Schema,
tableToIPC,
} from "apache-arrow";
import { makeArrowTable, makeEmptyTable } from "../lancedb/arrow";
import { Scannable } from "../lancedb/scannable";
function makeTable() {
return makeArrowTable(
[
{ id: 1, name: "a" },
{ id: 2, name: "b" },
{ id: 3, name: "c" },
],
{ vectorColumns: {} },
);
}
async function makeReader(): Promise<RecordBatchReader> {
// `RecordBatchReader.from()` returns an unopened reader; `.schema` is only
// populated after `.open()`. Opening sync readers is synchronous.
const reader = RecordBatchReader.from(tableToIPC(makeTable()));
return reader.open() as RecordBatchReader;
}
describe("Scannable", () => {
describe("fromTable", () => {
test("reflects schema, numRows, and defaults rescannable=true", async () => {
const table = makeTable();
const scannable = await Scannable.fromTable(table);
expect(scannable.schema).toBe(table.schema);
expect(scannable.numRows).toBe(table.numRows);
expect(scannable.rescannable).toBe(true);
});
test("throws when opts.numRows does not match table.numRows", async () => {
await expect(
Scannable.fromTable(makeTable(), { numRows: 42 }),
).rejects.toThrow(/does not match table\.numRows/);
});
test("throws when opts.rescannable is false", async () => {
await expect(
Scannable.fromTable(makeTable(), { rescannable: false }),
).rejects.toThrow(/always rescannable/);
});
});
describe("fromRecordBatchReader", () => {
test("reflects schema and defaults numRows=null, rescannable=false", async () => {
const reader = await makeReader();
const scannable = await Scannable.fromRecordBatchReader(reader);
expect(scannable.schema).toBe(reader.schema);
expect(scannable.numRows).toBeNull();
expect(scannable.rescannable).toBe(false);
});
test("honors numRows override", async () => {
const scannable = await Scannable.fromRecordBatchReader(
await makeReader(),
{ numRows: 3 },
);
expect(scannable.numRows).toBe(3);
expect(scannable.rescannable).toBe(false);
});
test("rescannable: false explicit does not throw", async () => {
const reader = await makeReader();
const scannable = await Scannable.fromRecordBatchReader(reader, {
rescannable: false,
});
expect(scannable.rescannable).toBe(false);
});
test("throws when opts.rescannable is true", async () => {
const reader = await makeReader();
await expect(
Scannable.fromRecordBatchReader(reader, { rescannable: true }),
).rejects.toThrow(/does not accept rescannable/);
});
test("throws when opts.rescannable is true even alongside numRows", async () => {
const reader = await makeReader();
await expect(
Scannable.fromRecordBatchReader(reader, {
numRows: 3,
rescannable: true,
}),
).rejects.toThrow(/does not accept rescannable/);
});
});
describe("fromIterable", () => {
test("accepts a sync iterable of batches", async () => {
const table = makeTable();
const scannable = await Scannable.fromIterable(
table.schema,
table.batches,
);
expect(scannable.schema).toBe(table.schema);
expect(scannable.numRows).toBeNull();
expect(scannable.rescannable).toBe(false);
});
test("accepts an async iterable of batches", async () => {
const table = makeTable();
async function* generator(): AsyncGenerator<RecordBatch> {
for (const batch of table.batches) {
yield batch;
}
}
const scannable = await Scannable.fromIterable(table.schema, generator());
expect(scannable.schema).toBe(table.schema);
expect(scannable.rescannable).toBe(false);
});
describe("rescannable: true detection", () => {
// Replayable inputs: [Symbol.iterator]() / [Symbol.asyncIterator]()
// returns a fresh iterator each call. Must NOT throw.
test("Array passes (fresh ArrayIterator each call)", async () => {
const table = makeTable();
const scannable = await Scannable.fromIterable(
table.schema,
table.batches,
{ rescannable: true },
);
expect(scannable.rescannable).toBe(true);
});
test("Set passes (fresh SetIterator each call)", async () => {
const table = makeTable();
const set = new Set<RecordBatch>(table.batches);
const scannable = await Scannable.fromIterable(table.schema, set, {
rescannable: true,
});
expect(scannable.rescannable).toBe(true);
});
test("custom Iterable returning a fresh iterator passes", async () => {
const table = makeTable();
const replayable: Iterable<RecordBatch> = {
[Symbol.iterator]() {
return table.batches[Symbol.iterator]();
},
};
const scannable = await Scannable.fromIterable(
table.schema,
replayable,
{ rescannable: true },
);
expect(scannable.rescannable).toBe(true);
});
test("object with generator method passes (fresh generator each call)", async () => {
const table = makeTable();
const replayable: Iterable<RecordBatch> = {
*[Symbol.iterator]() {
for (const batch of table.batches) yield batch;
},
};
const scannable = await Scannable.fromIterable(
table.schema,
replayable,
{ rescannable: true },
);
expect(scannable.rescannable).toBe(true);
});
test("empty Array passes (replayable degenerate case)", async () => {
const schema = makeTable().schema;
const scannable = await Scannable.fromIterable(
schema,
[] as RecordBatch[],
{ rescannable: true },
);
expect(scannable.rescannable).toBe(true);
});
// One-shot inputs: [Symbol.iterator]() / [Symbol.asyncIterator]()
// returns the same object, or the input is already-an-iterator.
// Must throw with a /one-shot/ message.
test("sync generator throws", async () => {
const table = makeTable();
function* generator(): Generator<RecordBatch> {
for (const batch of table.batches) yield batch;
}
await expect(
Scannable.fromIterable(table.schema, generator(), {
rescannable: true,
}),
).rejects.toThrow(/one-shot/);
});
test("async generator throws", async () => {
const table = makeTable();
async function* generator(): AsyncGenerator<RecordBatch> {
for (const batch of table.batches) yield batch;
}
await expect(
Scannable.fromIterable(table.schema, generator(), {
rescannable: true,
}),
).rejects.toThrow(/one-shot/);
});
test("empty generator throws (one-shot degenerate case)", async () => {
const schema = makeTable().schema;
function* generator(): Generator<RecordBatch> {
// intentionally empty; yields nothing.
}
await expect(
Scannable.fromIterable(schema, generator(), { rescannable: true }),
).rejects.toThrow(/one-shot/);
});
test("custom self-iterator throws", async () => {
const table = makeTable();
const batches = table.batches;
let i = 0;
const oneShot: Iterable<RecordBatch> & Iterator<RecordBatch> = {
[Symbol.iterator]() {
return this;
},
next() {
if (i >= batches.length) {
return { done: true, value: undefined };
}
return { done: false, value: batches[i++] };
},
};
await expect(
Scannable.fromIterable(table.schema, oneShot, { rescannable: true }),
).rejects.toThrow(/one-shot/);
});
test("Array.values() (IterableIterator) throws", async () => {
const table = makeTable();
const iter = table.batches.values();
await expect(
Scannable.fromIterable(table.schema, iter, { rescannable: true }),
).rejects.toThrow(/one-shot/);
});
test("raw iterator (only `.next`) throws", async () => {
const table = makeTable();
const batches = table.batches;
let i = 0;
const rawIter = {
next(): IteratorResult<RecordBatch> {
if (i >= batches.length) {
return { done: true, value: undefined };
}
return { done: false, value: batches[i++] };
},
};
await expect(
Scannable.fromIterable(
table.schema,
rawIter as unknown as Iterable<RecordBatch>,
{ rescannable: true },
),
).rejects.toThrow(/one-shot/);
});
// Edge: null/undefined must not crash the detection helper. The
// null check belongs to `normalizeIterator` and only fires when a
// scan starts.
test("null input does not crash detection at construction", async () => {
const schema = makeTable().schema;
await expect(
Scannable.fromIterable(
schema,
null as unknown as Iterable<RecordBatch>,
{
rescannable: true,
},
),
).resolves.toBeDefined();
});
test("undefined input does not crash detection at construction", async () => {
const schema = makeTable().schema;
await expect(
Scannable.fromIterable(
schema,
undefined as unknown as Iterable<RecordBatch>,
{ rescannable: true },
),
).resolves.toBeDefined();
});
// Default (rescannable omitted) skips the check entirely, so even
// pathological inputs construct without throwing here.
test("rescannable omitted skips detection entirely (generator passes)", async () => {
const table = makeTable();
function* generator(): Generator<RecordBatch> {
for (const batch of table.batches) yield batch;
}
const scannable = await Scannable.fromIterable(
table.schema,
generator(),
);
expect(scannable.rescannable).toBe(false);
});
test("rescannable: false explicit skips detection entirely (generator passes)", async () => {
const table = makeTable();
function* generator(): Generator<RecordBatch> {
for (const batch of table.batches) yield batch;
}
const scannable = await Scannable.fromIterable(
table.schema,
generator(),
{ rescannable: false },
);
expect(scannable.rescannable).toBe(false);
});
});
});
describe("fromFactory", () => {
test("defaults rescannable=true and does not invoke the factory eagerly", async () => {
const table = makeTable();
const factory = jest.fn(() => table.batches);
const scannable = await Scannable.fromFactory(table.schema, factory);
expect(scannable.schema).toBe(table.schema);
expect(scannable.rescannable).toBe(true);
expect(factory).not.toHaveBeenCalled();
});
test("honors rescannable and numRows overrides", async () => {
const table = makeTable();
const scannable = await Scannable.fromFactory(
table.schema,
() => table.batches,
{ numRows: 7, rescannable: false },
);
expect(scannable.numRows).toBe(7);
expect(scannable.rescannable).toBe(false);
});
});
describe("validation", () => {
test("throws when numRows is negative", async () => {
await expect(
Scannable.fromFactory(makeTable().schema, () => [], { numRows: -1 }),
).rejects.toThrow(/non-negative/);
});
test("throws when numRows is not an integer", async () => {
await expect(
Scannable.fromFactory(makeTable().schema, () => [], { numRows: 3.5 }),
).rejects.toThrow(/integer/);
});
});
describe("native handle", () => {
test("exposes a native handle via inner", async () => {
const scannable = await Scannable.fromTable(makeTable());
expect(scannable.inner).toBeDefined();
expect(typeof scannable.inner).toBe("object");
expect(scannable.inner).not.toBeNull();
});
});
// Schema-variety construction tests. Each asserts that construction
// succeeds against a richer Arrow schema, which transitively exercises
// schema serialization and the Rust-side `ipc_file_to_schema` for types
// beyond flat primitives.
describe("schema variety", () => {
test("accepts an empty table", async () => {
const schema = new Schema([new Field("id", new Int32(), true)]);
const table = makeEmptyTable(schema);
const scannable = await Scannable.fromTable(table);
expect(scannable.numRows).toBe(0);
expect(scannable.schema).toBe(table.schema);
});
test("accepts nested struct and list columns", async () => {
const table = makeArrowTable(
[
{ id: 1, point: { x: 0, y: 0 }, tags: ["a", "b"] },
{ id: 2, point: { x: 1, y: 2 }, tags: ["c"] },
],
{ vectorColumns: {} },
);
const scannable = await Scannable.fromTable(table);
expect(scannable.schema).toBe(table.schema);
expect(scannable.numRows).toBe(2);
});
test("accepts a FixedSizeList (vector) column", async () => {
const table = makeArrowTable(
[
{ id: 1, vec: [1, 2, 3] },
{ id: 2, vec: [4, 5, 6] },
],
{ vectorColumns: { vec: { type: new Float16() } } },
);
const scannable = await Scannable.fromTable(table);
expect(scannable.schema).toBe(table.schema);
expect(scannable.numRows).toBe(2);
});
test("accepts a table with many columns", async () => {
const row: Record<string, number> = {};
for (let i = 0; i < 50; i++) row[`c${i}`] = i;
const table = makeArrowTable([row, row], { vectorColumns: {} });
const scannable = await Scannable.fromTable(table);
expect(scannable.schema.fields.length).toBe(50);
expect(scannable.numRows).toBe(2);
});
});
});

View File

@@ -28,7 +28,6 @@ import {
List,
Schema,
SchemaLike,
Struct,
Type,
Uint8,
Utf8,
@@ -116,46 +115,10 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
await expect(table.countRows()).resolves.toBe(1);
});
it("should invoke the progress callback", async () => {
const events: import("../lancedb").WriteProgress[] = [];
await table.add([{ id: 1 }, { id: 2 }, { id: 3 }], {
progress: (p) => events.push(p),
});
expect(events.length).toBeGreaterThan(0);
const last = events[events.length - 1];
expect(last.done).toBe(true);
// Earlier callbacks must have done=false.
for (const ev of events.slice(0, -1)) {
expect(ev.done).toBe(false);
}
// outputRows reflects the rows added in this call, not table size.
expect(last.outputRows).toBe(3);
// The input source (an array) reports a row count, so totalRows is set.
expect(last.totalRows).toBe(3);
// outputRows is monotonic.
for (let i = 1; i < events.length; i++) {
expect(events[i].outputRows).toBeGreaterThanOrEqual(
events[i - 1].outputRows,
);
}
});
it("should swallow errors thrown from the progress callback", async () => {
const warn = jest
.spyOn(console, "warn")
.mockImplementation(() => undefined);
try {
const res = await table.add([{ id: 1 }, { id: 2 }], {
progress: () => {
throw new Error("callback bomb");
},
});
expect(res.version).toBeGreaterThan(0);
expect(warn).toHaveBeenCalled();
} finally {
warn.mockRestore();
}
it("should accept skipAutoCleanup on add()", async () => {
await table.add([{ id: 1 }], { skipAutoCleanup: true });
await table.add([{ id: 2 }], { skipAutoCleanup: true });
await expect(table.countRows()).resolves.toBe(2);
});
it("should let me close the table", async () => {
@@ -781,113 +744,6 @@ describe("When creating an index", () => {
expect(indices2.length).toBe(0);
});
it("should create and search a nested vector index", async () => {
const db = await connect(tmpDir.name);
const nestedSchema = new Schema([
new Field("id", new Int32(), true),
new Field(
"image",
new Struct([
new Field(
"embedding",
new FixedSizeList(2, new Field("item", new Float32(), true)),
true,
),
]),
true,
),
]);
const nestedTable = await db.createTable(
"nested_vector",
makeArrowTable(
Array.from({ length: 300 }, (_, id) => ({
id,
image: { embedding: [id, id + 1] },
})),
{ schema: nestedSchema },
),
);
await nestedTable.createIndex("image.embedding", {
name: "image_embedding_idx",
});
const indices = await nestedTable.listIndices();
expect(indices).toContainEqual({
name: "image_embedding_idx",
indexType: "IvfPq",
columns: ["image.embedding"],
});
const explicit = await nestedTable
.query()
.nearestTo([0.0, 1.0])
.column("image.embedding")
.limit(1)
.toArray();
const inferred = await nestedTable
.query()
.nearestTo([0.0, 1.0])
.limit(1)
.toArray();
expect(inferred[0].id).toEqual(explicit[0].id);
});
it("should report multiple nested vector candidates", async () => {
const db = await connect(tmpDir.name);
const nestedSchema = new Schema([
new Field(
"image",
new Struct([
new Field(
"embedding",
new FixedSizeList(2, new Field("item", new Float32(), true)),
true,
),
]),
true,
),
new Field(
"text",
new Struct([
new Field(
"embedding",
new FixedSizeList(2, new Field("item", new Float32(), true)),
true,
),
]),
true,
),
]);
const nestedTable = await db.createTable(
"multiple_nested_vectors",
makeArrowTable(
[
{
image: { embedding: [0.0, 1.0] },
text: { embedding: [2.0, 3.0] },
},
],
{ schema: nestedSchema },
),
);
await expect(
nestedTable.query().nearestTo([0.0, 1.0]).limit(1).toArray(),
).rejects.toThrow(/image\.embedding.*text\.embedding/);
});
it("should report when no default vector column exists", async () => {
const db = await connect(tmpDir.name);
const noVectorTable = await db.createTable(
"no_vector",
makeArrowTable([{ id: 0, label: "cat" }]),
);
await expect(
noVectorTable.query().nearestTo([0.0, 1.0]).limit(1).toArray(),
).rejects.toThrow(/No vector column/);
});
it("should wait for index readiness", async () => {
// Create an index and then wait for it to be ready
await tbl.createIndex("vec");
@@ -2498,224 +2354,3 @@ describe("when creating a table with Float32Array vectors", () => {
expect((fsl.children[0].type as Float32).precision).toBe(1);
});
});
describe("setUnenforcedPrimaryKey", () => {
let tmpDir: tmp.DirResult;
beforeEach(() => {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
});
afterEach(() => tmpDir.removeCallback());
it("sets a single-column primary key (string or one-element array)", async () => {
const conn = await connect(tmpDir.name);
const schema = new arrow.Schema([
new arrow.Field("id", new arrow.Int64(), false),
]);
const t1 = await conn.createEmptyTable("t1", schema);
await t1.setUnenforcedPrimaryKey("id");
const t2 = await conn.createEmptyTable("t2", schema);
await t2.setUnenforcedPrimaryKey(["id"]);
});
it("rejects a compound primary key", async () => {
const conn = await connect(tmpDir.name);
const table = await conn.createEmptyTable(
"t",
new arrow.Schema([
new arrow.Field("id", new arrow.Int64(), false),
new arrow.Field("name", new arrow.Utf8(), false),
]),
);
await expect(
table.setUnenforcedPrimaryKey(["id", "name"]),
).rejects.toThrow();
});
it("rejects changing the primary key once set", async () => {
const conn = await connect(tmpDir.name);
const table = await conn.createEmptyTable(
"t",
new arrow.Schema([
new arrow.Field("id", new arrow.Int64(), false),
new arrow.Field("name", new arrow.Utf8(), false),
]),
);
await table.setUnenforcedPrimaryKey("id");
await expect(table.setUnenforcedPrimaryKey("name")).rejects.toThrow();
await expect(table.setUnenforcedPrimaryKey("id")).rejects.toThrow();
});
});
describe("setLsmWriteSpec / unsetLsmWriteSpec", () => {
let tmpDir: tmp.DirResult;
beforeEach(() => {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
});
afterEach(() => tmpDir.removeCallback());
async function makeTable(conn: Connection): Promise<Table> {
return await conn.createEmptyTable(
"t",
new arrow.Schema([new arrow.Field("id", new arrow.Int64(), false)]),
);
}
it("installs and removes a bucket spec", async () => {
const conn = await connect(tmpDir.name);
const table = await makeTable(conn);
await table.setUnenforcedPrimaryKey("id");
await table.setLsmWriteSpec({
specType: "bucket",
column: "id",
numBuckets: 4,
});
await table.unsetLsmWriteSpec();
// A second unset errors — there is no spec left to remove.
await expect(table.unsetLsmWriteSpec()).rejects.toThrow();
// A fresh spec can be installed after unset.
await table.setLsmWriteSpec({
specType: "bucket",
column: "id",
numBuckets: 8,
});
});
it("installs an unsharded spec", async () => {
const conn = await connect(tmpDir.name);
const table = await makeTable(conn);
await table.setUnenforcedPrimaryKey("id");
await table.setLsmWriteSpec({ specType: "unsharded" });
await table.unsetLsmWriteSpec();
});
it("installs an identity spec", async () => {
const conn = await connect(tmpDir.name);
const table = await makeTable(conn);
await table.setUnenforcedPrimaryKey("id");
await table.setLsmWriteSpec({ specType: "identity", column: "id" });
await table.unsetLsmWriteSpec();
});
it("rejects an invalid spec", async () => {
const conn = await connect(tmpDir.name);
const table = await makeTable(conn);
await table.setUnenforcedPrimaryKey("id");
// num_buckets out of range.
await expect(
table.setLsmWriteSpec({
specType: "bucket",
column: "id",
numBuckets: 0,
}),
).rejects.toThrow();
// Column mismatch.
await expect(
table.setLsmWriteSpec({
specType: "bucket",
column: "missing",
numBuckets: 4,
}),
).rejects.toThrow();
});
});
describe("LSM merge insert", () => {
let tmpDir: tmp.DirResult;
beforeEach(() => {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
});
afterEach(() => tmpDir.removeCallback());
async function bucketTable(conn: Connection): Promise<Table> {
// The primary key column must be non-nullable.
const table = await conn.createEmptyTable(
"t",
new arrow.Schema([
new arrow.Field("id", new arrow.Utf8(), false),
new arrow.Field("value", new arrow.Float64(), true),
]),
);
await table.add([
{ id: "a", value: 1 },
{ id: "b", value: 2 },
]);
await table.setUnenforcedPrimaryKey("id");
// numBuckets = 1: every row routes to the single bucket.
await table.setLsmWriteSpec({
specType: "bucket",
column: "id",
numBuckets: 1,
});
return table;
}
it("routes merge_insert through the shard writer", async () => {
const conn = await connect(tmpDir.name);
const table = await bucketTable(conn);
const res = await table
.mergeInsert("id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute([
{ id: "c", value: 3 },
{ id: "d", value: 4 },
]);
// LSM path: rows go to the MemWAL, so only numRows is populated.
expect(res.numRows).toBe(2);
expect(res.version).toBe(0);
expect(res.numInsertedRows).toBe(0);
await table.closeLsmWriters();
});
it("falls back to the standard path with useLsmWrite(false)", async () => {
const conn = await connect(tmpDir.name);
const table = await bucketTable(conn);
const res = await table
.mergeInsert("id")
.whenNotMatchedInsertAll()
.useLsmWrite(false)
.execute([
{ id: "b", value: 9 },
{ id: "e", value: 5 },
]);
// Standard path commits: id="e" inserted ("b" already exists).
expect(res.numInsertedRows).toBe(1);
expect(await table.countRows()).toBe(3);
});
it("supports validateSingleShard(false)", async () => {
const conn = await connect(tmpDir.name);
const table = await bucketTable(conn);
const res = await table
.mergeInsert("id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.validateSingleShard(false)
.execute([{ id: "f", value: 6 }]);
expect(res.numRows).toBe(1);
});
it("rejects a non-upsert merge under an LSM spec", async () => {
const conn = await connect(tmpDir.name);
const table = await bucketTable(conn);
await expect(
table
.mergeInsert("id")
.whenNotMatchedInsertAll()
.execute([{ id: "g", value: 7 }]),
).rejects.toThrow();
});
});

View File

@@ -38,14 +38,5 @@ test("filtering examples", async () => {
// --8<-- [start:sql_search]
await tbl.query().where("id = 10").limit(10).toArray();
// --8<-- [end:sql_search]
// --8<-- [start:orderby_search]
await tbl
.query()
.where("id > 10")
.orderBy({ columnName: "id", ascending: false })
.limit(5)
.toArray();
// --8<-- [end:orderby_search]
});
});

View File

@@ -1291,18 +1291,6 @@ export async function fromRecordBatchToBuffer(
return Buffer.from(await writer.toUint8Array());
}
/**
* Create a buffer containing a single record batch using the Arrow IPC Stream
* serialization. Each call produces a self-contained Stream message (schema +
* batch + EOS) suitable for incremental decode by `arrow_ipc::reader::StreamReader`.
*/
export async function fromRecordBatchToStreamBuffer(
batch: RecordBatch,
): Promise<Buffer> {
const writer = RecordBatchStreamWriter.writeAll([batch]);
return Buffer.from(await writer.toUint8Array());
}
/**
* Serialize an Arrow Table into a buffer using the Arrow IPC Stream serialization
*

View File

@@ -144,19 +144,6 @@ export interface DropNamespaceOptions {
behavior?: "restrict" | "cascade";
}
export interface RenameTableOptions {
/**
* The namespace path of the table being renamed. Defaults to the root
* namespace (`[]`) when omitted.
*/
namespacePath?: string[];
/**
* The namespace path to move the table to as part of the rename. When
* omitted the table stays in `namespacePath`.
*/
newNamespacePath?: string[];
}
/**
* A LanceDB Connection that allows you to open tables and create new ones.
*
@@ -404,24 +391,6 @@ export abstract class Connection {
isShallow?: boolean;
},
): Promise<Table>;
/**
* Rename a table.
*
* Currently only supported by LanceDB Cloud. Local OSS connections and
* namespace-backed connections (via {@link connectNamespace}) reject with
* a "not supported" error.
*
* @param {string} currentName - The current name of the table.
* @param {string} newName - The new name for the table.
* @param {RenameTableOptions} options - Optional namespace paths. When
* `newNamespacePath` is omitted the table stays in `namespacePath`.
*/
abstract renameTable(
currentName: string,
newName: string,
options?: RenameTableOptions,
): Promise<void>;
}
/** @hideconstructor */
@@ -682,19 +651,6 @@ export class LocalConnection extends Connection {
options?.behavior,
);
}
async renameTable(
currentName: string,
newName: string,
options?: RenameTableOptions,
): Promise<void> {
return this.inner.renameTable(
currentName,
newName,
options?.namespacePath ?? [],
options?.newNamespacePath,
);
}
}
/**

View File

@@ -71,7 +71,6 @@ export {
CreateNamespaceResponse,
DropNamespaceResponse,
DescribeNamespaceResponse,
RenameTableOptions,
} from "./connection";
export { Session } from "./native.js";
@@ -83,7 +82,6 @@ export {
VectorQuery,
TakeQuery,
QueryExecutionOptions,
ColumnOrdering,
FullTextSearchOptions,
RecordBatchIterator,
FullTextQuery,
@@ -114,8 +112,6 @@ export {
UpdateOptions,
OptimizeOptions,
Version,
WriteProgress,
LsmWriteSpec,
ColumnAlteration,
} from "./table";
@@ -130,7 +126,6 @@ export { MergeInsertBuilder, WriteExecutionOptions } from "./merge";
export * as embedding from "./embedding";
export { permutationBuilder, PermutationBuilder } from "./permutation";
export { Scannable, ScannableOptions } from "./scannable";
export * as rerankers from "./rerankers";
export {
SchemaLike,

View File

@@ -87,38 +87,20 @@ export class MergeInsertBuilder {
this.#schema,
);
}
/**
* Controls whether the merge uses the MemWAL LSM write path.
* Skip the automatic cleanup of old dataset versions that would otherwise
* run as part of this merge insert's commit. Forwards to
* `MergeInsertBuilder::skip_auto_cleanup` in lance-core.
*
* By default (unset), a `mergeInsert` on a table with an LSM write spec is
* routed through Lance's MemWAL shard writer, and a table without one uses
* the standard path. Pass `false` to force the standard path even when a
* spec is set. Pass `true` to require a spec — `mergeInsert` rejects if none
* is installed.
* Useful for high-frequency writers that prefer to manage version cleanup
* themselves, or writers without delete permissions on the underlying storage.
*
* @param useLsmWrite - Whether to use the LSM write path.
* @param skip - If true, the auto-cleanup step is skipped at commit time.
*/
useLsmWrite(useLsmWrite: boolean): MergeInsertBuilder {
skipAutoCleanup(skip: boolean): MergeInsertBuilder {
return new MergeInsertBuilder(
this.#native.useLsmWrite(useLsmWrite),
this.#schema,
);
}
/**
* Controls how an LSM merge checks that its input targets a single shard.
*
* When a table has an LSM write spec, every row in a `mergeInsert` call must
* route to the same shard. When `true` (the default), every row is inspected
* to verify this. When `false`, only the first row is inspected and the
* shard it routes to is used for the whole input — a faster path for callers
* that have already pre-sharded their input. Has no effect on tables without
* an LSM write spec.
*
* @param validateSingleShard - Whether to check every row routes to one shard. Defaults to `true`.
*/
validateSingleShard(validateSingleShard: boolean): MergeInsertBuilder {
return new MergeInsertBuilder(
this.#native.validateSingleShard(validateSingleShard),
this.#native.skipAutoCleanup(skip),
this.#schema,
);
}

View File

@@ -79,12 +79,6 @@ export interface QueryExecutionOptions {
timeoutMs?: number;
}
export interface ColumnOrdering {
columnName: string;
ascending?: boolean;
nullsFirst?: boolean;
}
/**
* Options that control the behavior of a full text search
*/
@@ -423,21 +417,6 @@ export class StandardQueryBase<
return this;
}
/**
* Sort the results by the specified column(s).
* @returns This query builder.
*/
orderBy(ordering: ColumnOrdering | ColumnOrdering[]): this {
const orderings = Array.isArray(ordering) ? ordering : [ordering];
const normalized = orderings.map((o) => ({
columnName: o.columnName,
ascending: o.ascending ?? true,
nullsFirst: o.nullsFirst ?? false,
}));
this.doCall((inner) => inner.orderBy(normalized));
return this;
}
/**
* Skip searching un-indexed data. This can make search faster, but will miss
* any data that is not yet indexed.

View File

@@ -1,274 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
import {
Table as ArrowTable,
RecordBatch,
RecordBatchReader,
Schema,
} from "apache-arrow";
import {
fromRecordBatchToStreamBuffer,
fromTableToBuffer,
makeEmptyTable,
} from "./arrow";
import { NapiScannable } from "./native.js";
export interface ScannableOptions {
/** Hint about the number of rows. Not validated against the stream. */
numRows?: number;
/**
* Whether the source can be scanned more than once. Defaults to `true` for
* `fromTable` / `fromFactory` and `false` for `fromIterable` /
* `fromRecordBatchReader`.
*/
rescannable?: boolean;
}
/**
* A data source that can be scanned as a stream of Arrow `RecordBatch`es.
*
* `Scannable` wraps the schema + optional row count + rescannable flag and
* a callback that yields batches one at a time. It is passed to consumers
* (e.g. `Table.add`, `createTable`, `mergeInsert` — follow-up work) that
* need to pull data without materializing the full dataset in JS memory.
*
* Batches cross the JS↔Rust boundary as Arrow IPC Stream messages; a fresh
* writer serializes each batch, and the Rust side decodes it with
* `arrow_ipc::reader::StreamReader`. One batch is in flight at a time.
*/
export class Scannable {
readonly schema: Schema;
readonly numRows: number | null;
readonly rescannable: boolean;
/** @hidden */
private readonly native: NapiScannable;
private constructor(
native: NapiScannable,
schema: Schema,
numRows: number | null,
rescannable: boolean,
) {
this.native = native;
this.schema = schema;
this.numRows = numRows;
this.rescannable = rescannable;
}
/** @hidden Access the native handle for passing through to Rust consumers. */
get inner(): NapiScannable {
return this.native;
}
/**
* Build a Scannable from an explicit schema and a factory that returns a
* fresh batch iterator on each call.
*
* The factory is invoked once per scan. Each iterator yields
* `RecordBatch`es matching the declared schema. Use this when you need
* direct control over the pull loop — for example, to wrap a streaming
* source whose batches are produced lazily.
*
* @param schema - The Arrow schema of the produced batches.
* @param factory - Called at the start of each scan to produce a batch
* iterator. Must be idempotent when `rescannable` is true.
* @param opts - Optional hints. `rescannable` defaults to `true`; set to
* `false` if calling `factory()` twice would not reproduce the same data.
*/
static async fromFactory(
schema: Schema,
factory: () =>
| AsyncIterable<RecordBatch>
| Iterable<RecordBatch>
| AsyncIterator<RecordBatch>
| Iterator<RecordBatch>,
opts: ScannableOptions = {},
): Promise<Scannable> {
const numRows = opts.numRows ?? null;
if (numRows != null && !Number.isInteger(numRows)) {
throw new TypeError("numRows must be an integer");
}
const rescannable = opts.rescannable ?? true;
let iter: AsyncIterator<RecordBatch> | Iterator<RecordBatch> | null = null;
const getNextBatch = async (isStart: boolean): Promise<Buffer | null> => {
// `isStart` is true on the first pull of every new scan_as_stream.
// Drop any cached iterator so factory() is re-invoked for the next scan
if (isStart) {
iter = null;
}
if (iter === null) {
iter = normalizeIterator(factory());
}
const result = await iter.next();
if (result.done) {
iter = null;
return null;
}
return fromRecordBatchToStreamBuffer(result.value);
};
const schemaBuf = await fromTableToBuffer(makeEmptyTable(schema));
const native = new NapiScannable(
schemaBuf,
numRows,
rescannable,
getNextBatch,
);
return new Scannable(native, schema, numRows, rescannable);
}
/**
* Build a Scannable from an in-memory Arrow `Table`. Always rescannable;
* the table's batches are replayed on each scan.
*
* The table's row count is authoritative: `opts.numRows` must either be
* omitted or equal to `table.numRows`. `opts.rescannable` of `false` is
* rejected because in-memory Tables are always rescannable.
*/
static async fromTable(
table: ArrowTable,
opts: ScannableOptions = {},
): Promise<Scannable> {
if (opts.numRows != null && opts.numRows !== table.numRows) {
throw new TypeError(
`opts.numRows (${opts.numRows}) does not match table.numRows (${table.numRows}). ` +
`The table's row count is authoritative; omit numRows or pass the matching value.`,
);
}
if (opts.rescannable === false) {
throw new TypeError(
`fromTable does not accept rescannable: false. ` +
`In-memory Arrow Tables are always rescannable; omit the option or pass true.`,
);
}
return Scannable.fromFactory(table.schema, () => table.batches, {
numRows: table.numRows,
rescannable: true,
});
}
/**
* Build a Scannable from an iterable of `RecordBatch`es. `rescannable`
* defaults to `false`. Pass an explicit schema so the consumer can
* validate before any batch is pulled.
*
* `opts.rescannable: true` is honest for replayable iterables (Arrays,
* Sets, or custom iterables whose `[Symbol.iterator]()` returns a fresh
* iterator each call). It is rejected for one-shot iterables (generators,
* async generators, or already-an-iterator inputs) because their
* `[Symbol.iterator]()` returns the same exhausted object on the second
* scan. For replayable sources outside this shape, use
* `fromFactory(schema, () => createIter(), { rescannable: true })`.
*
* Note: when `opts.rescannable` is `true`, the constructor calls
* `[Symbol.iterator]()` once on the input to perform the structural check.
*/
static async fromIterable(
schema: Schema,
iter: AsyncIterable<RecordBatch> | Iterable<RecordBatch>,
opts: ScannableOptions = {},
): Promise<Scannable> {
if (opts.rescannable === true && isOneShotIterable(iter)) {
throw new TypeError(
`fromIterable: rescannable: true is not honest for one-shot iterables ` +
`(generators, async generators, or iterators where [Symbol.iterator]() ` +
`returns the same object). The source would be exhausted after the first scan. ` +
`Use fromFactory(schema, () => createIter(), { rescannable: true }) for sources ` +
`where each call mints a fresh iterator.`,
);
}
return Scannable.fromFactory(schema, () => iter, {
numRows: opts.numRows,
rescannable: opts.rescannable ?? false,
});
}
/**
* Build a Scannable from an Arrow `RecordBatchReader`. A reader can only
* be consumed once; `rescannable` defaults to `false`.
*
* The reader must already be opened (via `.open()`) so its `.schema` is
* populated. `RecordBatchReader.from(...)` returns an unopened reader.
*
* `opts.rescannable: true` is rejected because `RecordBatchReader` is a
* self-iterator (its `[Symbol.iterator]()` returns itself), and this
* constructor does not call `reader.reset()` between scans, so a second
* scan would always see an exhausted reader. For genuinely replayable
* sources, use
* `fromFactory(schema, () => openReader(), { rescannable: true })`,
* which mints a fresh reader on each scan.
*/
static async fromRecordBatchReader(
reader: RecordBatchReader,
opts: ScannableOptions = {},
): Promise<Scannable> {
if (opts.rescannable === true) {
throw new TypeError(
`fromRecordBatchReader does not accept rescannable: true. ` +
`RecordBatchReader is a self-iterator (its [Symbol.iterator]() ` +
`returns itself) and would be exhausted after the first scan. ` +
`Use fromFactory(schema, () => openReader(), { rescannable: true }) ` +
`for sources where each call mints a fresh reader.`,
);
}
return Scannable.fromFactory(reader.schema, () => reader, {
numRows: opts.numRows,
rescannable: false,
});
}
}
function normalizeIterator<T>(
source: AsyncIterable<T> | Iterable<T> | AsyncIterator<T> | Iterator<T>,
): AsyncIterator<T> | Iterator<T> {
if (source == null) {
throw new TypeError("Scannable factory returned null/undefined");
}
if (
typeof (source as AsyncIterable<T>)[Symbol.asyncIterator] === "function"
) {
return (source as AsyncIterable<T>)[Symbol.asyncIterator]();
}
if (typeof (source as Iterable<T>)[Symbol.iterator] === "function") {
return (source as Iterable<T>)[Symbol.iterator]();
}
// Already an iterator (has `.next`).
if (typeof (source as Iterator<T>).next === "function") {
return source as Iterator<T>;
}
throw new TypeError("Scannable factory returned a non-iterable value");
}
// A "self-iterator" returns the same object from `[Symbol.iterator]()` /
// `[Symbol.asyncIterator]()`. Generators behave this way, so they exhaust
// after one pass. Replayable iterables (Array, Set, custom) return a fresh
// iterator each call. Detection mirrors `normalizeIterator`'s ordering so
// classification matches scan-time behavior.
function isOneShotIterable(
source: AsyncIterable<unknown> | Iterable<unknown>,
): boolean {
// null/undefined are not one-shot in any meaningful sense; let
// `normalizeIterator` raise the actual error at scan time.
if (source == null) return false;
const ref = source as unknown;
if (
typeof (source as AsyncIterable<unknown>)[Symbol.asyncIterator] ===
"function"
) {
const it = (source as AsyncIterable<unknown>)[
Symbol.asyncIterator
]() as unknown;
return it === ref;
}
if (typeof (source as Iterable<unknown>)[Symbol.iterator] === "function") {
const it = (source as Iterable<unknown>)[Symbol.iterator]() as unknown;
return it === ref;
}
// Already-an-iterator (has `.next` but no `Symbol.iterator`) is by
// definition one-shot.
if (typeof (source as { next?: unknown }).next === "function") return true;
return false;
}

View File

@@ -46,33 +46,6 @@ import { sanitizeType } from "./sanitize";
import { IntoSql, toSQL } from "./util";
export { IndexConfig } from "./native";
/**
* Progress snapshot for a write operation, delivered to the `progress`
* callback passed to {@link Table.add}.
*/
export interface WriteProgress {
/** Number of rows written so far. */
outputRows: number;
/** Number of bytes written so far. */
outputBytes: number;
/**
* Total rows expected, when the input source reports it.
*
* Always set on the final callback (the one with `done: true`), falling
* back to the actual number of rows written when the source could not
* report a row count up front.
*/
totalRows?: number;
/** Wall-clock seconds since the write started. */
elapsedSeconds: number;
/** Number of parallel write tasks currently in flight. */
activeTasks: number;
/** Total number of parallel write tasks (the write parallelism). */
totalTasks: number;
/** `true` for the final callback; `false` otherwise. */
done: boolean;
}
/**
* Options for adding data to a table.
*/
@@ -83,28 +56,18 @@ export interface AddDataOptions {
* If "overwrite" then the new data will replace the existing data in the table.
*/
mode: "append" | "overwrite";
/**
* Optional callback invoked periodically with write progress.
* If true, skip the automatic cleanup of old dataset versions that would
* otherwise run as part of this write's commit. Forwards to
* `WriteParams.skip_auto_cleanup` in lance-core.
*
* The callback is fired once per batch written and once more with
* `done: true` when the write completes. Calls are dispatched
* asynchronously to the JS event loop and never block the write — a slow
* callback will queue events rather than back-pressure the writer.
* Useful for high-frequency writers that prefer to manage version cleanup
* themselves (for example, via a separate periodic optimize job), or for
* writers that don't have delete permissions on the underlying storage.
*
* Errors thrown from the callback are logged with `console.warn` and
* swallowed — they do not abort the write.
*
* @example
* ```ts
* await table.add(data, {
* progress: (p) => {
* console.log(`${p.outputRows}/${p.totalRows ?? "?"} rows`);
* },
* });
* ```
* Defaults to false.
*/
progress: (progress: WriteProgress) => void;
skipAutoCleanup?: boolean;
}
export interface UpdateOptions {
@@ -155,30 +118,6 @@ export interface Version {
metadata: Record<string, string>;
}
/**
* Specification selecting Lance's MemWAL LSM-style write path for
* `mergeInsert`.
*
* `specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`,
* `column` and `numBuckets` are required; for `"identity"`, `column` is
* required and must be a deterministic function of the unenforced primary
* key (every row with a given primary key must always produce the same
* `column` value, or upserts of that key can land in different shards and a
* stale version can win).
*/
export interface LsmWriteSpec {
/** One of `"bucket"`, `"identity"`, or `"unsharded"`. */
specType: "bucket" | "identity" | "unsharded";
/** Bucket and identity variants: the sharding column. */
column?: string;
/** Bucket variant: the number of buckets, in `[1, 1024]`. */
numBuckets?: number;
/** Names of indexes the MemWAL should keep up to date during writes. */
maintainedIndexes?: string[];
/** Default `ShardWriter` configuration recorded in the MemWAL index. */
writerConfigDefaults?: Record<string, string>;
}
/**
* A Table is a collection of Records in a LanceDB Database.
*
@@ -522,64 +461,6 @@ export abstract class Table {
* containing the new version number of the table after dropping the columns.
*/
abstract dropColumns(columnNames: string[]): Promise<DropColumnsResult>;
/**
* Set the unenforced primary key for this table to a single column.
*
* "Unenforced" means LanceDB does not check uniqueness on writes; the
* column is recorded in the schema as the primary key for use by features
* such as `merge_insert`. Only single-column primary keys are supported,
* and the key cannot be changed once set.
* @param {string | string[]} columns The primary key column. A one-element
* array is also accepted; passing more than one column is rejected.
* @returns {Promise<void>}
*/
abstract setUnenforcedPrimaryKey(columns: string | string[]): Promise<void>;
/**
* Install an {@link LsmWriteSpec} on this table, selecting Lance's MemWAL
* LSM-style write path for future `mergeInsert` calls.
*
* `LsmWriteSpec` chooses one of three sharding strategies via `specType`:
*
* - `"bucket"` — hash-bucket writes by the single-column unenforced primary
* key (`column` and `numBuckets` required).
* - `"identity"` — shard by the raw value of a scalar `column`.
* - `"unsharded"` — route every write to a single shard.
*
* All variants require the table to have an unenforced primary key
* ({@link Table#setUnenforcedPrimaryKey}); bucket sharding additionally
* requires it to be the single column being bucketed.
* @param {LsmWriteSpec} spec The sharding spec to install.
* @returns {Promise<void>}
* @example
* ```ts
* await table.setUnenforcedPrimaryKey("id");
* await table.setLsmWriteSpec({
* specType: "bucket",
* column: "id",
* numBuckets: 16,
* maintainedIndexes: ["id_idx"],
* });
* ```
*/
abstract setLsmWriteSpec(spec: LsmWriteSpec): Promise<void>;
/**
* Remove the {@link LsmWriteSpec} from this table, reverting to the standard
* `mergeInsert` write path.
*
* Errors if no spec is currently set.
* @returns {Promise<void>}
*/
abstract unsetLsmWriteSpec(): Promise<void>;
/**
* Drain and close any cached MemWAL shard writers held for this table.
*
* When an {@link LsmWriteSpec} is installed, `mergeInsert` opens MemWAL
* shard writers and caches them for reuse across calls. This closes them,
* flushing pending data; writers reopen lazily on the next `mergeInsert`.
* It is a no-op when no writers are cached.
* @returns {Promise<void>}
*/
abstract closeLsmWriters(): Promise<void>;
/** Retrieve the version of the table */
abstract version(): Promise<number>;
@@ -767,20 +648,7 @@ export class LocalTable extends Table {
const schema = await this.schema();
const buffer = await fromDataToBuffer(data, undefined, schema);
// Wrap the user callback so a thrown error doesn't surface as an
// unhandled exception (the callback fires from a napi threadsafe
// function — exceptions there crash the process).
const userProgress = options?.progress;
const progress = userProgress
? (p: WriteProgress) => {
try {
userProgress(p);
} catch (e) {
console.warn("Table.add progress callback threw:", e);
}
}
: undefined;
return await this.inner.add(buffer, mode, progress);
return await this.inner.add(buffer, mode, options?.skipAutoCleanup);
}
async update(
@@ -1041,23 +909,6 @@ export class LocalTable extends Table {
return await this.inner.dropColumns(columnNames);
}
async setUnenforcedPrimaryKey(columns: string | string[]): Promise<void> {
const cols = typeof columns === "string" ? [columns] : columns;
return await this.inner.setUnenforcedPrimaryKey(cols);
}
async setLsmWriteSpec(spec: LsmWriteSpec): Promise<void> {
return await this.inner.setLsmWriteSpec(spec);
}
async unsetLsmWriteSpec(): Promise<void> {
return await this.inner.unsetLsmWriteSpec();
}
async closeLsmWriters(): Promise<void> {
return await this.inner.closeLsmWriters();
}
async version(): Promise<number> {
return await this.inner.version();
}

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.30.1-beta.0",
"version": "0.28.0-beta.11",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.30.1-beta.0",
"version": "0.28.0-beta.11",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.30.1-beta.0",
"version": "0.28.0-beta.11",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.30.1-beta.0",
"version": "0.28.0-beta.11",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.30.1-beta.0",
"version": "0.28.0-beta.11",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.30.1-beta.0",
"version": "0.28.0-beta.11",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.30.1-beta.0",
"version": "0.28.0-beta.11",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

11029
nodejs/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.30.1-beta.0",
"version": "0.28.0-beta.11",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -459,23 +459,4 @@ impl Connection {
transaction_id: resp.transaction_id,
})
}
/// Rename a table. `current_namespace_path` and `new_namespace_path` default to
/// the root namespace when omitted; the caller is expected to either pass both
/// or pass neither.
#[napi(catch_unwind)]
pub async fn rename_table(
&self,
current_name: String,
new_name: String,
current_namespace_path: Option<Vec<String>>,
new_namespace_path: Option<Vec<String>>,
) -> napi::Result<()> {
let cur_ns = current_namespace_path.unwrap_or_default();
let new_ns = new_namespace_path.unwrap_or_default();
self.get_inner()?
.rename_table(&current_name, &new_name, &cur_ns, &new_ns)
.await
.default_error()
}
}

View File

@@ -16,7 +16,6 @@ pub mod permutation;
mod query;
pub mod remote;
mod rerankers;
mod scannable;
mod session;
mod table;
mod util;
@@ -24,19 +23,15 @@ mod util;
#[napi(object)]
#[derive(Debug)]
pub struct ConnectionOptions {
/// The interval, in seconds, at which to check for updates to the table
/// from other processes. If None, then consistency is not checked. For
/// performance reasons, this is the default. For strong consistency, set
/// this to zero seconds. Then every read will check for updates from other
/// processes. As a compromise, you can set this to a non-zero value for
/// eventual consistency. If more than that interval has passed since the
/// last check, then the table will be checked for updates. Note: this
/// consistency only applies to read operations. Write operations are
/// (For LanceDB OSS only): The interval, in seconds, at which to check for
/// updates to the table from other processes. If None, then consistency is not
/// checked. For performance reasons, this is the default. For strong
/// consistency, set this to zero seconds. Then every read will check for
/// updates from other processes. As a compromise, you can set this to a
/// non-zero value for eventual consistency. If more than that interval
/// has passed since the last check, then the table will be checked for updates.
/// Note: this consistency only applies to read operations. Write operations are
/// always consistent.
///
/// Stronger consistency is not free. The smaller the interval, the more
/// often each read pays the cost of checking for updates against object
/// storage, raising per-read latency and cost.
pub read_consistency_interval: Option<f64>,
/// (For LanceDB OSS only): configuration for object storage.
///

View File

@@ -51,16 +51,9 @@ impl NativeMergeInsertBuilder {
}
#[napi]
pub fn use_lsm_write(&self, use_lsm_write: bool) -> Self {
pub fn skip_auto_cleanup(&self, skip: bool) -> Self {
let mut this = self.clone();
this.inner.use_lsm_write(use_lsm_write);
this
}
#[napi]
pub fn validate_single_shard(&self, validate_single_shard: bool) -> Self {
let mut this = self.clone();
this.inner.validate_single_shard(validate_single_shard);
this.inner.skip_auto_cleanup(skip);
this
}

View File

@@ -3,12 +3,6 @@
use std::sync::Arc;
use crate::error::NapiErrorExt;
use crate::error::convert_error;
use crate::iterator::RecordBatchIterator;
use crate::rerankers::RerankHybridCallbackArgs;
use crate::rerankers::Reranker;
use crate::util::{parse_distance_type, schema_to_buffer};
use arrow_array::{
Array, Float16Array as ArrowFloat16Array, Float32Array as ArrowFloat32Array,
Float64Array as ArrowFloat64Array, UInt8Array as ArrowUInt8Array,
@@ -25,27 +19,16 @@ use lancedb::query::QueryBase;
use lancedb::query::QueryExecutionOptions;
use lancedb::query::Select;
use lancedb::query::TakeQuery as LanceDbTakeQuery;
use lancedb::query::{ColumnOrdering as LanceDbColumnOrdering, VectorQuery as LanceDbVectorQuery};
use lancedb::query::VectorQuery as LanceDbVectorQuery;
use napi::bindgen_prelude::*;
use napi_derive::napi;
#[napi(object)]
pub struct ColumnOrdering {
pub ascending: bool,
pub nulls_first: bool,
pub column_name: String,
}
impl From<ColumnOrdering> for LanceDbColumnOrdering {
fn from(value: ColumnOrdering) -> Self {
match (value.ascending, value.nulls_first) {
(true, true) => Self::asc_nulls_first(value.column_name),
(true, false) => Self::asc_nulls_last(value.column_name),
(false, true) => Self::desc_nulls_first(value.column_name),
(false, false) => Self::desc_nulls_last(value.column_name),
}
}
}
use crate::error::NapiErrorExt;
use crate::error::convert_error;
use crate::iterator::RecordBatchIterator;
use crate::rerankers::RerankHybridCallbackArgs;
use crate::rerankers::Reranker;
use crate::util::{parse_distance_type, schema_to_buffer};
fn bytes_to_arrow_array(data: Uint8Array, dtype: String) -> napi::Result<Arc<dyn Array>> {
let buf = arrow_buffer::Buffer::from(data.to_vec());
@@ -145,18 +128,6 @@ impl Query {
self.inner = self.inner.clone().with_row_id();
}
#[napi]
pub fn order_by(&mut self, ordering: Option<Vec<ColumnOrdering>>) -> napi::Result<()> {
let ordering = ordering.map(|ordering| {
ordering
.into_iter()
.map(LanceDbColumnOrdering::from)
.collect()
});
self.inner = self.inner.clone().order_by(ordering);
Ok(())
}
#[napi(catch_unwind)]
pub async fn output_schema(&self) -> napi::Result<Buffer> {
let schema = self.inner.output_schema().await.default_error()?;
@@ -357,18 +328,6 @@ impl VectorQuery {
Ok(())
}
#[napi]
pub fn order_by(&mut self, ordering: Option<Vec<ColumnOrdering>>) -> napi::Result<()> {
let ordering = ordering.map(|ordering| {
ordering
.into_iter()
.map(LanceDbColumnOrdering::from)
.collect()
});
self.inner = self.inner.clone().order_by(ordering);
Ok(())
}
#[napi(catch_unwind)]
pub async fn output_schema(&self) -> napi::Result<Buffer> {
let schema = self.inner.output_schema().await.default_error()?;

View File

@@ -1,253 +0,0 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! NodeJS binding for the [`lancedb::data::scannable::Scannable`] trait.
//!
//! The JS side supplies a `getNextBatch(isStart)` callback that returns the
//! next Arrow `RecordBatch` encoded as a self-contained Arrow IPC Stream
//! message (schema message + record batch message + EOS marker) wrapped in a
//! `Buffer`, or `null` when the stream is exhausted. The Rust side parses
//! each buffer with `arrow_ipc::reader::StreamReader`, validates every
//! standalone batch stream against the declared schema, and yields decoded
//! `RecordBatch`es as a [`SendableRecordBatchStream`].
//!
//! `isStart` is `true` on the first `getNextBatch` call of each new
//! `scan_as_stream` and `false` thereafter. JS uses it to drop any cached
//! iterator and re-invoke its factory at scan boundaries, so retries
//! triggered by mid-stream failures restart at batch 0.
use std::io::Cursor;
use std::sync::Arc;
use arrow_array::RecordBatch;
use arrow_ipc::reader::StreamReader;
use arrow_schema::SchemaRef;
use futures::stream::once;
use lancedb::arrow::{SendableRecordBatchStream, SimpleRecordBatchStream};
use lancedb::data::scannable::Scannable as LanceScannable;
use lancedb::ipc::ipc_file_to_schema;
use lancedb::{Error, Result as LanceResult};
use napi::bindgen_prelude::*;
use napi::threadsafe_function::ThreadsafeFunction;
use napi_derive::napi;
/// Threadsafe handle to the JS `getNextBatch` callback. The callback takes a
/// single boolean `isStart` (`true` on the first call of each new scan) and
/// returns a Promise that resolves to a `Buffer` containing one IPC Stream
/// message, or `null` at end-of-stream.
type GetNextBatchFn = ThreadsafeFunction<bool, Promise<Option<Buffer>>, bool, Status, false>;
/// A Rust-side view of a JS-constructed `Scannable`.
///
/// Held in JS as the return value of the `Scannable` class constructor. When
/// passed to a consumer that accepts `impl lancedb::data::scannable::Scannable`,
/// the consumer invokes `scan_as_stream()` to pull batches through the JS
/// callback.
#[napi]
pub struct NapiScannable {
schema: SchemaRef,
num_rows: Option<usize>,
rescannable: bool,
// `ThreadsafeFunction` is not `Clone`; wrap in `Arc` so the stream
// returned by `scan_as_stream` can own a handle independent of `self`.
get_next_batch: Arc<GetNextBatchFn>,
// Tracks whether a scan has already started; used to enforce one-shot
// semantics on non-rescannable sources.
scanned: bool,
}
#[napi]
impl NapiScannable {
/// Construct a new `NapiScannable`.
///
/// - `schema_buf` — Arrow IPC File buffer carrying only the schema (no batches).
/// - `num_rows` — optional row count hint; not validated against the stream.
/// - `rescannable` — whether `get_next_batch` may be re-driven after the
/// scan completes.
/// - `get_next_batch` -- JS callback that yields the next batch as an Arrow
/// IPC Stream message wrapped in a `Buffer`, or `null` at EOF. The
/// `isStart` argument is `true` on the first call of each new scan;
/// JS uses it to discard any cached iterator before pulling.
#[napi(constructor)]
pub fn new(
schema_buf: Buffer,
num_rows: Option<i64>,
rescannable: bool,
get_next_batch: Function<bool, Promise<Option<Buffer>>>,
) -> napi::Result<Self> {
let schema = ipc_file_to_schema(schema_buf.to_vec())
.map_err(|e| napi::Error::from_reason(format!("Invalid schema buffer: {}", e)))?;
let num_rows = num_rows
.map(|n| {
usize::try_from(n)
.map_err(|_| napi::Error::from_reason("num_rows must be non-negative"))
})
.transpose()?;
let get_next_batch = Arc::new(get_next_batch.build_threadsafe_function().build()?);
Ok(Self {
schema,
num_rows,
rescannable,
get_next_batch,
scanned: false,
})
}
}
impl std::fmt::Debug for NapiScannable {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NapiScannable")
.field("schema", &self.schema)
.field("num_rows", &self.num_rows)
.field("rescannable", &self.rescannable)
.finish()
}
}
impl LanceScannable for NapiScannable {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn scan_as_stream(&mut self) -> SendableRecordBatchStream {
let schema = self.schema.clone();
// One-shot enforcement for non-rescannable sources: return a stream
// whose first item is an error.
if self.scanned && !self.rescannable {
let err_stream = once(async {
Err(Error::InvalidInput {
message: "Scannable has already been consumed (non-rescannable source)"
.to_string(),
})
});
return Box::pin(SimpleRecordBatchStream::new(err_stream, schema));
}
self.scanned = true;
let tsfn = Arc::clone(&self.get_next_batch);
let declared_schema = schema.clone();
// State threaded through the unfold. `is_first_pull` starts true so
// the first call into JS signals a new-scan boundary; JS uses it to
// reset any cached iterator before factory()-ing a fresh one.
let initial = State {
tsfn,
batch_index: 0,
declared_schema,
errored: false,
is_first_pull: true,
};
let stream = futures::stream::unfold(initial, |mut state| async move {
if state.errored {
return None;
}
// Pull the next IPC Stream buffer from JS. `is_first_pull` is
// consumed here and cleared so subsequent pulls continue the
// same scan rather than restarting it.
let is_start = state.is_first_pull;
state.is_first_pull = false;
let buf = match pull_next(&state.tsfn, is_start).await {
Ok(Some(buf)) => buf,
Ok(None) => return None,
Err(e) => {
state.errored = true;
return Some((Err(e), state));
}
};
match decode_one_batch(buf.as_ref(), &state.declared_schema) {
Ok(batch) => {
state.batch_index += 1;
Some((Ok(batch), state))
}
Err(e) => {
let tagged = Error::Runtime {
message: format!(
"[scannable/rust-bridge] failure at batch index {}: {}",
state.batch_index, e
),
};
state.errored = true;
Some((Err(tagged), state))
}
}
});
Box::pin(SimpleRecordBatchStream::new(stream, schema))
}
fn num_rows(&self) -> Option<usize> {
self.num_rows
}
fn rescannable(&self) -> bool {
self.rescannable
}
}
struct State {
tsfn: Arc<GetNextBatchFn>,
batch_index: usize,
declared_schema: SchemaRef,
errored: bool,
/// True for the very first pull of a new scan. Forwarded to JS so the
/// callback can drop any cached iterator and call its factory fresh,
/// which makes rescannable sources restart at batch 0 even when the
/// previous scan ended mid-stream.
is_first_pull: bool,
}
/// Invoke the JS callback and await its Promise. `is_start` is forwarded to
/// the JS side as the `isStart` argument so it can reset its iterator at the
/// scan boundary. Errors on the JS side surface here as rejected promises
/// and are tunneled back as `lancedb::Error::Runtime`.
async fn pull_next(tsfn: &GetNextBatchFn, is_start: bool) -> LanceResult<Option<Buffer>> {
let promise = tsfn
.call_async(is_start)
.await
.map_err(|e| Error::Runtime {
message: format!(
"[scannable/js-factory] napi error status={}, reason={}",
e.status, e.reason
),
})?;
promise.await.map_err(|e| Error::Runtime {
message: format!(
"[scannable/js-iterator] napi error status={}, reason={}",
e.status, e.reason
),
})
}
/// Decode one IPC Stream buffer (schema + batch + EOS) into a `RecordBatch`.
/// Each buffer is a standalone IPC stream, so every decoded stream schema must
/// match the one declared at construction.
fn decode_one_batch(buf: &[u8], declared: &SchemaRef) -> LanceResult<RecordBatch> {
let reader = StreamReader::try_new(Cursor::new(buf), None).map_err(|e| Error::Runtime {
message: format!("failed to open IPC stream reader: {}", e),
})?;
let actual = reader.schema();
if actual.as_ref() != declared.as_ref() {
return Err(Error::InvalidInput {
message: format!(
"declared schema does not match stream schema: declared={:?} actual={:?}",
declared, actual
),
});
}
let mut iter = reader;
let batch = iter
.next()
.ok_or_else(|| Error::Runtime {
message: "IPC stream contained schema but no record batch".to_string(),
})?
.map_err(|e| Error::Runtime {
message: format!("failed to decode record batch: {}", e),
})?;
Ok(batch)
}

View File

@@ -6,10 +6,9 @@ use std::collections::HashMap;
use lancedb::ipc::{ipc_file_to_batches, ipc_file_to_schema};
use lancedb::table::{
AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration, NewColumnTransform,
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
OptimizeAction, OptimizeOptions, Table as LanceDbTable, WriteOptions,
};
use napi::bindgen_prelude::*;
use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
use napi_derive::napi;
use crate::error::NapiErrorExt;
@@ -68,15 +67,12 @@ impl Table {
schema_to_buffer(&schema)
}
#[napi(
catch_unwind,
ts_args_type = "buf: Buffer, mode: string, progressCallback?: (progress: WriteProgressInfo) => void"
)]
#[napi(catch_unwind)]
pub async fn add(
&self,
buf: Buffer,
mode: String,
progress_callback: Option<ProgressFn>,
skip_auto_cleanup: Option<bool>,
) -> napi::Result<AddResult> {
let batches = ipc_file_to_batches(buf.to_vec())
.map_err(|e| napi::Error::from_reason(format!("Failed to read IPC file: {}", e)))?;
@@ -101,16 +97,10 @@ impl Table {
return Err(napi::Error::from_reason(format!("Invalid mode: {}", mode)));
};
if let Some(tsfn) = progress_callback {
op = op.progress(move |p| {
// NonBlocking: dispatch onto the JS event loop without
// blocking the writer thread. With napi-rs's default
// unbounded queue, events are not dropped — a slow JS
// callback will just queue them.
tsfn.call(
WriteProgressInfo::from(p),
ThreadsafeFunctionCallMode::NonBlocking,
);
if skip_auto_cleanup.unwrap_or(false) {
op = op.write_options(WriteOptions {
skip_auto_cleanup: true,
..Default::default()
});
}
@@ -366,36 +356,6 @@ impl Table {
Ok(res.into())
}
#[napi(catch_unwind)]
pub async fn set_unenforced_primary_key(&self, columns: Vec<String>) -> napi::Result<()> {
self.inner_ref()?
.set_unenforced_primary_key(columns)
.await
.default_error()
}
#[napi(catch_unwind)]
pub async fn set_lsm_write_spec(&self, spec: LsmWriteSpec) -> napi::Result<()> {
let native_spec = lancedb::table::LsmWriteSpec::try_from(spec)?;
self.inner_ref()?
.set_lsm_write_spec(native_spec)
.await
.default_error()
}
#[napi(catch_unwind)]
pub async fn unset_lsm_write_spec(&self) -> napi::Result<()> {
self.inner_ref()?
.unset_lsm_write_spec()
.await
.default_error()
}
#[napi(catch_unwind)]
pub async fn close_lsm_writers(&self) -> napi::Result<()> {
self.inner_ref()?.close_lsm_writers().await.default_error()
}
#[napi(catch_unwind)]
pub async fn version(&self) -> napi::Result<i64> {
self.inner_ref()?
@@ -590,63 +550,6 @@ impl From<lancedb::index::IndexConfig> for IndexConfig {
}
}
/// Specification selecting Lance's MemWAL LSM-style write path for
/// `mergeInsert`.
///
/// `specType` must be `"bucket"`, `"identity"`, or `"unsharded"`. For
/// `"bucket"`, `column` and `numBuckets` are required; for `"identity"`,
/// `column` is required.
#[napi(object)]
#[derive(Clone, Debug)]
pub struct LsmWriteSpec {
/// One of `"bucket"`, `"identity"`, or `"unsharded"`.
pub spec_type: String,
/// Bucket and identity variants: the sharding column.
pub column: Option<String>,
/// Bucket variant: the number of buckets, in `[1, 1024]`.
pub num_buckets: Option<u32>,
/// Names of indexes the MemWAL should keep up to date during writes.
pub maintained_indexes: Option<Vec<String>>,
/// Default `ShardWriter` configuration recorded in the MemWAL index.
pub writer_config_defaults: Option<HashMap<String, String>>,
}
impl TryFrom<LsmWriteSpec> for lancedb::table::LsmWriteSpec {
type Error = napi::Error;
fn try_from(value: LsmWriteSpec) -> napi::Result<Self> {
let maintained = value.maintained_indexes.unwrap_or_default();
let writer_config_defaults = value.writer_config_defaults.unwrap_or_default();
let spec = match value.spec_type.as_str() {
"bucket" => {
let column = value.column.ok_or_else(|| {
napi::Error::from_reason("LsmWriteSpec bucket requires `column`")
})?;
let num_buckets = value.num_buckets.ok_or_else(|| {
napi::Error::from_reason("LsmWriteSpec bucket requires `numBuckets`")
})?;
Self::bucket(column, num_buckets)
}
"identity" => {
let column = value.column.ok_or_else(|| {
napi::Error::from_reason("LsmWriteSpec identity requires `column`")
})?;
Self::identity(column)
}
"unsharded" => Self::unsharded(),
other => {
return Err(napi::Error::from_reason(format!(
"LsmWriteSpec `specType` must be 'bucket', 'identity', or 'unsharded', got '{}'",
other
)));
}
};
Ok(spec
.with_maintained_indexes(maintained)
.with_writer_config_defaults(writer_config_defaults))
}
}
/// Statistics about a compaction operation.
#[napi(object)]
#[derive(Clone, Debug)]
@@ -681,44 +584,6 @@ pub struct OptimizeStats {
pub prune: RemovalStats,
}
/// Progress snapshot for a write operation, delivered to the JS callback
/// passed to `Table.add`.
#[napi(object)]
#[derive(Clone, Debug)]
pub struct WriteProgressInfo {
/// Number of rows written so far.
pub output_rows: i64,
/// Number of bytes written so far.
pub output_bytes: i64,
/// Total rows expected, if the input source reports it.
/// Always set on the final callback (where `done` is `true`).
pub total_rows: Option<i64>,
/// Wall-clock seconds since monitoring started.
pub elapsed_seconds: f64,
/// Number of parallel write tasks currently in flight.
pub active_tasks: i64,
/// Total number of parallel write tasks (the write parallelism).
pub total_tasks: i64,
/// `true` for the final callback; `false` otherwise.
pub done: bool,
}
impl From<&lancedb::table::write_progress::WriteProgress> for WriteProgressInfo {
fn from(p: &lancedb::table::write_progress::WriteProgress) -> Self {
Self {
output_rows: p.output_rows() as i64,
output_bytes: p.output_bytes() as i64,
total_rows: p.total_rows().map(|n| n as i64),
elapsed_seconds: p.elapsed().as_secs_f64(),
active_tasks: p.active_tasks() as i64,
total_tasks: p.total_tasks() as i64,
done: p.done(),
}
}
}
type ProgressFn = ThreadsafeFunction<WriteProgressInfo, (), WriteProgressInfo, Status, false>;
/// A definition of a column alteration. The alteration changes the column at
/// `path` to have the new name `name`, to be nullable if `nullable` is true,
/// and to have the data type `data_type`. At least one of `rename` or `nullable`
@@ -945,7 +810,6 @@ pub struct MergeResult {
pub num_updated_rows: i64,
pub num_deleted_rows: i64,
pub num_attempts: i64,
pub num_rows: i64,
}
impl From<lancedb::table::MergeResult> for MergeResult {
@@ -956,7 +820,6 @@ impl From<lancedb::table::MergeResult> for MergeResult {
num_updated_rows: value.num_updated_rows as i64,
num_deleted_rows: value.num_deleted_rows as i64,
num_attempts: value.num_attempts as i64,
num_rows: value.num_rows as i64,
}
}
}

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.33.1-beta.0"
current_version = "0.31.0-beta.11"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -4,26 +4,16 @@ code is in the `src/` directory and the Python bindings are in the `lancedb/` di
Common commands:
* Bootstrap dev env: `uv run --extra tests --extra dev maturin develop --extras tests,dev`
* Build: `make develop`
* Format: `make format`
* Lint: `make check`
* Fix lints: `make fix`
* Test: `uv run --extra tests pytest python/tests -vv --durations=10 -m "not slow and not s3_test"`
* Run specific test: `uv run --extra tests pytest python/tests/<test_file>.py::<test_name> -q`
* Doc test: `uv run --extra tests pytest --doctest-modules python/lancedb`
Use the uv-managed environment declared by `uv.lock` for Python validation. Do
not treat system `python`, global `pytest`, or missing editable-install errors
as final blockers; bootstrap or enter the uv environment instead. `make test`
and `make doctest` assume the development environment is already prepared.
* Test: `make test`
* Doc test: `make doctest`
Before committing changes, run lints and then formatting.
When you change the Rust code, PyO3 binding code, or see a missing/stale
`lancedb._lancedb`, recompile the Python bindings with
`uv run --extra tests --extra dev maturin develop --extras tests,dev` before
running tests.
When you change the Rust code, you will need to recompile the Python bindings: `make develop`.
When you export new types from Rust to Python, you must manually update `python/lancedb/_lancedb.pyi`
with the corresponding type hints. You can run `pyright` to check for type errors in the Python code.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.33.1-beta.0"
version = "0.31.0-beta.11"
publish = false
edition.workspace = true
description = "Python bindings for LanceDB"
@@ -19,7 +19,6 @@ arrow = { version = "58.0.0", features = ["pyarrow"] }
async-trait = "0.1"
bytes = "1"
lancedb = { path = "../rust/lancedb", default-features = false }
datafusion-common.workspace = true
lance-core.workspace = true
lance-namespace.workspace = true
lance-namespace-impls.workspace = true

View File

@@ -94,6 +94,7 @@ def connect(
host_override: str, optional
The override url for LanceDB Cloud.
read_consistency_interval: timedelta, default None
(For LanceDB OSS only)
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked. For performance
reasons, this is the default. For strong consistency, set this to
@@ -103,10 +104,6 @@ def connect(
the last check, then the table will be checked for updates. Note: this
consistency only applies to read operations. Write operations are
always consistent.
Stronger consistency is not free. The smaller the interval, the more
often each read pays the cost of checking for updates against object
storage, raising per-read latency and cost.
client_config: ClientConfig or dict, optional
Configuration options for the LanceDB Cloud HTTP client. If a dict, then
the keys are the attributes of the ClientConfig class. If None, then the
@@ -150,13 +147,6 @@ def connect(
>>> db = lancedb.connect("s3://my-bucket/lancedb",
... storage_options={"aws_access_key_id": "***"})
For tests and temporary data, use an in-memory database:
>>> db = lancedb.connect("memory://")
In-memory databases are not persisted. Tables are dropped when the last
connection or table handle referencing them is closed.
Connect to LanceDB cloud:
>>> db = lancedb.connect("db://my_database", api_key="ldb_...",
@@ -220,7 +210,6 @@ def connect(
request_thread_pool=request_thread_pool,
client_config=client_config,
storage_options=storage_options,
read_consistency_interval=read_consistency_interval,
**kwargs,
)
_check_s3_bucket_with_dots(str(uri), storage_options)
@@ -315,15 +304,6 @@ def deserialize_conn(
manifest_enabled=parsed.get("manifest_enabled", False),
namespace_client_properties=parsed.get("namespace_client_properties"),
)
elif connection_type == "remote":
return RemoteDBConnection(
parsed["db_url"],
parsed["api_key"],
parsed.get("region", "us-east-1"),
host_override=parsed.get("host_override"),
client_config=parsed.get("client_config"),
storage_options=storage_options,
)
else:
raise ValueError(f"Unknown connection_type: {connection_type}")
@@ -356,6 +336,7 @@ async def connect_async(
host_override: str, optional
The override url for LanceDB Cloud.
read_consistency_interval: timedelta, default None
(For LanceDB OSS only)
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked. For performance
reasons, this is the default. For strong consistency, set this to
@@ -365,10 +346,6 @@ async def connect_async(
the last check, then the table will be checked for updates. Note: this
consistency only applies to read operations. Write operations are
always consistent.
Stronger consistency is not free. The smaller the interval, the more
often each read pays the cost of checking for updates against object
storage, raising per-read latency and cost.
client_config: ClientConfig or dict, optional
Configuration options for the LanceDB Cloud HTTP client. If a dict, then
the keys are the attributes of the ClientConfig class. If None, then the
@@ -401,8 +378,6 @@ async def connect_async(
... db = await lancedb.connect_async("s3://my-bucket/lancedb",
... storage_options={
... "aws_access_key_id": "***"})
... # For tests and temporary data, use an in-memory database
... db = await lancedb.connect_async("memory://")
... # Connect to LanceDB cloud
... db = await lancedb.connect_async("db://my_database", api_key="ldb_...",
... client_config={

View File

@@ -51,7 +51,7 @@ class PyExpr:
def to_sql(self) -> str: ...
def expr_col(name: str) -> PyExpr: ...
def expr_lit(value: Union[bool, int, float, str, bytes]) -> PyExpr: ...
def expr_lit(value: Union[bool, int, float, str]) -> PyExpr: ...
def expr_func(name: str, args: List[PyExpr]) -> PyExpr: ...
class Session:
@@ -217,11 +217,6 @@ class Table:
async def uri(self) -> str: ...
async def initial_storage_options(self) -> Optional[Dict[str, str]]: ...
async def latest_storage_options(self) -> Optional[Dict[str, str]]: ...
async def _table_reopen_state(self) -> Dict[str, Any]: ...
async def set_unenforced_primary_key(self, columns: List[str]) -> None: ...
async def set_lsm_write_spec(self, spec: LsmWriteSpec) -> None: ...
async def unset_lsm_write_spec(self) -> None: ...
async def close_lsm_writers(self) -> None: ...
@property
def tags(self) -> Tags: ...
def query(self) -> Query: ...
@@ -260,11 +255,6 @@ class RecordBatchStream:
def __aiter__(self) -> "RecordBatchStream": ...
async def __anext__(self) -> pa.RecordBatch: ...
class ColumnOrdering(TypedDict):
column_name: str
ascending: bool
nulls_first: bool
class Query:
def where(self, filter: str): ...
def where_expr(self, expr: PyExpr): ...
@@ -278,7 +268,6 @@ class Query:
def postfilter(self): ...
def nearest_to(self, query_vec: pa.Array) -> VectorQuery: ...
def nearest_to_text(self, query: dict) -> FTSQuery: ...
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
async def output_schema(self) -> pa.Schema: ...
async def execute(
self, max_batch_length: Optional[int], timeout: Optional[timedelta]
@@ -307,7 +296,6 @@ class FTSQuery:
def get_query(self) -> str: ...
def add_query_vector(self, query_vec: pa.Array) -> None: ...
def nearest_to(self, query_vec: pa.Array) -> HybridQuery: ...
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
async def output_schema(self) -> pa.Schema: ...
async def execute(
self, max_batch_length: Optional[int], timeout: Optional[timedelta]
@@ -333,7 +321,6 @@ class VectorQuery:
def maximum_nprobes(self, maximum_nprobes: int): ...
def bypass_vector_index(self): ...
def nearest_to_text(self, query: dict) -> HybridQuery: ...
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
def to_query_request(self) -> PyQueryRequest: ...
class HybridQuery:
@@ -352,7 +339,6 @@ class HybridQuery:
def minimum_nprobes(self, minimum_nprobes: int): ...
def maximum_nprobes(self, maximum_nprobes: int): ...
def bypass_vector_index(self): ...
def order_by(self, ordering: Optional[List[ColumnOrdering]]): ...
def to_vector_query(self) -> VectorQuery: ...
def to_fts_query(self) -> FTSQuery: ...
def get_limit(self) -> int: ...
@@ -382,7 +368,6 @@ class PyQueryRequest:
bypass_vector_index: Optional[bool]
postfilter: Optional[bool]
norm: Optional[str]
order_by: Optional[List[ColumnOrdering]]
class CompactionStats:
fragments_removed: int
@@ -422,38 +407,6 @@ class MergeResult:
num_inserted_rows: int
num_deleted_rows: int
num_attempts: int
num_rows: int
class LsmWriteSpec:
"""Specification selecting Lance's MemWAL LSM-style write path for
`merge_insert`."""
@staticmethod
def bucket(column: str, num_buckets: int) -> "LsmWriteSpec": ...
@staticmethod
def identity(column: str) -> "LsmWriteSpec": ...
@staticmethod
def unsharded() -> "LsmWriteSpec": ...
def with_maintained_indexes(self, indexes: List[str]) -> "LsmWriteSpec":
"""Return a copy of this spec asking the MemWAL to keep the named
indexes up to date as rows are appended."""
...
def with_writer_config_defaults(self, defaults: Dict[str, str]) -> "LsmWriteSpec":
"""Return a copy of this spec recording the given default
`ShardWriter` configuration in the MemWAL index."""
...
@property
def spec_type(self) -> str:
"""One of 'bucket', 'identity', or 'unsharded'."""
...
@property
def column(self) -> Optional[str]: ...
@property
def num_buckets(self) -> Optional[int]: ...
@property
def maintained_indexes(self) -> List[str]: ...
@property
def writer_config_defaults(self) -> Dict[str, str]: ...
class AddColumnsResult:
version: int

View File

@@ -8,17 +8,7 @@ from abc import abstractmethod
from datetime import timedelta
from pathlib import Path
import sys
from typing import (
TYPE_CHECKING,
Any,
Dict,
Generator,
Iterable,
List,
Literal,
Optional,
Union,
)
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Literal, Optional, Union
if sys.version_info >= (3, 12):
from typing import override
@@ -323,7 +313,7 @@ class DBConnection(EnforceOverrides):
>>> data = [{"vector": [1.1, 1.2], "lat": 45.5, "long": -122.7},
... {"vector": [0.2, 1.8], "lat": 40.1, "long": -74.1}]
>>> db.create_table("my_table", data)
LanceTable(name='my_table', ...)
LanceTable(name='my_table', version=1, ...)
>>> db["my_table"].head()
pyarrow.Table
vector: fixed_size_list<item: float>[2]
@@ -344,7 +334,7 @@ class DBConnection(EnforceOverrides):
... "long": [-122.7, -74.1]
... })
>>> db.create_table("table2", data)
LanceTable(name='table2', ...)
LanceTable(name='table2', version=1, ...)
>>> db["table2"].head()
pyarrow.Table
vector: fixed_size_list<item: float>[2]
@@ -367,7 +357,7 @@ class DBConnection(EnforceOverrides):
... pa.field("long", pa.float32())
... ])
>>> db.create_table("table3", data, schema = custom_schema)
LanceTable(name='table3', ...)
LanceTable(name='table3', version=1, ...)
>>> db["table3"].head()
pyarrow.Table
vector: fixed_size_list<item: float>[2]
@@ -401,7 +391,7 @@ class DBConnection(EnforceOverrides):
... pa.field("price", pa.float32()),
... ])
>>> db.create_table("table4", make_batches(), schema=schema)
LanceTable(name='table4', ...)
LanceTable(name='table4', version=1, ...)
"""
raise NotImplementedError
@@ -578,15 +568,15 @@ class LanceDBConnection(DBConnection):
>>> db = lancedb.connect("./.lancedb")
>>> db.create_table("my_table", data=[{"vector": [1.1, 1.2], "b": 2},
... {"vector": [0.5, 1.3], "b": 4}])
LanceTable(name='my_table', ...)
LanceTable(name='my_table', version=1, ...)
>>> db.create_table("another_table", data=[{"vector": [0.4, 0.4], "b": 6}])
LanceTable(name='another_table', ...)
LanceTable(name='another_table', version=1, ...)
>>> sorted(db.table_names())
['another_table', 'my_table']
>>> len(db)
2
>>> db["my_table"]
LanceTable(name='my_table', ...)
LanceTable(name='my_table', version=1, ...)
>>> "my_table" in db
True
>>> db.drop_table("my_table")
@@ -857,20 +847,11 @@ class LanceDBConnection(DBConnection):
)
)
def _all_table_names(self) -> Generator[str, None, None]:
page_token = None
while True:
response = self.list_tables(page_token=page_token)
yield from response.tables
page_token = response.page_token
if not page_token:
return
def __len__(self) -> int:
return sum(1 for _ in self._all_table_names())
return len(self.table_names())
def __contains__(self, name: str) -> bool:
return name in self._all_table_names()
return name in self.table_names()
@override
def create_table(

View File

@@ -63,7 +63,7 @@ def _coerce(value: "ExprLike") -> "Expr":
# Type alias used in annotations.
ExprLike = Union["Expr", bool, int, float, str, bytes]
ExprLike = Union["Expr", bool, int, float, str]
class Expr:
@@ -261,13 +261,13 @@ def col(name: str) -> Expr:
return Expr(expr_col(name))
def lit(value: Union[bool, int, float, str, bytes]) -> Expr:
def lit(value: Union[bool, int, float, str]) -> Expr:
"""Create a literal (constant) value expression.
Parameters
----------
value:
A Python ``bool``, ``int``, ``float``, ``str``, or ``bytes``.
A Python ``bool``, ``int``, ``float``, or ``str``.
Examples
--------

View File

@@ -281,9 +281,6 @@ class HnswPq:
m: int = 20
ef_construction: int = 300
target_partition_size: Optional[int] = None
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
# create_index() dispatches to pylance to build the index on the accelerator.
accelerator: Optional[str] = None
@dataclass
@@ -389,9 +386,6 @@ class HnswSq:
m: int = 20
ef_construction: int = 300
target_partition_size: Optional[int] = None
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
# create_index() dispatches to pylance to build the index on the accelerator.
accelerator: Optional[str] = None
@dataclass
@@ -585,9 +579,6 @@ class IvfFlat:
max_iterations: int = 50
sample_rate: int = 256
target_partition_size: Optional[int] = None
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
# create_index() dispatches to pylance to build the index on the accelerator.
accelerator: Optional[str] = None
@dataclass
@@ -618,9 +609,6 @@ class IvfSq:
max_iterations: int = 50
sample_rate: int = 256
target_partition_size: Optional[int] = None
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
# create_index() dispatches to pylance to build the index on the accelerator.
accelerator: Optional[str] = None
@dataclass
@@ -751,9 +739,6 @@ class IvfPq:
max_iterations: int = 50
sample_rate: int = 256
target_partition_size: Optional[int] = None
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
# create_index() dispatches to pylance to build the index on the accelerator.
accelerator: Optional[str] = None
@dataclass
@@ -807,9 +792,6 @@ class IvfRq:
max_iterations: int = 50
sample_rate: int = 256
target_partition_size: Optional[int] = None
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
# create_index() dispatches to pylance to build the index on the accelerator.
accelerator: Optional[str] = None
__all__ = [

View File

@@ -34,8 +34,6 @@ class LanceMergeInsertBuilder(object):
self._when_not_matched_by_source_condition = None
self._timeout = None
self._use_index = True
self._use_lsm_write = None
self._validate_single_shard = None
def when_matched_update_all(
self, *, where: Optional[str] = None
@@ -98,46 +96,6 @@ class LanceMergeInsertBuilder(object):
self._use_index = use_index
return self
def use_lsm_write(self, use_lsm_write: bool) -> LanceMergeInsertBuilder:
"""
Controls whether the merge uses the MemWAL LSM write path.
By default (unset), a `merge_insert` on a table with an LSM write spec
is routed through Lance's MemWAL shard writer, and a table without one
uses the standard path. Pass `False` to force the standard path even
when a spec is set. Pass `True` to require a spec — `merge_insert`
raises an error if none is installed.
Parameters
----------
use_lsm_write: bool
Whether to use the LSM write path.
"""
self._use_lsm_write = use_lsm_write
return self
def validate_single_shard(
self, validate_single_shard: bool
) -> LanceMergeInsertBuilder:
"""
Controls how an LSM merge checks that its input targets a single shard.
When a table has an LSM write spec, every row in a `merge_insert` call
must route to the same shard. When `True` (the default), every row is
inspected to verify this. When `False`, only the first row is inspected
and the shard it routes to is used for the whole input — a faster path
for callers that have already pre-sharded their input.
Has no effect on tables without an LSM write spec.
Parameters
----------
validate_single_shard: bool
Whether to check every row routes to one shard. Defaults to `True`.
"""
self._validate_single_shard = validate_single_shard
return self
def execute(
self,
new_data: DATA,

View File

@@ -6,44 +6,22 @@
from typing import Optional
_CREATE_NAMESPACE_MODES = frozenset({"create", "exist_ok", "overwrite"})
_DROP_NAMESPACE_MODES = frozenset({"SKIP", "FAIL"})
_DROP_NAMESPACE_BEHAVIORS = frozenset({"RESTRICT", "CASCADE"})
def _normalize_create_namespace_mode(mode: Optional[str]) -> Optional[str]:
"""Normalize create namespace mode to lowercase (API expects lowercase)."""
if mode is None:
return None
normalized = mode.lower()
if normalized not in _CREATE_NAMESPACE_MODES:
raise ValueError(
f"Invalid create namespace mode {mode!r}: "
f"expected one of 'create', 'exist_ok', 'overwrite'"
)
return normalized
return mode.lower()
def _normalize_drop_namespace_mode(mode: Optional[str]) -> Optional[str]:
"""Normalize drop namespace mode to uppercase (API expects uppercase)."""
if mode is None:
return None
normalized = mode.upper()
if normalized not in _DROP_NAMESPACE_MODES:
raise ValueError(
f"Invalid drop namespace mode {mode!r}: expected one of 'skip', 'fail'"
)
return normalized
return mode.upper()
def _normalize_drop_namespace_behavior(behavior: Optional[str]) -> Optional[str]:
"""Normalize drop namespace behavior to uppercase (API expects uppercase)."""
if behavior is None:
return None
normalized = behavior.upper()
if normalized not in _DROP_NAMESPACE_BEHAVIORS:
raise ValueError(
f"Invalid drop namespace behavior {behavior!r}: "
f"expected one of 'restrict', 'cascade'"
)
return normalized
return behavior.upper()

View File

@@ -3,13 +3,12 @@
import copy
import json
import os
from deprecation import deprecated
import pyarrow as pa
from ._lancedb import async_permutation_builder, PermutationReader
from .table import LanceTable, Table
from .table import LanceTable
from .background_loop import LOOP
from .util import batch_to_tensor, batch_to_tensor_rows
from typing import Any, Callable, Iterator, Literal, Optional, TYPE_CHECKING, Union
@@ -355,41 +354,6 @@ class Transforms:
DEFAULT_BATCH_SIZE = 100
def _table_to_pickle_state(table: Table) -> dict[str, Any]:
from .remote.table import RemoteTable
if isinstance(table, LanceTable) and table._conn.uri.startswith("memory://"):
return {
"kind": "memory",
"name": table.name,
"data": table.to_arrow(),
}
if isinstance(table, (LanceTable, RemoteTable)):
return {
"kind": "table",
"table": table,
}
raise ValueError(f"Cannot pickle table of type {type(table)!r}")
def _table_from_pickle_state(state: dict[str, Any]) -> Table:
from . import connect
kind = state["kind"]
if kind == "table":
return state["table"]
if kind == "remote":
return state["table"]
if kind == "memory":
return connect("memory://").create_table(state["name"], state["data"])
if kind == "local":
db = connect(state["uri"], storage_options=state["storage_options"])
return db.open_table(state["name"], namespace_path=state["namespace"] or None)
raise ValueError(f"Unknown table pickle state kind: {kind}")
class Permutation:
"""
A Permutation is a view of a dataset that can be used as input to model training
@@ -405,15 +369,15 @@ class Permutation:
def __init__(
self,
base_table: Table,
permutation_table: Optional[Table],
base_table: LanceTable,
permutation_table: Optional[LanceTable],
split: int,
selection: dict[str, str],
batch_size: int,
transform_fn: Callable[pa.RecordBatch, Any],
offset: Optional[int] = None,
limit: Optional[int] = None,
connection_factory: Optional[Callable[[str], Table]] = None,
connection_factory: Optional[Callable[[str], LanceTable]] = None,
_reader: Optional[PermutationReader] = None,
):
"""
@@ -433,7 +397,6 @@ class Permutation:
if _reader is None:
_reader = LOOP.run(self._build_reader())
self.reader: PermutationReader = _reader
self._pid = os.getpid()
async def _build_reader(self) -> PermutationReader:
reader = await PermutationReader.from_tables(
@@ -465,25 +428,29 @@ class Permutation:
return new
def with_connection_factory(
self, connection_factory: Callable[[str], Table]
self, connection_factory: Callable[[str], LanceTable]
) -> "Permutation":
"""
Creates a new permutation that will use ``connection_factory`` to reopen
the base table when this permutation is unpickled in a worker process.
The factory is a callable that takes a single argument, the base table
name, and returns a LanceDB table. It must be picklable; the worker
The factory is a callable that takes a single argument the base table
name and returns a [LanceTable]. It must be picklable; the worker
will pickle it via standard ``pickle`` and call it to recover the base
table. Picklable callables in practice means top-level (module-level)
functions, ``functools.partial`` of such functions, or instances of
picklable classes implementing ``__call__``. Lambdas and closures over
local variables don't pickle with the default protocol.
A factory is optional for normal local and remote LanceDB connections:
if not set, ``__getstate__`` captures the table's own picklable reopen
state. Use a factory when that default state is not enough, for example
when credentials should be loaded from the worker environment instead
of being embedded in the pickle.
Setting a factory is necessary when the URI alone is not enough to
re-open the connection — most importantly for LanceDB Cloud (``db://``)
connections, where ``api_key`` and ``region`` aren't recoverable from
the connection object after construction.
For local file or cloud-storage paths the factory is optional: if not
set, ``__getstate__`` falls back to capturing
``(uri, storage_options, namespace_path)`` and re-opening via
``lancedb.connect(uri, storage_options=...)``.
Examples
--------
@@ -541,7 +508,7 @@ class Permutation:
return new
@classmethod
def identity(cls, table: Table) -> "Permutation":
def identity(cls, table: LanceTable) -> "Permutation":
"""
Creates an identity permutation for the given table.
"""
@@ -550,8 +517,8 @@ class Permutation:
@classmethod
def from_tables(
cls,
base_table: Table,
permutation_table: Optional[Table] = None,
base_table: LanceTable,
permutation_table: Optional[LanceTable] = None,
split: Optional[Union[str, int]] = None,
) -> "Permutation":
"""
@@ -627,10 +594,11 @@ class Permutation:
The base table is captured either via a user-supplied
``connection_factory`` (see [with_connection_factory]) or, as a
fallback, by the table's own picklable reopen state. The permutation
table is captured as a pyarrow Table (which pickles via Arrow IPC
natively). The reader is dropped from the wire format and rebuilt
lazily on first use.
fallback, by introspecting ``(uri, storage_options, namespace_path)``
on the connection. The permutation table — always an in-memory
LanceDB table — is captured as a pyarrow Table (which pickles via
Arrow IPC natively). The reader is dropped from the wire format;
``__setstate__`` rebuilds it from the restored tables.
"""
permutation_data: Optional[pa.Table] = None
if self.permutation_table is not None:
@@ -654,9 +622,39 @@ class Permutation:
# namespace from the existing connection.
return common
# URI-introspection fallback: only viable for native (OSS) connections
# where (uri, storage_options) is enough to reopen. Remote / cloud
# connections don't expose recoverable api_key / region — those users
# must call with_connection_factory().
try:
base_uri = self.base_table._conn.uri
storage_options = self.base_table._conn.storage_options
except AttributeError as e:
raise ValueError(
"Cannot pickle this Permutation: the base table's connection "
"does not expose a uri/storage_options, which usually means it "
"is a remote (LanceDB Cloud) connection. Call "
"Permutation.with_connection_factory(...) first to provide a "
"picklable callable that re-opens the base table from a worker "
"process."
) from e
if base_uri.startswith("memory://"):
# In-memory base tables don't exist in any worker process by
# default, so dump the entire base table into the pickle. This
# can be expensive for large datasets — users with large
# in-memory base tables should either persist them or set a
# connection_factory.
return {
**common,
"base_table_data": self.base_table.to_arrow(),
}
return {
**common,
"base_table_state": _table_to_pickle_state(self.base_table),
"base_table_uri": base_uri,
"base_table_namespace": self.base_table._namespace_path,
"base_table_storage_options": storage_options,
}
def __setstate__(self, state: dict[str, Any]) -> None:
@@ -665,8 +663,6 @@ class Permutation:
connection_factory = state["connection_factory"]
if connection_factory is not None:
base_table = connection_factory(state["base_table_name"])
elif "base_table_state" in state:
base_table = _table_from_pickle_state(state["base_table_state"])
elif "base_table_data" in state:
# In-memory base table inlined into the pickle; rebuild the same
# way we rebuild the in-memory permutation table.
@@ -684,7 +680,7 @@ class Permutation:
namespace_path=state["base_table_namespace"] or None,
)
permutation_table: Optional[Table] = None
permutation_table: Optional[LanceTable] = None
if state["permutation_data"] is not None:
mem_db = connect("memory://")
permutation_table = mem_db.create_table(
@@ -700,28 +696,10 @@ class Permutation:
self.offset = state["offset"]
self.limit = state["limit"]
self.connection_factory = connection_factory
self.reader = None
self._pid = None
def _ensure_open(self) -> None:
pid = os.getpid()
if self.reader is not None and getattr(self, "_pid", None) == pid:
return
# The reader owns Rust-side table handles. Rebuild it after unpickle or
# fork even though the Python table wrappers reopen themselves.
if hasattr(self.base_table, "_ensure_open"):
self.base_table._ensure_open()
if self.permutation_table is not None and hasattr(
self.permutation_table, "_ensure_open"
):
self.permutation_table._ensure_open()
self.reader = LOOP.run(self._build_reader())
self._pid = pid
@property
def schema(self) -> pa.Schema:
self._ensure_open()
async def do_output_schema():
return await self.reader.output_schema(self.selection)
@@ -739,7 +717,6 @@ class Permutation:
"""
The number of rows in the permutation
"""
self._ensure_open()
return self.reader.count_rows()
@property
@@ -898,7 +875,6 @@ class Permutation:
If skip_last_batch is True, the last batch will be skipped if it is not a
multiple of batch_size.
"""
self._ensure_open()
async def get_iter():
return await self.reader.read(self.selection, batch_size=batch_size)
@@ -992,33 +968,22 @@ class Permutation:
new.transform_fn = transform
return new
def take_offsets(self, offsets: list[int]) -> Any:
"""
Take rows from the permutation by offset
The returned value is passed through the permutation's current transform,
so `with_format` and `with_transform` affect this method in the same way
they affect iteration.
"""
self._ensure_open()
async def do_take_offsets():
return await self.reader.take_offsets(offsets, selection=self.selection)
batch = LOOP.run(do_take_offsets())
return self.transform_fn(batch)
def __getitem__(self, index: int) -> Any:
"""
Returns a single row from the permutation by offset
"""
return self.take_offsets([index])
return self.__getitems__([index])
def __getitems__(self, indices: list[int]) -> Any:
"""
Returns rows from the permutation by offset
"""
return self.take_offsets(indices)
async def do_getitems():
return await self.reader.take_offsets(indices, selection=self.selection)
batch = LOOP.run(do_getitems())
return self.transform_fn(batch)
@deprecated(details="Use with_skip instead")
def skip(self, skip: int) -> "Permutation":
@@ -1036,11 +1001,9 @@ class Permutation:
"""
Skip the first `skip` rows of the permutation
"""
self._ensure_open()
new = copy.copy(self)
new.offset = skip
new.reader = LOOP.run(new._build_reader())
new._pid = os.getpid()
return new
@deprecated(details="Use with_take instead")
@@ -1059,11 +1022,9 @@ class Permutation:
"""
Limit the permutation to `limit` rows (following any `skip`)
"""
self._ensure_open()
new = copy.copy(self)
new.limit = limit
new.reader = LOOP.run(new._build_reader())
new._pid = os.getpid()
return new
@deprecated(details="Use with_repeat instead")

View File

@@ -3,14 +3,12 @@
from __future__ import annotations
import asyncio
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta
from enum import Enum
from datetime import timedelta
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Literal,
@@ -19,40 +17,41 @@ from typing import (
Type,
TypeVar,
Union,
Any,
)
import asyncio
import deprecation
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
import pydantic
from typing_extensions import Annotated
from lancedb._lancedb import fts_query_to_json
from lancedb.background_loop import LOOP
from lancedb.pydantic import PYDANTIC_VERSION
from lancedb.background_loop import LOOP
from . import __version__
from .arrow import AsyncRecordBatchReader
from .dependencies import pandas as pd
from .expr import Expr
from .rerankers.base import Reranker
from .rerankers.rrf import RRFReranker
from .rerankers.util import check_reranker_result
from .util import flatten_columns
from .expr import Expr
from lancedb._lancedb import fts_query_to_json
from typing_extensions import Annotated
if TYPE_CHECKING:
import sys
import PIL
import polars as pl
from ._lancedb import Query as LanceQuery
from ._lancedb import FTSQuery as LanceFTSQuery
from ._lancedb import HybridQuery as LanceHybridQuery
from ._lancedb import PyQueryRequest
from ._lancedb import Query as LanceQuery
from ._lancedb import TakeQuery as LanceTakeQuery
from ._lancedb import VectorQuery as LanceVectorQuery
from ._lancedb import TakeQuery as LanceTakeQuery
from ._lancedb import PyQueryRequest
from .common import VEC
from .pydantic import LanceModel
from .table import Table
@@ -93,12 +92,6 @@ def ensure_vector_query(
return val
class ColumnOrdering(pydantic.BaseModel):
column_name: str
ascending: bool = True
nulls_first: bool = False
class FullTextQueryType(str, Enum):
MATCH = "match"
MATCH_PHRASE = "match_phrase"
@@ -511,8 +504,6 @@ class Query(pydantic.BaseModel):
# Bypass the vector index and use a brute force search
bypass_vector_index: Optional[bool] = None
order_by: Optional[List[ColumnOrdering]] = None
@classmethod
def from_inner(cls, req: PyQueryRequest) -> Self:
query = cls()
@@ -533,8 +524,6 @@ class Query(pydantic.BaseModel):
query.refine_factor = req.refine_factor
query.bypass_vector_index = req.bypass_vector_index
query.postfilter = req.postfilter
if req.order_by is not None:
query.order_by = [ColumnOrdering(**o) for o in req.order_by]
if req.full_text_search is not None:
query.full_text_query = FullTextSearchQuery(
columns=None,
@@ -583,22 +572,9 @@ class LanceQueryBuilder(ABC):
If "auto", the query type is inferred based on the query.
vector_column_name: str
The name of the vector column to use for vector search.
ordering_field_name: Optional[str]
.. deprecated:: 0.27.0
Use ``order_by()`` method instead.
fts_columns: Optional[Union[str, List[str]]]
The columns to search in for full text search.
fast_search: bool
Skip flat search of unindexed data.
"""
if ordering_field_name is not None:
import warnings
warnings.warn(
"ordering_field_name is deprecated, use .order_by() method instead.",
DeprecationWarning,
stacklevel=2,
)
# Check hybrid search first as it supports empty query pattern
if query_type == "hybrid":
# hybrid fts and vector query
@@ -695,7 +671,6 @@ class LanceQueryBuilder(ABC):
self._text = None
self._ef = None
self._bypass_vector_index = None
self._order_by = None
@deprecation.deprecated(
deprecated_in="0.3.1",
@@ -719,7 +694,6 @@ class LanceQueryBuilder(ABC):
flatten: Optional[Union[int, bool]] = None,
*,
timeout: Optional[timedelta] = None,
**kwargs,
) -> "pd.DataFrame":
"""
Execute the query and return the results as a pandas DataFrame.
@@ -737,12 +711,9 @@ class LanceQueryBuilder(ABC):
timeout: Optional[timedelta]
The maximum time to wait for the query to complete.
If None, wait indefinitely.
**kwargs
Forwarded to pyarrow.Table.to_pandas after query execution and
optional flattening.
"""
tbl = flatten_columns(self.to_arrow(timeout=timeout), flatten)
return tbl.to_pandas(**kwargs)
return tbl.to_pandas()
@abstractmethod
def to_arrow(self, *, timeout: Optional[timedelta] = None) -> pa.Table:
@@ -976,24 +947,6 @@ class LanceQueryBuilder(ABC):
""" # noqa: E501
return self._table._explain_plan(self.to_query_object(), verbose=verbose)
def order_by(self, ordering: Optional[List[ColumnOrdering]]) -> Self:
"""
Set the ordering for the results.
Parameters
----------
ordering: Optional[List[ColumnOrdering]]
The ordering to use for the results. If None, then the default ordering
will be used.
Returns
-------
LanceQueryBuilder
The LanceQueryBuilder object.
"""
self._order_by = ordering
return self
def analyze_plan(self) -> str:
"""
Run the query and return its execution plan with runtime metrics.
@@ -1361,7 +1314,6 @@ class LanceVectorQueryBuilder(LanceQueryBuilder):
fast_search=self._fast_search,
ef=self._ef,
bypass_vector_index=self._bypass_vector_index,
order_by=self._order_by,
)
def to_batches(
@@ -1513,9 +1465,7 @@ class LanceFtsQueryBuilder(LanceQueryBuilder):
super().__init__(table)
self._query = query
self._phrase_query = False
# Deprecated compatibility parameter. Native FTS ordering is now
# configured through order_by(); LanceQueryBuilder.create emits the warning.
_ = ordering_field_name
self.ordering_field_name = ordering_field_name
self._reranker = None
self._fast_search = fast_search
if isinstance(fts_columns, str):
@@ -1564,7 +1514,6 @@ class LanceFtsQueryBuilder(LanceQueryBuilder):
),
offset=self._offset,
fast_search=self._fast_search,
order_by=self._order_by,
)
def output_schema(self) -> pa.Schema:
@@ -1630,7 +1579,6 @@ class LanceEmptyQueryBuilder(LanceQueryBuilder):
limit=self._limit,
with_row_id=self._with_row_id,
offset=self._offset,
order_by=self._order_by,
)
def output_schema(self) -> pa.Schema:
@@ -2357,7 +2305,6 @@ class AsyncQueryBase(object):
self,
flatten: Optional[Union[int, bool]] = None,
timeout: Optional[timedelta] = None,
**kwargs,
) -> "pd.DataFrame":
"""
Execute the query and collect the results into a pandas DataFrame.
@@ -2390,13 +2337,10 @@ class AsyncQueryBase(object):
The maximum time to wait for the query to complete.
If not specified, no timeout is applied. If the query does not
complete within the specified time, an error will be raised.
**kwargs
Forwarded to pyarrow.Table.to_pandas after query execution and
optional flattening.
"""
return (
flatten_columns(await self.to_arrow(timeout=timeout), flatten)
).to_pandas(**kwargs)
).to_pandas()
async def to_polars(
self,
@@ -2558,27 +2502,6 @@ class AsyncStandardQuery(AsyncQueryBase):
self._inner.offset(offset)
return self
def order_by(self, ordering: Optional[List[ColumnOrdering]]) -> Self:
"""
Set the ordering for the results.
Parameters
----------
ordering: Optional[List[ColumnOrdering]]
The ordering to use for the results. If None, then the default ordering
will be used.
"""
if ordering is None:
self._inner.order_by(None)
else:
self._inner.order_by(
[
o.model_dump() if hasattr(o, "model_dump") else o.dict()
for o in ordering
]
)
return self
def fast_search(self) -> Self:
"""
Skip searching un-indexed data.
@@ -3349,18 +3272,16 @@ class BaseQueryBuilder(object):
If not specified, no timeout is applied. If the query does not
complete within the specified time, an error will be raised.
"""
async_reader = LOOP.run(
self._inner.to_batches(max_batch_length=max_batch_length, timeout=timeout)
)
async_iter = LOOP.run(self._inner.execute(max_batch_length, timeout))
def iter_sync():
try:
while True:
yield LOOP.run(async_reader.__anext__())
yield LOOP.run(async_iter.__anext__())
except StopAsyncIteration:
return
return pa.RecordBatchReader.from_batches(async_reader.schema, iter_sync())
return pa.RecordBatchReader.from_batches(async_iter.schema, iter_sync())
def to_arrow(self, timeout: Optional[timedelta] = None) -> pa.Table:
"""
@@ -3400,7 +3321,6 @@ class BaseQueryBuilder(object):
self,
flatten: Optional[Union[int, bool]] = None,
timeout: Optional[timedelta] = None,
**kwargs,
) -> "pd.DataFrame":
"""
Execute the query and collect the results into a pandas DataFrame.
@@ -3433,11 +3353,8 @@ class BaseQueryBuilder(object):
The maximum time to wait for the query to complete.
If not specified, no timeout is applied. If the query does not
complete within the specified time, an error will be raised.
**kwargs
Forwarded to pyarrow.Table.to_pandas after query execution and
optional flattening.
"""
return LOOP.run(self._inner.to_pandas(flatten, timeout, **kwargs))
return LOOP.run(self._inner.to_pandas(flatten, timeout))
def to_polars(
self,

View File

@@ -3,7 +3,6 @@
from datetime import timedelta
import json
import logging
from concurrent.futures import ThreadPoolExecutor
import sys
@@ -18,7 +17,7 @@ else:
# Remove this import to fix circular dependency
# from lancedb import connect_async
from lancedb.remote import ClientConfig, RetryConfig, TimeoutConfig, TlsConfig
from lancedb.remote import ClientConfig
import pyarrow as pa
from ..common import DATA
@@ -37,64 +36,6 @@ from ..table import Table
from ..util import validate_table_name
def _duration_seconds(value: Optional[timedelta]) -> Optional[float]:
return value.total_seconds() if value is not None else None
def _timeout_config_to_dict(
config: Optional[TimeoutConfig],
) -> Optional[dict[str, Any]]:
if config is None:
return None
return {
"timeout": _duration_seconds(config.timeout),
"connect_timeout": _duration_seconds(config.connect_timeout),
"read_timeout": _duration_seconds(config.read_timeout),
"pool_idle_timeout": _duration_seconds(config.pool_idle_timeout),
}
def _retry_config_to_dict(config: RetryConfig) -> dict[str, Any]:
return {
"retries": config.retries,
"connect_retries": config.connect_retries,
"read_retries": config.read_retries,
"backoff_factor": config.backoff_factor,
"backoff_jitter": config.backoff_jitter,
"statuses": config.statuses,
}
def _tls_config_to_dict(config: Optional[TlsConfig]) -> Optional[dict[str, Any]]:
if config is None:
return None
return {
"cert_file": config.cert_file,
"key_file": config.key_file,
"ssl_ca_cert": config.ssl_ca_cert,
"assert_hostname": config.assert_hostname,
}
def _client_config_to_dict(config: ClientConfig) -> dict[str, Any]:
if config.header_provider is not None:
raise ValueError(
"Cannot serialize a remote connection with a header_provider. "
"Use static api_key/extra_headers or provide a worker-side "
"connection factory instead."
)
return {
"user_agent": config.user_agent,
"retry_config": _retry_config_to_dict(config.retry_config),
"timeout_config": _timeout_config_to_dict(config.timeout_config),
"extra_headers": config.extra_headers,
"id_delimiter": config.id_delimiter,
"tls_config": _tls_config_to_dict(config.tls_config),
"header_provider": None,
"user_id": config.user_id,
}
class RemoteDBConnection(DBConnection):
"""A connection to a remote LanceDB database."""
@@ -109,7 +50,6 @@ class RemoteDBConnection(DBConnection):
connection_timeout: Optional[float] = None,
read_timeout: Optional[float] = None,
storage_options: Optional[Dict[str, str]] = None,
read_consistency_interval: Optional[timedelta] = None,
):
"""Connect to a remote LanceDB database."""
if isinstance(client_config, dict):
@@ -148,11 +88,6 @@ class RemoteDBConnection(DBConnection):
parsed = urlparse(db_url)
if parsed.scheme != "db":
raise ValueError(f"Invalid scheme: {parsed.scheme}, only accepts db://")
self.db_url = db_url
self.api_key = api_key
self.region = region
self.host_override = host_override
self.storage_options = storage_options
self.db_name = parsed.netloc
self.client_config = client_config
@@ -168,27 +103,12 @@ class RemoteDBConnection(DBConnection):
host_override=host_override,
client_config=client_config,
storage_options=storage_options,
read_consistency_interval=read_consistency_interval,
)
)
def __repr__(self) -> str:
return f"RemoteConnect(name={self.db_name})"
@override
def serialize(self) -> str:
return json.dumps(
{
"connection_type": "remote",
"db_url": self.db_url,
"api_key": self.api_key,
"region": self.region,
"host_override": self.host_override,
"client_config": _client_config_to_dict(self.client_config),
"storage_options": self.storage_options,
}
)
@override
def list_namespaces(
self,
@@ -409,12 +329,7 @@ class RemoteDBConnection(DBConnection):
)
table = LOOP.run(self._conn.open_table(name, namespace_path=namespace_path))
return RemoteTable(
table,
self.db_name,
connection_state=self.serialize,
namespace_path=namespace_path,
)
return RemoteTable(table, self.db_name)
def clone_table(
self,
@@ -463,12 +378,7 @@ class RemoteDBConnection(DBConnection):
is_shallow=is_shallow,
)
)
return RemoteTable(
table,
self.db_name,
connection_state=self.serialize,
namespace_path=target_namespace_path,
)
return RemoteTable(table, self.db_name)
@override
def create_table(
@@ -613,12 +523,7 @@ class RemoteDBConnection(DBConnection):
fill_value=fill_value,
)
)
return RemoteTable(
table,
self.db_name,
connection_state=self.serialize,
namespace_path=namespace_path,
)
return RemoteTable(table, self.db_name)
@override
def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):

View File

@@ -2,25 +2,11 @@
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
from datetime import timedelta
import deprecation
import logging
from functools import cached_property
import os
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Union,
Literal,
overload,
)
from typing import Any, Callable, Dict, Iterable, List, Optional, Union, Literal
import warnings
from lancedb import __version__
from lancedb._lancedb import (
AddColumnsResult,
AddResult,
@@ -28,7 +14,6 @@ from lancedb._lancedb import (
DeleteResult,
DropColumnsResult,
IndexConfig,
LsmWriteSpec,
MergeResult,
UpdateResult,
)
@@ -46,7 +31,6 @@ from lancedb.index import (
LabelList,
)
from lancedb.remote.db import LOOP
from lancedb.table import IndexConfigType, KNOWN_METRICS
import pyarrow as pa
from lancedb.common import DATA, VEC, VECTOR_COLUMN_NAME
@@ -55,7 +39,7 @@ from lancedb.embeddings import EmbeddingFunctionRegistry
from lancedb.table import _normalize_progress
from ..query import LanceVectorQueryBuilder, LanceQueryBuilder, LanceTakeQueryBuilder
from ..table import AsyncTable, BlobMode, IndexStatistics, Query, Table, Tags
from ..table import AsyncTable, IndexStatistics, Query, Table, Tags
from ..types import BaseTokenizerType
@@ -64,103 +48,14 @@ class RemoteTable(Table):
self,
table: AsyncTable,
db_name: str,
*,
connection_state: Optional[Union[str, Callable[[], str]]] = None,
namespace_path: Optional[List[str]] = None,
):
self._table_handle = table
self._name = table.name
self._table = table
self.db_name = db_name
self._connection_state = connection_state
self._namespace_path = list(namespace_path or [])
self._checkout_version: Optional[int] = None
self._table_state: Optional[dict[str, Any]] = None
self._pid = os.getpid()
def _serialized_connection_state(self) -> str:
if self._connection_state is None:
raise RuntimeError(
"Cannot reopen this remote table because it does not carry "
"serialized connection state"
)
if callable(self._connection_state):
self._connection_state = self._connection_state()
return self._connection_state
def _reopen_state(self) -> dict[str, Any]:
if self._table_state is not None:
return self._table_state
self._table_state = {
"name": self._name,
"namespace_path": self._namespace_path,
"storage_options": None,
}
return self._table_state
@property
def _table(self) -> AsyncTable:
self._ensure_open()
assert self._table_handle is not None
return self._table_handle
@_table.setter
def _table(self, table: AsyncTable) -> None:
self._table_handle = table
self._name = table.name
self._table_state = None
self._pid = os.getpid()
def _ensure_open(self) -> None:
pid = os.getpid()
if self._table_handle is not None and self._pid == pid:
return
# Pickle clears the handle; fork inherits a handle created in the
# parent process. In both cases reopen before touching the Rust client.
from lancedb import deserialize_conn
db = deserialize_conn(self._serialized_connection_state(), for_worker=True)
table_state = self._reopen_state()
table = db.open_table(
table_state["name"],
namespace_path=table_state["namespace_path"] or None,
)
if self._checkout_version is not None:
table.checkout(self._checkout_version)
self._table_handle = table._table
self.db_name = table.db_name
self._pid = pid
def __getstate__(self) -> dict:
return {
"connection_state": self._serialized_connection_state(),
"db_name": self.db_name,
"table_state": self._reopen_state(),
"checkout_version": self._checkout_version,
}
def __setstate__(self, state: dict) -> None:
self._table_handle = None
table_state = state.get("table_state")
if table_state is None:
table_state = {
"name": state["name"],
"namespace_path": state["namespace_path"],
"storage_options": None,
}
self._table_state = table_state
self._name = table_state["name"]
self.db_name = state["db_name"]
self._connection_state = state["connection_state"]
self._namespace_path = table_state["namespace_path"]
self._checkout_version = state["checkout_version"]
self._pid = None
@property
def name(self) -> str:
"""The name of the table"""
return self._name
return self._table.name
def __repr__(self) -> str:
return f"RemoteTable({self.db_name}.{self.name})"
@@ -205,24 +100,18 @@ class RemoteTable(Table):
"""to_arrow() is not yet supported on LanceDB cloud."""
raise NotImplementedError("to_arrow() is not yet supported on LanceDB cloud.")
def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs):
def to_pandas(self):
"""to_pandas() is not yet supported on LanceDB cloud."""
raise NotImplementedError("to_pandas() is not yet supported on LanceDB cloud.")
def checkout(self, version: Union[int, str]):
result = LOOP.run(self._table.checkout(version))
self._checkout_version = self.version
return result
return LOOP.run(self._table.checkout(version))
def checkout_latest(self):
result = LOOP.run(self._table.checkout_latest())
self._checkout_version = None
return result
return LOOP.run(self._table.checkout_latest())
def restore(self, version: Optional[Union[int, str]] = None):
result = LOOP.run(self._table.restore(version))
self._checkout_version = None
return result
return LOOP.run(self._table.restore(version))
def list_indices(self) -> Iterable[IndexConfig]:
"""List all the indices on the table"""
@@ -232,11 +121,6 @@ class RemoteTable(Table):
"""List all the stats of a specified index"""
return LOOP.run(self._table.index_stats(index_uuid))
@deprecation.deprecated(
deprecated_in="0.25.0",
current_version=__version__,
details="Use create_index() with config=BTree()/Bitmap()/LabelList() instead.",
)
def create_scalar_index(
self,
column: str,
@@ -246,12 +130,7 @@ class RemoteTable(Table):
wait_timeout: Optional[timedelta] = None,
name: Optional[str] = None,
):
"""Creates a scalar index.
.. deprecated:: 0.25.0
Use :meth:`create_index` with a BTree, Bitmap, or LabelList config instead.
Example: ``table.create_index("column", config=BTree())``
"""Creates a scalar index
Parameters
----------
column : str
@@ -282,11 +161,6 @@ class RemoteTable(Table):
)
)
@deprecation.deprecated(
deprecated_in="0.25.0",
current_version=__version__,
details="Use create_index() with config=FTS() instead.",
)
def create_fts_index(
self,
column: str,
@@ -307,12 +181,6 @@ class RemoteTable(Table):
prefix_only: bool = False,
name: Optional[str] = None,
):
"""Create a full-text search index on a column.
.. deprecated:: 0.25.0
Use :meth:`create_index` with an FTS config instead.
Example: ``table.create_index("text_column", config=FTS())``
"""
config = FTS(
with_position=with_position,
base_tokenizer=base_tokenizer,
@@ -336,43 +204,9 @@ class RemoteTable(Table):
)
)
# New unified API overload
@overload
def create_index(
self,
column: str,
/,
*,
config: IndexConfigType,
wait_timeout: Optional[timedelta] = ...,
name: Optional[str] = ...,
train: bool = ...,
) -> None: ...
# Legacy API overload (deprecated)
@overload
def create_index(
self,
metric: Literal["l2", "cosine", "dot", "hamming"] = ...,
vector_column_name: str = ...,
index_cache_size: Optional[int] = ...,
num_partitions: Optional[int] = ...,
num_sub_vectors: Optional[int] = ...,
replace: Optional[bool] = ...,
accelerator: Optional[str] = ...,
index_type: Literal[
"VECTOR", "IVF_FLAT", "IVF_SQ", "IVF_PQ", "IVF_HNSW_SQ", "IVF_HNSW_PQ"
] = ...,
wait_timeout: Optional[timedelta] = ...,
*,
num_bits: int = ...,
name: Optional[str] = ...,
train: bool = ...,
) -> None: ...
def create_index(
self,
metric: str = "l2",
metric="l2",
vector_column_name: str = VECTOR_COLUMN_NAME,
index_cache_size: Optional[int] = None,
num_partitions: Optional[int] = None,
@@ -383,113 +217,89 @@ class RemoteTable(Table):
wait_timeout: Optional[timedelta] = None,
*,
num_bits: int = 8,
config: Optional[IndexConfigType] = None,
name: Optional[str] = None,
train: bool = True,
):
"""Create an index on a column.
"""Create an index on the table.
This method supports both the new unified API and the legacy API
for backwards compatibility. The new API takes the column name as the
first positional argument and an index configuration object via
``config``; the legacy API takes the distance metric as the first
argument plus separate ``vector_column_name`` / ``num_partitions`` /
etc. parameters, and emits a ``DeprecationWarning``.
Parameters
----------
metric : str
The metric to use for the index. Default is "l2".
vector_column_name : str
The name of the vector column. Default is "vector".
Examples
--------
New API (recommended):
>>> table.create_index( # doctest: +SKIP
... "vector", config=IvfPq(distance_type="l2")
>>> import lancedb
>>> import uuid
>>> from lancedb.schema import vector
>>> db = lancedb.connect("db://...", api_key="...", # doctest: +SKIP
... region="...") # doctest: +SKIP
>>> table_name = uuid.uuid4().hex
>>> schema = pa.schema(
... [
... pa.field("id", pa.uint32(), False),
... pa.field("vector", vector(128), False),
... pa.field("s", pa.string(), False),
... ]
... )
>>> table.create_index("category", config=BTree()) # doctest: +SKIP
>>> table.create_index("content", config=FTS()) # doctest: +SKIP
Legacy API (deprecated):
>>> table.create_index( # doctest: +SKIP
... "l2", vector_column_name="vector"
>>> table = db.create_table( # doctest: +SKIP
... table_name, # doctest: +SKIP
... schema=schema, # doctest: +SKIP
... )
>>> table.create_index("l2", "vector") # doctest: +SKIP
"""
# Detect whether this is a legacy API call
is_legacy = self._is_legacy_create_index_call(
metric,
config,
num_partitions,
num_sub_vectors,
vector_column_name,
accelerator,
index_cache_size,
replace,
)
if is_legacy:
warnings.warn(
"The create_index() API with metric/num_partitions parameters is "
"deprecated and will be removed in a future version. "
"Please migrate to the new unified API:\n"
" # Old (deprecated):\n"
" table.create_index('l2', vector_column_name='my_vector')\n"
" # New (recommended):\n"
" table.create_index('my_vector', config=IvfPq(distance_type='l2'))",
DeprecationWarning,
stacklevel=2,
if accelerator is not None:
logging.warning(
"GPU accelerator is not yet supported on LanceDB cloud."
"If you have 100M+ vectors to index,"
"please contact us at contact@lancedb.com"
)
if replace is not None:
logging.warning(
"replace is not supported on LanceDB cloud."
"Existing indexes will always be replaced."
)
column = vector_column_name
if accelerator is not None:
logging.warning(
"GPU accelerator is not yet supported on LanceDB cloud."
"If you have 100M+ vectors to index,"
"please contact us at contact@lancedb.com"
)
if replace is not None:
logging.warning(
"replace is not supported on LanceDB cloud."
"Existing indexes will always be replaced."
)
idx_type = index_type.upper()
if idx_type == "VECTOR" or idx_type == "IVF_PQ":
config = IvfPq(
distance_type=metric,
num_partitions=num_partitions,
num_sub_vectors=num_sub_vectors,
num_bits=num_bits,
)
elif idx_type == "IVF_RQ":
config = IvfRq(
distance_type=metric,
num_partitions=num_partitions,
num_bits=num_bits,
)
elif idx_type == "IVF_SQ":
config = IvfSq(distance_type=metric, num_partitions=num_partitions)
elif idx_type == "IVF_HNSW_PQ":
raise ValueError(
"IVF_HNSW_PQ is not supported on LanceDB cloud."
"Please use IVF_HNSW_SQ instead."
)
elif idx_type == "IVF_HNSW_SQ":
config = HnswSq(distance_type=metric, num_partitions=num_partitions)
elif idx_type == "IVF_HNSW_FLAT":
config = HnswFlat(distance_type=metric, num_partitions=num_partitions)
elif idx_type == "IVF_FLAT":
config = IvfFlat(distance_type=metric, num_partitions=num_partitions)
else:
raise ValueError(
f"Unknown vector index type: {idx_type}. Valid options are"
" 'IVF_FLAT', 'IVF_PQ', 'IVF_RQ', 'IVF_SQ',"
" 'IVF_HNSW_PQ', 'IVF_HNSW_SQ', 'IVF_HNSW_FLAT'"
)
index_type = index_type.upper()
if index_type == "VECTOR" or index_type == "IVF_PQ":
config = IvfPq(
distance_type=metric,
num_partitions=num_partitions,
num_sub_vectors=num_sub_vectors,
num_bits=num_bits,
)
elif index_type == "IVF_RQ":
config = IvfRq(
distance_type=metric,
num_partitions=num_partitions,
num_bits=num_bits,
)
elif index_type == "IVF_SQ":
config = IvfSq(distance_type=metric, num_partitions=num_partitions)
elif index_type == "IVF_HNSW_PQ":
raise ValueError(
"IVF_HNSW_PQ is not supported on LanceDB cloud."
"Please use IVF_HNSW_SQ instead."
)
elif index_type == "IVF_HNSW_SQ":
config = HnswSq(distance_type=metric, num_partitions=num_partitions)
elif index_type == "IVF_HNSW_FLAT":
config = HnswFlat(distance_type=metric, num_partitions=num_partitions)
elif index_type == "IVF_FLAT":
config = IvfFlat(distance_type=metric, num_partitions=num_partitions)
else:
column = metric
raise ValueError(
f"Unknown vector index type: {index_type}. Valid options are"
" 'IVF_FLAT', 'IVF_PQ', 'IVF_RQ', 'IVF_SQ',"
" 'IVF_HNSW_PQ', 'IVF_HNSW_SQ', 'IVF_HNSW_FLAT'"
)
LOOP.run(
self._table.create_index(
column,
vector_column_name,
config=config,
wait_timeout=wait_timeout,
name=name,
@@ -497,37 +307,6 @@ class RemoteTable(Table):
)
)
def _is_legacy_create_index_call(
self,
first_arg: str,
config: Optional[IndexConfigType],
num_partitions: Optional[int],
num_sub_vectors: Optional[int],
vector_column_name: str,
accelerator: Optional[str],
index_cache_size: Optional[int],
replace: Optional[bool],
) -> bool:
"""Detect if this is a legacy create_index call."""
if config is not None:
return False
if any(
x is not None
for x in (
num_partitions,
num_sub_vectors,
accelerator,
index_cache_size,
replace,
)
):
return True
if vector_column_name != VECTOR_COLUMN_NAME:
return True
if first_arg.lower() in KNOWN_METRICS:
return True
return False
def add(
self,
data: DATA,
@@ -876,22 +655,6 @@ class RemoteTable(Table):
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
return LOOP.run(self._table.drop_columns(columns))
def set_unenforced_primary_key(self, columns: Union[str, Iterable[str]]) -> None:
"""Not supported on LanceDB Cloud."""
return LOOP.run(self._table.set_unenforced_primary_key(columns))
def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
"""Not supported on LanceDB Cloud."""
return LOOP.run(self._table.set_lsm_write_spec(spec))
def unset_lsm_write_spec(self) -> None:
"""Not supported on LanceDB Cloud."""
return LOOP.run(self._table.unset_lsm_write_spec())
def close_lsm_writers(self) -> None:
"""No-op on LanceDB Cloud (no local shard writers)."""
return LOOP.run(self._table.close_lsm_writers())
def drop_index(self, index_name: str):
return LOOP.run(self._table.drop_index(index_name))

View File

@@ -102,15 +102,8 @@ class LinearCombinationReranker(Reranker):
combined_list = []
for row_id, result in results.items():
# Convert vector distance to a relevance score in [0, 1] where
# higher is better. Missing vector entries are penalised with
# `_invert_score(fill)` = 1 - fill (= 0.0 for the default fill=1).
vector_score = self._invert_score(result.get("_distance", fill))
# FTS scores (BM25) are already in a "higher = more relevant" space.
# Missing FTS entries are penalised symmetrically: we use
# `1 - fill` so that the same `fill` value drives both missing-vector
# and missing-FTS penalties in the same direction.
fts_score = result.get("_score", 1 - fill)
fts_score = result.get("_score", fill)
result["_relevance_score"] = self._combine_score(vector_score, fts_score)
combined_list.append(result)
@@ -130,12 +123,8 @@ class LinearCombinationReranker(Reranker):
return tbl
def _combine_score(self, vector_score, fts_score):
# Both vector_score (inverted distance) and fts_score are in a
# "higher = more relevant" space. A straight weighted average gives
# higher _relevance_score to better matches, as expected.
# Previously this returned `1 - (...)` which inverted the final
# ranking so that the *least* relevant document ranked first.
return self.weight * vector_score + (1 - self.weight) * fts_score
# these scores represent distance
return 1 - (self.weight * vector_score + (1 - self.weight) * fts_score)
def _invert_score(self, dist: float):
# Invert the score between relevance and distance

View File

@@ -5,7 +5,6 @@ from __future__ import annotations
import asyncio
import inspect
import os
import deprecation
import warnings
from abc import ABC, abstractmethod
@@ -88,8 +87,6 @@ from .util import (
)
from .index import lang_mapping
BlobMode = Literal["lazy", "bytes", "descriptions"]
_MODEL_BACKED_TOKENIZER_PREFIXES = ("jieba", "lindera")
_MODEL_BACKED_TOKENIZER_ERRORS = (
"unknown base tokenizer",
@@ -157,7 +154,6 @@ if TYPE_CHECKING:
AlterColumnsResult,
DeleteResult,
DropColumnsResult,
LsmWriteSpec,
MergeResult,
UpdateResult,
)
@@ -175,24 +171,6 @@ if TYPE_CHECKING:
DistanceType,
)
# Type alias for index configuration objects
IndexConfigType = Union[
IvfFlat,
IvfPq,
IvfSq,
IvfRq,
HnswFlat,
HnswPq,
HnswSq,
BTree,
Bitmap,
LabelList,
FTS,
]
# Known distance metrics for legacy API detection
KNOWN_METRICS = {"l2", "cosine", "dot", "hamming"}
def _into_pyarrow_reader(
data, schema: Optional[pa.Schema] = None
@@ -758,12 +736,8 @@ class Table(ABC):
"""
raise NotImplementedError
def _ensure_open(self) -> None:
pass
def __len__(self) -> int:
"""The number of rows in this Table"""
self._ensure_open()
return self.count_rows(None)
@property
@@ -785,22 +759,14 @@ class Table(ABC):
"""
raise NotImplementedError
def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs) -> "pandas.DataFrame":
def to_pandas(self) -> "pandas.DataFrame":
"""Return the table as a pandas DataFrame.
Parameters
----------
blob_mode: str, default "lazy"
Controls how blob columns are returned for backends that support
Lance blob-aware pandas conversion.
**kwargs
Forwarded to PyArrow / Lance pandas conversion.
Returns
-------
pd.DataFrame
"""
return self.to_arrow().to_pandas(**kwargs)
return self.to_arrow().to_pandas()
@abstractmethod
def to_arrow(self) -> pa.Table:
@@ -830,49 +796,11 @@ class Table(ABC):
"""
raise NotImplementedError
# New unified API overload
@overload
def create_index(
self,
column: str,
/,
*,
config: IndexConfigType,
replace: bool = ...,
wait_timeout: Optional[timedelta] = ...,
name: Optional[str] = ...,
train: bool = ...,
) -> None: ...
# Legacy API overload (deprecated)
@overload
def create_index(
self,
metric: Literal["l2", "cosine", "dot", "hamming"] = ...,
num_partitions: Optional[int] = ...,
num_sub_vectors: Optional[int] = ...,
vector_column_name: str = ...,
replace: bool = ...,
accelerator: Optional[str] = ...,
index_cache_size: Optional[int] = ...,
*,
index_type: VectorIndexType = ...,
wait_timeout: Optional[timedelta] = ...,
num_bits: int = ...,
max_iterations: int = ...,
sample_rate: int = ...,
m: int = ...,
ef_construction: int = ...,
name: Optional[str] = ...,
train: bool = ...,
target_partition_size: Optional[int] = ...,
) -> None: ...
def create_index(
self,
metric: DistanceType = "l2",
num_partitions: Optional[int] = None,
num_sub_vectors: Optional[int] = None,
metric="l2",
num_partitions=256,
num_sub_vectors=96,
vector_column_name: str = VECTOR_COLUMN_NAME,
replace: bool = True,
accelerator: Optional[str] = None,
@@ -885,53 +813,46 @@ class Table(ABC):
sample_rate: int = 256,
m: int = 20,
ef_construction: int = 300,
config: Optional[IndexConfigType] = None,
name: Optional[str] = None,
train: bool = True,
target_partition_size: Optional[int] = None,
):
"""Create an index on a column.
This method supports both the new unified API and the legacy API
for backwards compatibility. The new API takes the column name as the
first positional argument and an index configuration object via
``config``; the legacy API takes the distance metric as the first
argument plus separate ``vector_column_name`` / ``num_partitions`` /
etc. parameters, and emits a ``DeprecationWarning``.
"""Create an index on the table.
Parameters
----------
metric : str
For new API: the column name to index.
For legacy API: the distance metric ("l2", "cosine", "dot", "hamming").
config : IndexConfigType, optional
The index configuration object. If provided, uses the new unified API.
Can be one of: IvfFlat, IvfPq, IvfSq, IvfRq, HnswPq, HnswSq,
BTree, Bitmap, LabelList, FTS.
replace : bool, default True
Whether to replace an existing index on this column.
wait_timeout : timedelta, optional
Timeout to wait for async indexing to complete.
name : str, optional
Custom name for the index.
train : bool, default True
Whether to train the index with existing data.
metric: str, default "l2"
The distance metric to use when creating the index.
Valid values are "l2", "cosine", "dot", or "hamming".
l2 is euclidean distance.
Hamming is available only for binary vectors.
num_partitions: int, default 256
The number of IVF partitions to use when creating the index.
Default is 256.
num_sub_vectors: int, default 96
The number of PQ sub-vectors to use when creating the index.
Default is 96.
vector_column_name: str, default "vector"
The vector column name to create the index.
replace: bool, default True
- If True, replace the existing index if it exists.
Examples
--------
New API (recommended):
>>> table.create_index( # doctest: +SKIP
... "vector", config=IvfPq(distance_type="l2")
... )
>>> table.create_index("category", config=BTree()) # doctest: +SKIP
>>> table.create_index("content", config=FTS()) # doctest: +SKIP
Legacy API (deprecated):
>>> table.create_index( # doctest: +SKIP
... "l2", vector_column_name="vector"
... )
- If False, raise an error if duplicate index exists.
accelerator: str, default None
If set, use the given accelerator to create the index.
Only support "cuda" for now.
index_cache_size : int, optional
The size of the index cache in number of entries. Default value is 256.
num_bits: int
The number of bits to encode sub-vectors. Only used with the IVF_PQ index.
Only 4 and 8 are supported.
wait_timeout: timedelta, optional
The timeout to wait if indexing is asynchronous.
name: str, optional
The name of the index. If not provided, a default name will be generated.
train: bool, default True
Whether to train the index with existing data. Vector indices always train
with existing data.
"""
raise NotImplementedError
@@ -1256,7 +1177,7 @@ class Table(ABC):
... .when_not_matched_insert_all() \\
... .execute(new_data)
>>> res
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1, num_rows=3)
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1)
>>> # The order of new rows is non-deterministic since we use
>>> # a hash-join as part of this operation and so we sort here
>>> table.to_arrow().sort_by("a").to_pandas()
@@ -1409,7 +1330,6 @@ class Table(ABC):
pa.RecordBatch
A record batch containing the rows at the given offsets.
"""
self._ensure_open()
# We don't know the order of the results at all. So we calculate a permutation
# for ordering the given offsets. Then we load the data with the _rowoffset
# column. Then we sort by _rowoffset and apply the inverse of the permutation
@@ -1968,7 +1888,6 @@ class LanceTable(Table):
self._location = location # Store location for use in _dataset_path
self._namespace_client = namespace_client
self._pushdown_operations = pushdown_operations or set()
self._init_reopen_tracking()
if _async is not None:
self._table = _async
else:
@@ -1984,66 +1903,6 @@ class LanceTable(Table):
)
)
def _init_reopen_tracking(self) -> None:
self._checkout_version: Optional[int] = None
self._table_state: Optional[dict[str, Any]] = None
self._pid = os.getpid()
def _reopen_state(self) -> dict[str, Any]:
state = LOOP.run(self._table._table_reopen_state())
if get_uri_scheme(self._conn.uri) == "memory":
raise ValueError(
"Cannot pickle an in-memory LanceTable. Use a persisted table "
"or provide a worker-side connection factory."
)
return state
def _copy_reopened_table(self, table: "LanceTable") -> None:
self._conn = table._conn
self._namespace_path = table._namespace_path
self._location = table._location
self._namespace_client = table._namespace_client
self._pushdown_operations = table._pushdown_operations
self._table = table._table
self._pid = os.getpid()
def _ensure_open(self) -> None:
pid = os.getpid()
if getattr(self, "_table", None) is not None and self._pid == pid:
return
if self._table_state is None:
self._table_state = self._reopen_state()
table = self._conn.open_table(
self._table_state["name"],
namespace_path=self._table_state["namespace_path"] or None,
storage_options=self._table_state["storage_options"],
)
if self._checkout_version is not None:
table.checkout(self._checkout_version)
self._copy_reopened_table(table)
def __getstate__(self) -> dict[str, Any]:
return {
"connection_state": self._conn.serialize(),
"table_state": self._reopen_state(),
"checkout_version": self._checkout_version,
}
def __setstate__(self, state: dict[str, Any]) -> None:
from . import deserialize_conn
self._conn = deserialize_conn(state["connection_state"], for_worker=True)
self._namespace_path = list(state["table_state"]["namespace_path"] or [])
self._location = None
self._namespace_client = None
self._pushdown_operations = set()
self._checkout_version = state["checkout_version"]
self._table_state = state["table_state"]
self._table = None
self._pid = None
self._ensure_open()
@property
def name(self) -> str:
return self._table.name
@@ -2247,7 +2106,6 @@ class LanceTable(Table):
0 [1.1, 0.9] vector
"""
LOOP.run(self._table.checkout(version))
self._checkout_version = self.version
def checkout_latest(self):
"""Checkout the latest version of the table. This is an in-place operation.
@@ -2256,7 +2114,6 @@ class LanceTable(Table):
version of the table.
"""
LOOP.run(self._table.checkout_latest())
self._checkout_version = None
def restore(self, version: Optional[Union[int, str]] = None):
"""Restore a version of the table. This is an in-place operation.
@@ -2305,13 +2162,12 @@ class LanceTable(Table):
if version is not None:
LOOP.run(self._table.checkout(version))
LOOP.run(self._table.restore())
self._checkout_version = None
def count_rows(self, filter: Optional[str] = None) -> int:
return LOOP.run(self._table.count_rows(filter))
def __repr__(self) -> str:
val = f"{self.__class__.__name__}(name={self.name!r}"
val = f"{self.__class__.__name__}(name={self.name!r}, version={self.version}"
if self._conn.read_consistency_interval is not None:
val += ", read_consistency_interval={!r}".format(
self._conn.read_consistency_interval
@@ -2326,27 +2182,14 @@ class LanceTable(Table):
"""Return the first n rows of the table."""
return LOOP.run(self._table.head(n))
def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs) -> "pd.DataFrame":
def to_pandas(self) -> "pd.DataFrame":
"""Return the table as a pandas DataFrame.
Parameters
----------
blob_mode: str, default "lazy"
Controls how Lance blob columns are returned.
**kwargs
Forwarded to Lance pandas conversion.
Returns
-------
pd.DataFrame
"""
if blob_mode == "lazy" and (
self._namespace_client is not None
or get_uri_scheme(self._dataset_path) == "memory"
):
return self.to_arrow().to_pandas(**kwargs)
return self.to_lance().to_pandas(blob_mode=blob_mode, **kwargs)
return self.to_arrow().to_pandas()
def to_arrow(self) -> pa.Table:
"""Return the table as a pyarrow Table.
@@ -2383,51 +2226,11 @@ class LanceTable(Table):
dataset, allow_pyarrow_filter=False, batch_size=batch_size
)
# New unified API overload
@overload
def create_index(
self,
column: str,
/,
*,
config: IndexConfigType,
replace: bool = ...,
wait_timeout: Optional[timedelta] = ...,
name: Optional[str] = ...,
train: bool = ...,
) -> None: ...
# Legacy API overload (deprecated)
@overload
def create_index(
self,
metric: Literal["l2", "cosine", "dot", "hamming"] = ...,
num_partitions: Optional[int] = ...,
num_sub_vectors: Optional[int] = ...,
vector_column_name: str = ...,
replace: bool = ...,
accelerator: Optional[str] = ...,
index_cache_size: Optional[int] = ...,
num_bits: int = ...,
index_type: Literal[
"IVF_FLAT", "IVF_SQ", "IVF_PQ", "IVF_RQ", "IVF_HNSW_SQ", "IVF_HNSW_PQ"
] = ...,
max_iterations: int = ...,
sample_rate: int = ...,
m: int = ...,
ef_construction: int = ...,
*,
wait_timeout: Optional[timedelta] = ...,
name: Optional[str] = ...,
train: bool = ...,
target_partition_size: Optional[int] = ...,
) -> None: ...
def create_index(
self,
metric: str = "l2",
num_partitions: Optional[int] = None,
num_sub_vectors: Optional[int] = None,
metric: DistanceType = "l2",
num_partitions=None,
num_sub_vectors=None,
vector_column_name: str = VECTOR_COLUMN_NAME,
replace: bool = True,
accelerator: Optional[str] = None,
@@ -2447,232 +2250,47 @@ class LanceTable(Table):
m: int = 20,
ef_construction: int = 300,
*,
config: Optional[IndexConfigType] = None,
wait_timeout: Optional[timedelta] = None,
name: Optional[str] = None,
train: bool = True,
target_partition_size: Optional[int] = None,
):
"""Create an index on a column.
This method supports both the new unified API and the legacy API
for backwards compatibility. The new API takes the column name as the
first positional argument and an index configuration object via
``config``; the legacy API takes the distance metric as the first
argument plus separate ``vector_column_name`` / ``num_partitions`` /
etc. parameters, and emits a ``DeprecationWarning``.
Parameters
----------
metric : str
For new API: the column name to index.
For legacy API: the distance metric ("l2", "cosine", "dot", "hamming").
config : IndexConfigType, optional
The index configuration object. If provided, uses the new unified API.
Can be one of: IvfFlat, IvfPq, IvfSq, IvfRq, HnswPq, HnswSq,
BTree, Bitmap, LabelList, FTS.
replace : bool, default True
Whether to replace an existing index on this column.
wait_timeout : timedelta, optional
Timeout to wait for async indexing to complete.
name : str, optional
Custom name for the index.
train : bool, default True
Whether to train the index with existing data.
Examples
--------
New API (recommended):
>>> table.create_index( # doctest: +SKIP
... "vector", config=IvfPq(distance_type="l2")
... )
>>> table.create_index("category", config=BTree()) # doctest: +SKIP
>>> table.create_index("content", config=FTS()) # doctest: +SKIP
Legacy API (deprecated):
>>> table.create_index( # doctest: +SKIP
... "l2", vector_column_name="vector"
... )
"""
# Detect whether this is a legacy API call
is_legacy = self._is_legacy_create_index_call(
metric,
config,
num_partitions,
num_sub_vectors,
vector_column_name,
accelerator,
index_cache_size,
)
if is_legacy:
warnings.warn(
"The create_index() API with metric/num_partitions parameters is "
"deprecated and will be removed in a future version. "
"Please migrate to the new unified API:\n"
" # Old (deprecated):\n"
" table.create_index('l2', vector_column_name='my_vector')\n"
" # New (recommended):\n"
" table.create_index('my_vector', config=IvfPq(distance_type='l2'))",
DeprecationWarning,
stacklevel=2,
)
# Legacy API: first arg is the distance metric
column = vector_column_name
# Build config from legacy parameters
config = self._build_vector_config_from_legacy_params(
metric=metric,
"""Create an index on the table."""
if accelerator is not None:
# accelerator is only supported through pylance.
self.to_lance().create_index(
column=vector_column_name,
index_type=index_type,
metric=metric,
num_partitions=num_partitions,
num_sub_vectors=num_sub_vectors,
replace=replace,
accelerator=accelerator,
index_cache_size=index_cache_size,
num_bits=num_bits,
max_iterations=max_iterations,
sample_rate=sample_rate,
m=m,
ef_construction=ef_construction,
target_partition_size=target_partition_size,
accelerator=accelerator,
)
# Handle accelerator through pylance
if accelerator is not None:
self.to_lance().create_index(
column=column,
index_type=index_type,
metric=metric,
num_partitions=num_partitions,
num_sub_vectors=num_sub_vectors,
replace=replace,
accelerator=accelerator,
index_cache_size=index_cache_size,
num_bits=num_bits,
m=m,
ef_construction=ef_construction,
target_partition_size=target_partition_size,
)
self.checkout_latest()
return
else:
# New API: metric is the column name
column = metric
# Check if config has accelerator set and dispatch to pylance
if config is not None and hasattr(config, "accelerator"):
acc = getattr(config, "accelerator", None)
if acc is not None:
# Dispatch to pylance for GPU acceleration
index_type_map = {
"IvfFlat": "IVF_FLAT",
"IvfSq": "IVF_SQ",
"IvfPq": "IVF_PQ",
"IvfRq": "IVF_RQ",
"HnswPq": "IVF_HNSW_PQ",
"HnswSq": "IVF_HNSW_SQ",
}
cfg_type = type(config).__name__
lance_index_type = index_type_map.get(cfg_type, "IVF_PQ")
self.to_lance().create_index(
column=column,
index_type=lance_index_type,
metric=getattr(config, "distance_type", "l2"),
num_partitions=getattr(config, "num_partitions", None),
num_sub_vectors=getattr(config, "num_sub_vectors", None),
replace=replace,
accelerator=acc,
num_bits=getattr(config, "num_bits", 8),
m=getattr(config, "m", 20),
ef_construction=getattr(config, "ef_construction", 300),
target_partition_size=getattr(
config, "target_partition_size", None
),
)
self.checkout_latest()
return
return LOOP.run(
self._table.create_index(
column,
replace=replace,
config=config,
wait_timeout=wait_timeout,
name=name,
train=train,
)
)
def _is_legacy_create_index_call(
self,
first_arg: str,
config: Optional[IndexConfigType],
num_partitions: Optional[int],
num_sub_vectors: Optional[int],
vector_column_name: str,
accelerator: Optional[str],
index_cache_size: Optional[int],
) -> bool:
"""Detect if this is a legacy create_index call."""
# If config is provided, it's definitely the new API
if config is not None:
return False
# If old-style parameters were explicitly set, it's legacy
if any(
x is not None
for x in (num_partitions, num_sub_vectors, accelerator, index_cache_size)
):
return True
# If vector_column_name differs from default, it's legacy
if vector_column_name != VECTOR_COLUMN_NAME:
return True
# If first arg is a known metric, assume legacy
if first_arg.lower() in KNOWN_METRICS:
return True
# Otherwise assume new API
return False
def _build_vector_config_from_legacy_params(
self,
metric: str,
index_type: str,
num_partitions: Optional[int],
num_sub_vectors: Optional[int],
num_bits: int,
max_iterations: int,
sample_rate: int,
m: int,
ef_construction: int,
target_partition_size: Optional[int],
accelerator: Optional[str],
) -> IndexConfigType:
"""Build an index config object from legacy parameters."""
if index_type == "IVF_FLAT":
return IvfFlat(
self.checkout_latest()
return
elif index_type == "IVF_FLAT":
config = IvfFlat(
distance_type=metric,
num_partitions=num_partitions,
max_iterations=max_iterations,
sample_rate=sample_rate,
target_partition_size=target_partition_size,
accelerator=accelerator,
)
elif index_type == "IVF_SQ":
return IvfSq(
config = IvfSq(
distance_type=metric,
num_partitions=num_partitions,
max_iterations=max_iterations,
sample_rate=sample_rate,
target_partition_size=target_partition_size,
accelerator=accelerator,
)
elif index_type == "IVF_PQ":
return IvfPq(
config = IvfPq(
distance_type=metric,
num_partitions=num_partitions,
num_sub_vectors=num_sub_vectors,
@@ -2680,20 +2298,18 @@ class LanceTable(Table):
max_iterations=max_iterations,
sample_rate=sample_rate,
target_partition_size=target_partition_size,
accelerator=accelerator,
)
elif index_type == "IVF_RQ":
return IvfRq(
config = IvfRq(
distance_type=metric,
num_partitions=num_partitions,
num_bits=num_bits,
max_iterations=max_iterations,
sample_rate=sample_rate,
target_partition_size=target_partition_size,
accelerator=accelerator,
)
elif index_type == "IVF_HNSW_PQ":
return HnswPq(
config = HnswPq(
distance_type=metric,
num_partitions=num_partitions,
num_sub_vectors=num_sub_vectors,
@@ -2703,10 +2319,9 @@ class LanceTable(Table):
m=m,
ef_construction=ef_construction,
target_partition_size=target_partition_size,
accelerator=accelerator,
)
elif index_type == "IVF_HNSW_SQ":
return HnswSq(
config = HnswSq(
distance_type=metric,
num_partitions=num_partitions,
max_iterations=max_iterations,
@@ -2714,10 +2329,9 @@ class LanceTable(Table):
m=m,
ef_construction=ef_construction,
target_partition_size=target_partition_size,
accelerator=accelerator,
)
elif index_type == "IVF_HNSW_FLAT":
return HnswFlat(
config = HnswFlat(
distance_type=metric,
num_partitions=num_partitions,
max_iterations=max_iterations,
@@ -2729,6 +2343,16 @@ class LanceTable(Table):
else:
raise ValueError(f"Unknown index type {index_type}")
return LOOP.run(
self._table.create_index(
vector_column_name,
replace=replace,
config=config,
name=name,
train=train,
)
)
def drop_index(self, name: str) -> None:
"""
Drops an index from the table
@@ -2828,11 +2452,6 @@ class LanceTable(Table):
"""
return LOOP.run(self._table.latest_storage_options())
@deprecation.deprecated(
deprecated_in="0.25.0",
current_version=__version__,
details="Use create_index() with config=BTree()/Bitmap()/LabelList() instead.",
)
def create_scalar_index(
self,
column: str,
@@ -2841,12 +2460,6 @@ class LanceTable(Table):
index_type: ScalarIndexType = "BTREE",
name: Optional[str] = None,
):
"""Create a scalar index on a column.
.. deprecated:: 0.25.0
Use :meth:`create_index` with a BTree, Bitmap, or LabelList config instead.
Example: ``table.create_index("column", config=BTree())``
"""
if index_type == "BTREE":
config = BTree()
elif index_type == "BITMAP":
@@ -2859,11 +2472,6 @@ class LanceTable(Table):
self._table.create_index(column, replace=replace, config=config, name=name)
)
@deprecation.deprecated(
deprecated_in="0.25.0",
current_version=__version__,
details="Use create_index() with config=FTS() instead.",
)
def create_fts_index(
self,
field_names: Union[str, List[str]],
@@ -2887,12 +2495,6 @@ class LanceTable(Table):
prefix_only: bool = False,
name: Optional[str] = None,
):
"""Create a full-text search index on a column.
.. deprecated:: 0.25.0
Use :meth:`create_index` with an FTS config instead.
Example: ``table.create_index("text_column", config=FTS())``
"""
self._ensure_no_legacy_fts_index()
if use_tantivy:
@@ -2916,6 +2518,11 @@ class LanceTable(Table):
"at a time. To search over multiple text fields, create a "
"separate FTS index for each field."
)
if "." in field_names:
raise ValueError(
"Native FTS indexes can only be created on top-level fields. "
f"Received nested field path: {field_names!r}."
)
if tokenizer_name is None:
tokenizer_configs = {
@@ -3364,7 +2971,6 @@ class LanceTable(Table):
self._location = location
self._namespace_client = namespace_client
self._pushdown_operations = pushdown_operations or set()
self._init_reopen_tracking()
if data_storage_version is not None:
warnings.warn(
@@ -3657,26 +3263,6 @@ class LanceTable(Table):
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
return LOOP.run(self._table.drop_columns(columns))
def set_unenforced_primary_key(self, columns: Union[str, Iterable[str]]) -> None:
"""Set the unenforced primary key. See
[`AsyncTable.set_unenforced_primary_key`][lancedb.AsyncTable.set_unenforced_primary_key]."""
return LOOP.run(self._table.set_unenforced_primary_key(columns))
def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
"""Install an LsmWriteSpec. See
[`AsyncTable.set_lsm_write_spec`][lancedb.AsyncTable.set_lsm_write_spec]."""
return LOOP.run(self._table.set_lsm_write_spec(spec))
def unset_lsm_write_spec(self) -> None:
"""Remove the LsmWriteSpec. See
[`AsyncTable.unset_lsm_write_spec`][lancedb.AsyncTable.unset_lsm_write_spec]."""
return LOOP.run(self._table.unset_lsm_write_spec())
def close_lsm_writers(self) -> None:
"""Close cached MemWAL shard writers. See
[`AsyncTable.close_lsm_writers`][lancedb.AsyncTable.close_lsm_writers]."""
return LOOP.run(self._table.close_lsm_writers())
def uses_v2_manifest_paths(self) -> bool:
"""
Check if the table is using the new v2 manifest paths.
@@ -4222,79 +3808,6 @@ class AsyncTable:
Any attempt to use the table after it has been closed will raise an error."""
return self._inner.close()
async def set_unenforced_primary_key(
self, columns: Union[str, Iterable[str]]
) -> None:
"""Set the unenforced primary key for this table to the given
ordered list of columns.
"Unenforced" means LanceDB does not check uniqueness on writes; the
columns are recorded in the schema as the primary key so that
features such as `merge_insert` can use them. Calling this again
replaces any previously-set primary key.
Parameters
----------
columns : str or Iterable[str]
Either a single column name (single-column key) or an ordered
iterable of column names (composite key). Each column dtype
must be one of: int32, int64, utf8, large_utf8, binary,
large_binary, fixed_size_binary.
"""
if isinstance(columns, str):
columns = [columns]
else:
columns = list(columns)
await self._inner.set_unenforced_primary_key(columns)
async def set_lsm_write_spec(self, spec: "LsmWriteSpec") -> None:
"""Install an LsmWriteSpec on this table.
The spec selects Lance's MemWAL LSM-style write path for future
`merge_insert` calls. ``LsmWriteSpec`` chooses one of three sharding
strategies:
- ``LsmWriteSpec.bucket(column, num_buckets)`` — hash-bucket writes by
the single-column unenforced primary key.
- ``LsmWriteSpec.identity(column)`` — shard by the raw value of a
scalar column.
- ``LsmWriteSpec.unsharded()`` — route every write to a single shard.
All variants require the table to have an unenforced primary key set
via [`set_unenforced_primary_key`]; bucket sharding additionally
requires it to be the single column being bucketed.
Parameters
----------
spec : LsmWriteSpec
The sharding spec to install.
Examples
--------
>>> from lancedb._lancedb import LsmWriteSpec
>>> # table.set_unenforced_primary_key("id")
>>> # table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 16))
"""
await self._inner.set_lsm_write_spec(spec)
async def unset_lsm_write_spec(self) -> None:
"""Remove the LsmWriteSpec from this table.
Reverts to the standard `merge_insert` write path. Errors if no spec
is currently set.
"""
await self._inner.unset_lsm_write_spec()
async def close_lsm_writers(self) -> None:
"""Drain and close any cached MemWAL shard writers for this table.
When an LSM write spec is installed, `merge_insert` opens MemWAL shard
writers and caches them for reuse across calls. This closes them,
flushing pending data; writers reopen lazily on the next
`merge_insert`. It is a no-op when no writers are cached.
"""
await self._inner.close_lsm_writers()
@property
def name(self) -> str:
"""The name of the table."""
@@ -4353,39 +3866,14 @@ class AsyncTable:
"""
return AsyncQuery(self._inner.query())
async def _to_lance(self, **kwargs) -> lance.LanceDataset:
try:
import lance
except ImportError:
raise ImportError(
"The lance library is required to use this function. "
"Please install with `pip install pylance`."
)
return lance.dataset(
await self.uri(),
version=await self.version(),
storage_options=await self.latest_storage_options(),
**kwargs,
)
async def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs) -> "pd.DataFrame":
async def to_pandas(self) -> "pd.DataFrame":
"""Return the table as a pandas DataFrame.
Parameters
----------
blob_mode: str, default "lazy"
Controls how Lance blob columns are returned.
**kwargs
Forwarded to PyArrow / Lance pandas conversion.
Returns
-------
pd.DataFrame
"""
if blob_mode == "lazy":
return (await self.to_arrow()).to_pandas(**kwargs)
return (await self._to_lance()).to_pandas(blob_mode=blob_mode, **kwargs)
return (await self.to_arrow()).to_pandas()
async def to_arrow(self) -> pa.Table:
"""Return the table as a pyarrow Table.
@@ -4622,10 +4110,6 @@ class AsyncTable:
"""
return await self._inner.latest_storage_options()
async def _table_reopen_state(self) -> dict[str, Any]:
"""Get the Rust-side table state needed to reopen this table."""
return await self._inner._table_reopen_state()
async def add(
self,
data: DATA,
@@ -4749,7 +4233,7 @@ class AsyncTable:
... .when_not_matched_insert_all() \\
... .execute(new_data)
>>> res
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1, num_rows=3)
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1)
>>> # The order of new rows is non-deterministic since we use
>>> # a hash-join as part of this operation and so we sort here
>>> table.to_arrow().sort_by("a").to_pandas()
@@ -5028,8 +4512,6 @@ class AsyncTable:
async_query = async_query.fast_search()
if query.with_row_id:
async_query = async_query.with_row_id()
if query.order_by:
async_query = async_query.order_by(query.order_by)
if query.vector:
async_query = async_query.nearest_to(query.vector).distance_range(
@@ -5129,8 +4611,6 @@ class AsyncTable:
when_not_matched_by_source_condition=merge._when_not_matched_by_source_condition,
timeout=merge._timeout,
use_index=merge._use_index,
use_lsm_write=merge._use_lsm_write,
validate_single_shard=merge._validate_single_shard,
),
)

View File

@@ -10,7 +10,7 @@ import pathlib
import warnings
from datetime import date, datetime
from functools import singledispatch
from typing import Tuple, Union, Optional, Any, List
from typing import Tuple, Union, Optional, Any
from urllib.parse import urlparse
import numpy as np
@@ -189,33 +189,7 @@ def flatten_columns(tbl: pa.Table, flatten: Optional[Union[int, bool]] = None):
return tbl
def _format_field_path(path: List[str]) -> str:
def format_segment(segment: str) -> str:
if all(char.isalnum() or char == "_" for char in segment):
return segment
return f"`{segment.replace('`', '``')}`"
return ".".join(format_segment(segment) for segment in path)
def _iter_vector_columns(
field: pa.Field, path: List[str], dim: Optional[int] = None
) -> List[str]:
field_path = [*path, field.name]
if is_vector_column(field.type):
vector_dim = infer_vector_column_dim(field.type)
if dim is None or vector_dim == dim:
return [_format_field_path(field_path)]
return []
if pa.types.is_struct(field.type):
columns = []
for idx in range(field.type.num_fields):
columns.extend(_iter_vector_columns(field.type.field(idx), field_path, dim))
return columns
return []
def inf_vector_column_query(schema: pa.Schema, dim: Optional[int] = None) -> str:
def inf_vector_column_query(schema: pa.Schema) -> str:
"""
Get the vector column name
@@ -228,21 +202,26 @@ def inf_vector_column_query(schema: pa.Schema, dim: Optional[int] = None) -> str
-------
str: the vector column name.
"""
vector_col_names = []
for field in schema:
vector_col_names.extend(_iter_vector_columns(field, [], dim))
if len(vector_col_names) > 1:
raise ValueError(
"Schema has more than one vector column. "
"Please specify the vector column name "
f"for vector search. Candidates: {vector_col_names}"
)
if len(vector_col_names) == 0:
vector_col_name = ""
vector_col_count = 0
for field_name in schema.names:
field = schema.field(field_name)
if is_vector_column(field.type):
vector_col_count += 1
if vector_col_count > 1:
raise ValueError(
"Schema has more than one vector column. "
"Please specify the vector column name "
"for vector search"
)
elif vector_col_count == 1:
vector_col_name = field_name
if vector_col_count == 0:
raise ValueError(
"There is no vector column in the data. "
"Please specify the vector column name for vector search"
)
return vector_col_names[0]
return vector_col_name
def is_vector_column(data_type: pa.DataType) -> bool:
@@ -268,29 +247,6 @@ def is_vector_column(data_type: pa.DataType) -> bool:
return False
def infer_vector_column_dim(data_type: pa.DataType) -> Optional[int]:
if pa.types.is_fixed_size_list(data_type):
return data_type.list_size
if pa.types.is_list(data_type):
return infer_vector_column_dim(data_type.value_type)
return None
def _query_vector_dim(query: Optional[Any]) -> Optional[int]:
if query is None:
return None
if isinstance(query, np.ndarray):
if query.ndim == 0:
return None
return query.shape[-1]
if isinstance(query, list) and query:
first = query[0]
if isinstance(first, (list, tuple, np.ndarray)):
return len(first)
return len(query)
return None
def infer_vector_column_name(
schema: pa.Schema,
query_type: str,
@@ -306,9 +262,7 @@ def infer_vector_column_name(
if query is not None or query_type == "hybrid":
try:
vector_column_name = inf_vector_column_query(
schema, dim=_query_vector_dim(query)
)
vector_column_name = inf_vector_column_query(schema)
except Exception as e:
raise e

View File

@@ -57,7 +57,7 @@ async def test_upsert_async(mem_db_async):
await table.count_rows() # 3
res
# MergeResult(version=2, num_updated_rows=1,
# num_inserted_rows=1, num_deleted_rows=0, num_rows=2)
# num_inserted_rows=1, num_deleted_rows=0)
# --8<-- [end:upsert_basic_async]
assert await table.count_rows() == 3
assert res.version == 2
@@ -86,7 +86,7 @@ def test_insert_if_not_exists(mem_db):
table.count_rows() # 3
res
# MergeResult(version=2, num_updated_rows=0,
# num_inserted_rows=1, num_deleted_rows=0, num_rows=1)
# num_inserted_rows=1, num_deleted_rows=0)
# --8<-- [end:insert_if_not_exists]
assert table.count_rows() == 3
assert res.version == 2
@@ -116,7 +116,7 @@ async def test_insert_if_not_exists_async(mem_db_async):
await table.count_rows() # 3
res
# MergeResult(version=2, num_updated_rows=0,
# num_inserted_rows=1, num_deleted_rows=0, num_rows=1)
# num_inserted_rows=1, num_deleted_rows=0)
# --8<-- [end:insert_if_not_exists]
assert await table.count_rows() == 3
assert res.version == 2
@@ -150,7 +150,7 @@ def test_replace_range(mem_db):
table.count_rows("doc_id = 1") # 1
res
# MergeResult(version=2, num_updated_rows=1,
# num_inserted_rows=0, num_deleted_rows=1, num_rows=1)
# num_inserted_rows=0, num_deleted_rows=1)
# --8<-- [end:insert_if_not_exists]
assert table.count_rows("doc_id = 1") == 1
assert res.version == 2
@@ -185,7 +185,7 @@ async def test_replace_range_async(mem_db_async):
await table.count_rows("doc_id = 1") # 1
res
# MergeResult(version=2, num_updated_rows=1,
# num_inserted_rows=0, num_deleted_rows=1, num_rows=1)
# num_inserted_rows=0, num_deleted_rows=1)
# --8<-- [end:insert_if_not_exists]
assert await table.count_rows("doc_id = 1") == 1
assert res.version == 2

View File

@@ -1,3 +1,4 @@
segmenter:
mode: "normal"
dictionary: "./python/tests/models/lindera/ipadic/main"
dictionary:
path: "./python/tests/models/lindera/ipadic/main"

View File

@@ -6,7 +6,6 @@ import re
import sys
from datetime import timedelta
import os
from types import SimpleNamespace
import lancedb
import numpy as np
@@ -189,43 +188,6 @@ def test_table_names(tmp_db: lancedb.DBConnection):
assert len(result) == 3
def test_db_contains_and_len_include_all_table_name_pages(tmp_db: lancedb.DBConnection):
for idx in range(20):
tmp_db.create_table(f"table_{idx}", data=[{"id": idx}])
assert len(tmp_db) == 20
for idx in range(20):
assert f"table_{idx}" in tmp_db
assert "does_not_exist" not in tmp_db
def test_db_contains_stops_after_matching_table_page(
tmp_db: lancedb.DBConnection, monkeypatch
):
calls = []
pages = {
None: SimpleNamespace(tables=["table_0", "table_1"], page_token="next"),
"next": SimpleNamespace(tables=["table_2"], page_token=None),
}
def list_tables(*, page_token=None, **_kwargs):
calls.append(page_token)
return pages[page_token]
monkeypatch.setattr(tmp_db, "list_tables", list_tables)
assert "table_1" in tmp_db
assert calls == [None]
calls.clear()
assert "table_2" in tmp_db
assert calls == [None, "next"]
calls.clear()
assert len(tmp_db) == 3
assert calls == [None, "next"]
@pytest.mark.asyncio
async def test_table_names_async(tmp_path):
db = lancedb.connect(tmp_path)
@@ -466,8 +428,7 @@ async def test_create_table_v2_manifest_paths_async(tmp_path):
assert await tbl.uses_v2_manifest_paths()
manifests_dir = tmp_path / "test_v2_manifest_paths.lance" / "_versions"
for manifest in os.listdir(manifests_dir):
if manifest.endswith(".manifest"):
assert re.match(r"\d{20}\.manifest", manifest)
assert re.match(r"\d{20}\.manifest", manifest)
# Start a table in V1 mode then migrate
tbl = await db_no_v2_paths.create_table(
@@ -477,15 +438,13 @@ async def test_create_table_v2_manifest_paths_async(tmp_path):
assert not await tbl.uses_v2_manifest_paths()
manifests_dir = tmp_path / "test_v2_migration.lance" / "_versions"
for manifest in os.listdir(manifests_dir):
if manifest.endswith(".manifest"):
assert re.match(r"\d\.manifest", manifest)
assert re.match(r"\d\.manifest", manifest)
await tbl.migrate_manifest_paths_v2()
assert await tbl.uses_v2_manifest_paths()
for manifest in os.listdir(manifests_dir):
if manifest.endswith(".manifest"):
assert re.match(r"\d{20}\.manifest", manifest)
assert re.match(r"\d{20}\.manifest", manifest)
@pytest.mark.asyncio
@@ -955,29 +914,6 @@ def test_local_namespace_operations(tmp_path):
assert db.list_namespaces().namespaces == []
def test_create_namespace_invalid_mode_raises(tmp_path):
"""Unrecognized create namespace modes raise a clear error."""
db = lancedb.connect(tmp_path)
with pytest.raises(ValueError, match="Invalid create namespace mode"):
db.create_namespace(["child"], mode="frobnicate")
def test_drop_namespace_invalid_mode_raises(tmp_path):
"""Unrecognized drop namespace modes raise a clear error."""
db = lancedb.connect(tmp_path)
db.create_namespace(["child"])
with pytest.raises(ValueError, match="Invalid drop namespace mode"):
db.drop_namespace(["child"], mode="frobnicate")
def test_drop_namespace_invalid_behavior_raises(tmp_path):
"""Unrecognized drop namespace behaviors raise a clear error."""
db = lancedb.connect(tmp_path)
db.create_namespace(["child"])
with pytest.raises(ValueError, match="Invalid drop namespace behavior"):
db.drop_namespace(["child"], behavior="frobnicate")
def test_clone_table_latest_version(tmp_path):
"""Test cloning a table with the latest version (default behavior)"""
import os

View File

@@ -29,7 +29,6 @@ from lancedb.query import (
MultiMatchQuery,
PhraseQuery,
BooleanQuery,
ColumnOrdering,
Occur,
LanceFtsQueryBuilder,
)
@@ -117,7 +116,8 @@ def lindera_ipadic(language_model_home):
config_path.write_text(
"segmenter:\n"
' mode: "normal"\n'
f' dictionary: "{extracted_model.resolve().as_posix()}"\n',
" dictionary:\n"
f' path: "{extracted_model.resolve().as_posix()}"\n',
encoding="utf-8",
)
@@ -215,12 +215,11 @@ def test_reject_legacy_tantivy_index(table):
@pytest.mark.parametrize("with_position", [True, False])
def test_create_inverted_index(table, with_position):
with pytest.warns(DeprecationWarning, match="create_fts_index"):
table.create_fts_index(
"text",
with_position=with_position,
name="custom_fts_index",
)
table.create_fts_index(
"text",
with_position=with_position,
name="custom_fts_index",
)
indices = table.list_indices()
fts_indices = [i for i in indices if i.index_type == "FTS"]
assert any(i.name == "custom_fts_index" for i in fts_indices)
@@ -501,36 +500,6 @@ async def test_search_fts_specify_column_async(async_table):
pass
def test_search_order_by_descending(table):
table.create_fts_index("text")
rows = (
table.search("puppy")
.order_by([ColumnOrdering(column_name="count", ascending=False)])
.limit(20)
.select(["text", "count"])
.to_list()
)
for r in rows:
assert "puppy" in r["text"]
assert sorted(rows, key=lambda x: x["count"], reverse=True) == rows
def test_search_order_by_ascending(table):
table.create_fts_index("text")
rows = (
table.search("puppy")
.order_by([ColumnOrdering(column_name="count", ascending=True)])
.limit(20)
.select(["text", "count"])
.to_list()
)
for r in rows:
assert "puppy" in r["text"]
assert sorted(rows, key=lambda x: x["count"]) == rows
def test_create_index_from_table(tmp_path, table):
table.create_fts_index("text")
df = table.search("puppy").limit(5).select(["text"]).to_pandas()
@@ -564,111 +533,8 @@ def test_create_index_multiple_columns(tmp_path, table):
def test_nested_schema(tmp_path, table):
table.create_fts_index("nested.text", with_position=True)
indices = table.list_indices()
assert len(indices) == 1
assert indices[0].index_type == "FTS"
assert indices[0].columns == ["nested.text"]
results = (
table.search("puppy", query_type="fts", fts_columns="nested.text")
.limit(5)
.to_list()
)
assert len(results) > 0
assert all("puppy" in row["nested"]["text"] for row in results)
results = table.search(MatchQuery("puppy", "nested.text")).limit(5).to_list()
assert len(results) > 0
assert all("puppy" in row["nested"]["text"] for row in results)
phrase_results = (
table.search(PhraseQuery("puppy runs", "nested.text")).limit(5).to_list()
)
assert len(phrase_results) > 0
assert all("puppy runs" in row["nested"]["text"] for row in phrase_results)
hybrid_results = (
table.search(query_type="hybrid", fts_columns="nested.text")
.vector([0 for _ in range(128)])
.text("puppy")
.limit(5)
.to_list()
)
assert len(hybrid_results) > 0
@pytest.mark.asyncio
async def test_nested_schema_async(async_table):
await async_table.create_index("nested.text", config=FTS(with_position=True))
indices = await async_table.list_indices()
assert len(indices) == 1
assert indices[0].index_type == "FTS"
assert indices[0].columns == ["nested.text"]
results = await (
async_table.query()
.nearest_to_text("puppy", columns="nested.text")
.limit(5)
.to_list()
)
assert len(results) > 0
assert all("puppy" in row["nested"]["text"] for row in results)
results = await (
async_table.query()
.nearest_to_text(MatchQuery("puppy", "nested.text"))
.limit(5)
.to_list()
)
assert len(results) > 0
assert all("puppy" in row["nested"]["text"] for row in results)
phrase_results = await (
async_table.query()
.nearest_to_text(PhraseQuery("puppy runs", "nested.text"))
.limit(5)
.to_list()
)
assert len(phrase_results) > 0
assert all("puppy runs" in row["nested"]["text"] for row in phrase_results)
hybrid_results = await (
async_table.query()
.nearest_to([0 for _ in range(128)])
.nearest_to_text("puppy", columns="nested.text")
.limit(5)
.to_list()
)
assert len(hybrid_results) > 0
def test_nested_schema_rejects_invalid_fts_fields(tmp_path):
db = ldb.connect(tmp_path)
data = pa.table(
{
"payload": pa.array(
[
{"text": "puppy runs", "count": 1},
{"text": "car drives", "count": 2},
]
),
"vector": pa.array(
[[0.1, 0.1], [0.2, 0.2]],
type=pa.list_(pa.float32(), list_size=2),
),
}
)
table = db.create_table("test", data=data)
with pytest.raises(ValueError, match="FTS index cannot be created.*payload"):
table.create_fts_index("payload")
with pytest.raises(ValueError, match="FTS index cannot be created.*count"):
table.create_fts_index("payload.count")
with pytest.raises(ValueError, match="Field path `payload.missing` not found"):
table.create_fts_index("payload.missing")
with pytest.raises(ValueError, match="top-level fields"):
table.create_fts_index("nested.text")
def test_search_index_with_filter(table):

View File

@@ -105,46 +105,6 @@ async def test_create_scalar_index(some_table: AsyncTable):
assert len(indices) == 0
@pytest.mark.asyncio
async def test_create_nested_scalar_index_lists_canonical_paths(db_async):
metadata_type = pa.struct(
[
pa.field("user_id", pa.int32()),
pa.field("user.id", pa.int32()),
]
)
data = pa.Table.from_arrays(
[
pa.array([1, 2, 3], type=pa.int32()),
pa.array(
[
{"user_id": 10, "user.id": 100},
{"user_id": 20, "user.id": 200},
{"user_id": 30, "user.id": 300},
],
type=metadata_type,
),
],
names=["user_id", "metadata"],
)
table = await db_async.create_table("nested_scalar_index", data)
await table.create_index("user_id", config=BTree(), name="top_user_id_idx")
await table.create_index(
"metadata.user_id", config=BTree(), name="nested_user_id_idx"
)
await table.create_index(
"metadata.`user.id`", config=BTree(), name="escaped_user_id_idx"
)
columns_by_name = {
index.name: index.columns for index in await table.list_indices()
}
assert columns_by_name["top_user_id_idx"] == ["user_id"]
assert columns_by_name["nested_user_id_idx"] == ["metadata.user_id"]
assert columns_by_name["escaped_user_id_idx"] == ["metadata.`user.id`"]
@pytest.mark.asyncio
async def test_create_fixed_size_binary_index(some_table: AsyncTable):
await some_table.create_index("fsb", config=BTree())
@@ -162,13 +122,12 @@ async def test_create_bitmap_index(some_table: AsyncTable):
await some_table.create_index("data", config=Bitmap())
indices = await some_table.list_indices()
assert len(indices) == 3
# list_indices returns indices in alphabetical order by name
assert indices[0].index_type == "Bitmap"
assert indices[0].columns == ["data"]
assert indices[0].columns == ["id"]
assert indices[1].index_type == "Bitmap"
assert indices[1].columns == ["id"]
assert indices[1].columns == ["is_active"]
assert indices[2].index_type == "Bitmap"
assert indices[2].columns == ["is_active"]
assert indices[2].columns == ["data"]
index_name = indices[0].name
stats = await some_table.index_stats(index_name)

View File

@@ -1,138 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
"""Tests for installing and clearing an LsmWriteSpec via
`Table.set_lsm_write_spec` / `Table.unset_lsm_write_spec`.
"""
from datetime import timedelta
import lancedb
import pyarrow as pa
import pytest
from lancedb._lancedb import LsmWriteSpec
SCHEMA = pa.schema(
[
pa.field("id", pa.utf8(), nullable=False),
pa.field("v", pa.int32(), nullable=False),
]
)
def _batch(ids, vs):
return pa.RecordBatch.from_arrays(
[pa.array(ids, type=pa.utf8()), pa.array(vs, type=pa.int32())],
schema=SCHEMA,
)
def _reader(ids, vs):
return pa.RecordBatchReader.from_batches(SCHEMA, [_batch(ids, vs)])
def _make_table(tmp_path):
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0))
table = db.create_table("t", _reader(["seed"], [0]))
return db, table
def test_set_lsm_write_spec_validates(tmp_path):
_db, table = _make_table(tmp_path)
# Out-of-range num_buckets.
with pytest.raises(Exception, match="num_buckets"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 0))
with pytest.raises(Exception, match="num_buckets"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 1025))
# Happy path then mutation rejected.
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
with pytest.raises(Exception, match="mutation"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 8))
def test_unset_lsm_write_spec(tmp_path):
_db, table = _make_table(tmp_path)
# unset errors when no spec is set.
with pytest.raises(Exception, match="no LSM write spec"):
table.unset_lsm_write_spec()
# Install a spec, then remove it; afterwards a fresh spec can be set.
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
table.unset_lsm_write_spec()
# A second unset errors — there is no spec left to remove.
with pytest.raises(Exception, match="no LSM write spec"):
table.unset_lsm_write_spec()
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 8))
def test_set_unsharded_spec(tmp_path):
_db, table = _make_table(tmp_path)
# Lance MemWAL still requires a primary key on the dataset; Unsharded
# just skips per-row hashing.
table.set_unenforced_primary_key("id")
table.set_lsm_write_spec(LsmWriteSpec.unsharded())
table.unset_lsm_write_spec()
def test_lsm_write_spec_repr():
s = LsmWriteSpec.bucket("id", 4)
assert s.spec_type == "bucket"
assert s.column == "id"
assert s.num_buckets == 4
assert s.maintained_indexes == []
assert "bucket" in repr(s)
assert "id" in repr(s)
assert "4" in repr(s)
u = LsmWriteSpec.unsharded()
assert u.spec_type == "unsharded"
assert u.column is None
assert u.num_buckets is None
assert "unsharded" in repr(u)
def test_lsm_write_spec_with_maintained_indexes():
s = LsmWriteSpec.bucket("id", 4).with_maintained_indexes(["idx_a", "idx_b"])
assert s.maintained_indexes == ["idx_a", "idx_b"]
@pytest.mark.asyncio
async def test_async_set_unset_lsm_write_spec(tmp_path):
db = await lancedb.connect_async(
tmp_path, read_consistency_interval=timedelta(seconds=0)
)
table = await db.create_table(
"t",
pa.RecordBatchReader.from_batches(SCHEMA, [_batch(["seed"], [0])]),
)
await table.set_unenforced_primary_key("id")
await table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
await table.unset_lsm_write_spec()
# A second unset errors.
with pytest.raises(Exception, match="no LSM write spec"):
await table.unset_lsm_write_spec()
def test_set_identity_spec(tmp_path):
_db, table = _make_table(tmp_path)
# Identity sharding still requires an unenforced primary key on the
# table; it shards by the raw value of the given column.
table.set_unenforced_primary_key("id")
table.set_lsm_write_spec(LsmWriteSpec.identity("v"))
table.unset_lsm_write_spec()
def test_lsm_write_spec_identity_and_writer_config_defaults():
s = LsmWriteSpec.identity("v")
assert s.spec_type == "identity"
assert s.column == "v"
assert s.num_buckets is None
assert "identity" in repr(s)
s = s.with_writer_config_defaults({"durable_write": "false"})
assert s.writer_config_defaults == {"durable_write": "false"}
assert "durable_write" in repr(s)

View File

@@ -1,196 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
"""Tests for the MemWAL LSM ``merge_insert`` dispatch."""
from datetime import timedelta
import lancedb
import pyarrow as pa
import pytest
from lancedb._lancedb import LsmWriteSpec
SCHEMA = pa.schema(
[
pa.field("id", pa.int64(), nullable=False),
pa.field("value", pa.int64(), nullable=False),
]
)
REGION_SCHEMA = pa.schema(
[
pa.field("id", pa.int64(), nullable=False),
pa.field("region", pa.utf8(), nullable=False),
]
)
def _reader(ids):
batch = pa.RecordBatch.from_arrays(
[
pa.array(ids, type=pa.int64()),
pa.array(list(range(len(ids))), type=pa.int64()),
],
schema=SCHEMA,
)
return pa.RecordBatchReader.from_batches(SCHEMA, [batch])
def _region_reader(rows):
batch = pa.RecordBatch.from_arrays(
[
pa.array([row[0] for row in rows], type=pa.int64()),
pa.array([row[1] for row in rows], type=pa.utf8()),
],
schema=REGION_SCHEMA,
)
return pa.RecordBatchReader.from_batches(REGION_SCHEMA, [batch])
def _bucket_table(tmp_path):
"""A table with ``id`` as the primary key and a single-bucket LSM spec."""
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0))
table = db.create_table("t", _reader([1, 2, 3]))
table.set_unenforced_primary_key("id")
# num_buckets = 1: every row routes to the single bucket.
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 1))
return table
def test_lsm_merge_insert_bucket(tmp_path):
table = _bucket_table(tmp_path)
# Empty `on` defaults to the primary key.
result = (
table.merge_insert([])
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(_reader([3, 4, 5]))
)
# LSM path: rows go to the MemWAL, so only num_rows is populated.
assert result.num_rows == 3
assert result.version == 0
assert result.num_inserted_rows == 0
assert result.num_updated_rows == 0
def test_lsm_merge_insert_unsharded(tmp_path):
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0))
table = db.create_table("t", _reader([1, 2, 3]))
table.set_unenforced_primary_key("id")
table.set_lsm_write_spec(LsmWriteSpec.unsharded())
result = (
table.merge_insert("id")
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(_reader([10, 11, 12, 13]))
)
assert result.num_rows == 4
def test_lsm_merge_insert_identity(tmp_path):
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0))
table = db.create_table("t", _region_reader([(1, "us"), (2, "us")]))
table.set_unenforced_primary_key("id")
table.set_lsm_write_spec(LsmWriteSpec.identity("region"))
# All rows share one identity value, so they route to one shard.
result = (
table.merge_insert([])
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(_region_reader([(3, "us"), (4, "us")]))
)
assert result.num_rows == 2
def test_lsm_merge_insert_use_lsm_write_false(tmp_path):
table = _bucket_table(tmp_path) # rows id = 1, 2, 3
# use_lsm_write(False) opts out: the standard path runs and commits.
result = (
table.merge_insert("id")
.when_not_matched_insert_all()
.use_lsm_write(False)
.execute(_reader([3, 4, 5]))
)
assert result.num_inserted_rows == 2
assert table.count_rows() == 5
def test_lsm_merge_insert_validate_single_shard_off(tmp_path):
table = _bucket_table(tmp_path)
result = (
table.merge_insert([])
.when_matched_update_all()
.when_not_matched_insert_all()
.validate_single_shard(False)
.execute(_reader([6, 7, 8]))
)
assert result.num_rows == 3
def test_lsm_merge_insert_use_lsm_write_true_requires_spec(tmp_path):
# A table with a primary key but no LSM write spec installed.
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0))
table = db.create_table("t", _reader([1, 2, 3]))
table.set_unenforced_primary_key("id")
with pytest.raises(Exception, match="use_lsm_write"):
(
table.merge_insert("id")
.when_matched_update_all()
.when_not_matched_insert_all()
.use_lsm_write(True)
.execute(_reader([4]))
)
def test_lsm_merge_insert_rejects_on_not_primary_key(tmp_path):
table = _bucket_table(tmp_path)
with pytest.raises(Exception, match="primary key"):
(
table.merge_insert("value")
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(_reader([1]))
)
def test_lsm_merge_insert_rejects_non_upsert(tmp_path):
table = _bucket_table(tmp_path)
# Insert-only (no when_matched_update_all) is not the upsert shape.
with pytest.raises(Exception, match="upsert"):
table.merge_insert([]).when_not_matched_insert_all().execute(_reader([4]))
def test_lsm_close_writers(tmp_path):
table = _bucket_table(tmp_path)
(
table.merge_insert([])
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(_reader([7, 8]))
)
table.close_lsm_writers()
# The writer reopens lazily on the next merge_insert.
result = (
table.merge_insert([])
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(_reader([9]))
)
assert result.num_rows == 1
@pytest.mark.asyncio
async def test_async_lsm_merge_insert(tmp_path):
db = await lancedb.connect_async(
tmp_path, read_consistency_interval=timedelta(seconds=0)
)
table = await db.create_table("t", _reader([1, 2, 3]))
await table.set_unenforced_primary_key("id")
await table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 1))
builder = (
table.merge_insert([]).when_matched_update_all().when_not_matched_insert_all()
)
result = await builder.execute(_reader([3, 4, 5]))
assert result.num_rows == 3
await table.close_lsm_writers()

View File

@@ -1080,29 +1080,3 @@ def test_getitems_invalid_offset(some_permutation: Permutation):
"""Test __getitems__ with an out-of-range offset raises an error."""
with pytest.raises(Exception):
some_permutation.__getitems__([999999])
def test_take_offsets(some_permutation: Permutation):
result = some_permutation.take_offsets([0, 1, 2])
assert isinstance(result, list)
assert "id" in result[0]
assert "value" in result[0]
assert len(result) == 3
def test_take_offsets_empty_identity_permutation(mem_db):
tbl = mem_db.create_table(
"test_table", pa.table({"id": range(10), "value": range(10)})
)
permutation = Permutation.identity(tbl)
result = permutation.take_offsets([])
assert result == []
def test_take_offsets_empty_permutation(some_permutation: Permutation):
result = some_permutation.take_offsets([])
assert result == []

View File

@@ -1,79 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
"""Tests for Table.set_unenforced_primary_key."""
from datetime import timedelta
import lancedb
import pyarrow as pa
import pytest
def _empty_table(path, schema):
db = lancedb.connect(path, read_consistency_interval=timedelta(seconds=0))
return db.create_table("t", schema=schema)
def test_set_unenforced_primary_key_accepts_string_or_one_element_list(tmp_path):
schema = pa.schema([pa.field("id", pa.int64(), nullable=False)])
# Bare string.
table = _empty_table(tmp_path / "s", schema)
table.set_unenforced_primary_key("id")
# One-element list.
table = _empty_table(tmp_path / "l", schema)
table.set_unenforced_primary_key(["id"])
def test_set_unenforced_primary_key_rejects_compound_and_empty(tmp_path):
table = _empty_table(
tmp_path,
pa.schema(
[
pa.field("a", pa.utf8(), nullable=False),
pa.field("b", pa.int64(), nullable=False),
]
),
)
# Compound keys are not supported.
with pytest.raises(Exception, match="compound"):
table.set_unenforced_primary_key(["a", "b"])
# Empty input.
with pytest.raises(Exception, match="required"):
table.set_unenforced_primary_key([])
def test_set_unenforced_primary_key_is_immutable(tmp_path):
table = _empty_table(
tmp_path,
pa.schema(
[
pa.field("a", pa.utf8(), nullable=False),
pa.field("b", pa.int64(), nullable=False),
]
),
)
table.set_unenforced_primary_key("a")
# The primary key cannot be changed or re-set once installed.
with pytest.raises(Exception, match="already set"):
table.set_unenforced_primary_key("b")
with pytest.raises(Exception, match="already set"):
table.set_unenforced_primary_key("a")
def test_set_unenforced_primary_key_validates(tmp_path):
table = _empty_table(
tmp_path / "t", pa.schema([pa.field("id", pa.utf8(), nullable=False)])
)
# Unknown column.
with pytest.raises(Exception, match="not found"):
table.set_unenforced_primary_key("nonexistent")
# Unsupported dtype (Float32 not in the supported set).
bad = _empty_table(
tmp_path / "bad", pa.schema([pa.field("id", pa.float32(), nullable=False)])
)
with pytest.raises(Exception, match="not supported"):
bad.set_unenforced_primary_key("id")

View File

@@ -25,7 +25,6 @@ from lancedb.query import (
AsyncHybridQuery,
AsyncQueryBase,
AsyncVectorQuery,
ColumnOrdering,
LanceVectorQueryBuilder,
MatchQuery,
PhraseQuery,
@@ -165,87 +164,6 @@ def test_offset(table):
assert len(results_with_offset.to_pandas()) == 1
@pytest.mark.asyncio
async def test_query_to_pandas_kwargs(table, table_async):
sync_df = (
LanceVectorQueryBuilder(table, [0, 0], "vector")
.select(["id"])
.limit(1)
.to_pandas(split_blocks=True)
)
assert sync_df["id"].tolist() == [1]
async_df = await (
table_async.query().select(["id"]).limit(2).to_pandas(split_blocks=True)
)
assert async_df["id"].tolist() == [1, 2]
def test_order_by_plain_query(mem_db):
table = mem_db.create_table(
"test_order_by",
pa.table(
{
"group": [1, 1, 1, 2],
"score": [None, 1.0, 1.0, 0.5],
"name": ["z", "b", "a", "c"],
}
),
)
res = (
table.search()
.order_by(
[
ColumnOrdering(column_name="group", ascending=True, nulls_first=False),
ColumnOrdering(column_name="score", ascending=True, nulls_first=True),
ColumnOrdering(column_name="name", ascending=True, nulls_first=False),
]
)
.to_arrow()
)
assert res.select(["group", "score", "name"]).to_pylist() == [
{"group": 1, "score": None, "name": "z"},
{"group": 1, "score": 1.0, "name": "a"},
{"group": 1, "score": 1.0, "name": "b"},
{"group": 2, "score": 0.5, "name": "c"},
]
@pytest.mark.asyncio
async def test_order_by_async_query(mem_db_async: AsyncConnection):
table = await mem_db_async.create_table(
"test_order_by_async",
pa.table(
{
"group": [1, 1, 1, 2],
"score": [None, 1.0, 1.0, 0.5],
"name": ["z", "b", "a", "c"],
}
),
)
res = await (
table.query()
.order_by(
[
ColumnOrdering(column_name="group", ascending=True, nulls_first=False),
ColumnOrdering(column_name="score", ascending=True, nulls_first=True),
ColumnOrdering(column_name="name", ascending=True, nulls_first=False),
]
)
.to_arrow()
)
assert res.select(["group", "score", "name"]).to_pylist() == [
{"group": 1, "score": None, "name": "z"},
{"group": 1, "score": 1.0, "name": "a"},
{"group": 1, "score": 1.0, "name": "b"},
{"group": 2, "score": 0.5, "name": "c"},
]
def test_query_builder(table):
rs = (
LanceVectorQueryBuilder(table, [0, 0], "vector")
@@ -1512,37 +1430,6 @@ def test_take_queries(tmp_path):
]
def test_take_queries_to_batches(tmp_path):
# Regression test for the sync take-query path: `to_batches` previously
# raised ``AttributeError: 'AsyncTakeQuery' object has no attribute
# 'execute'`` because the inherited ``BaseQueryBuilder.to_batches`` called
# ``execute`` on the async wrapper instead of the native query.
db = lancedb.connect(tmp_path)
data = pa.table({"idx": list(range(100)), "label": [str(i) for i in range(100)]})
table = db.create_table("test", data)
# Take by offset → to_batches
rs = list(table.take_offsets([5, 2, 17]).to_batches())
assert all(isinstance(b, pa.RecordBatch) for b in rs)
assert sum(b.num_rows for b in rs) == 3
assert sorted(v for b in rs for v in b.column("idx").to_pylist()) == [2, 5, 17]
# Take by row id → to_batches
rs = list(table.take_row_ids([5, 2, 17]).to_batches())
assert all(isinstance(b, pa.RecordBatch) for b in rs)
assert sum(b.num_rows for b in rs) == 3
assert sorted(v for b in rs for v in b.column("idx").to_pylist()) == [2, 5, 17]
# Take with select projection → to_batches preserves the projection
rs = list(table.take_row_ids([5, 2, 17]).select(["label"]).to_batches())
assert all(b.schema.names == ["label"] for b in rs)
assert sorted(v for b in rs for v in b.column("label").to_pylist()) == [
"17",
"2",
"5",
]
def test_getitems(tmp_path):
db = lancedb.connect(tmp_path)
data = pa.table(

View File

@@ -1,13 +1,12 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
import re
from concurrent.futures import ThreadPoolExecutor
import contextlib
from datetime import timedelta
import http.server
import json
import multiprocessing as mp
import pickle
import re
import sys
import threading
import time
@@ -17,7 +16,6 @@ from packaging.version import Version
import lancedb
from lancedb.conftest import MockTextEmbeddingFunction
from lancedb.query import ColumnOrdering
from lancedb.remote import ClientConfig
from lancedb.remote.errors import HttpError, RetryError
import pytest
@@ -172,196 +170,6 @@ def test_table_len_sync():
assert len(table) == 1
def test_remote_connection_serializes():
def handler(request):
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b'{"tables": []}')
with mock_lancedb_connection(handler) as db:
serialized = json.loads(db.serialize())
assert isinstance(serialized["client_config"], dict)
restored = lancedb.deserialize_conn(db.serialize())
assert restored.table_names() == []
def test_remote_table_is_picklable():
def handler(request):
request.close_connection = True
if request.path == "/v1/table/test/describe/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(
{
"version": 1,
"schema": {
"fields": [
{"name": "id", "type": {"type": "int64"}, "nullable": False}
]
},
}
)
request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/count_rows/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b"3")
else:
request.send_response(404)
request.end_headers()
with mock_lancedb_connection(handler) as db:
table = db.open_table("test")
state = table.__getstate__()
assert state["table_state"] == {
"name": "test",
"namespace_path": [],
"storage_options": None,
}
restored = pickle.loads(pickle.dumps(table))
assert restored.count_rows() == 3
def test_remote_table_reopens_when_pid_changes_without_cached_state():
def handler(request):
request.close_connection = True
if request.path == "/v1/table/test/describe/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(
{
"version": 1,
"schema": {
"fields": [
{"name": "id", "type": {"type": "int64"}, "nullable": False}
]
},
}
)
request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/count_rows/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b"3")
else:
request.send_response(404)
request.end_headers()
with mock_lancedb_connection(handler) as db:
table = db.open_table("test")
table._pid = -1
table._table_state = None
assert table.count_rows() == 3
def test_remote_table_open_does_not_require_picklable_client_config():
from lancedb.remote import HeaderProvider
class LocalHeaderProvider(HeaderProvider):
def get_headers(self):
return {"X-Test-Header": "present"}
def handler(request):
request.close_connection = True
assert request.headers.get("X-Test-Header") == "present"
if request.path == "/v1/table/test/describe/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b'{"version": 1, "schema": {"fields": []}}')
elif request.path == "/v1/table/test/count_rows/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b"3")
else:
request.send_response(404)
request.end_headers()
with http.server.HTTPServer(
("localhost", 0), make_mock_http_handler(handler)
) as server:
port = server.server_address[1]
handle = threading.Thread(target=server.serve_forever)
handle.start()
try:
db = lancedb.connect(
"db://dev",
api_key="fake",
host_override=f"http://localhost:{port}",
client_config={
"retry_config": {"retries": 0},
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
"header_provider": LocalHeaderProvider(),
},
)
table = db.open_table("test")
assert table.count_rows() == 3
with pytest.raises(ValueError, match="header_provider"):
pickle.dumps(table)
finally:
server.shutdown()
handle.join()
def test_remote_permutation_is_picklable():
from lancedb.permutation import Permutation
rows = list(range(10))
def handler(request):
request.close_connection = True
if request.path == "/v1/table/test/describe/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(
{
"version": 1,
"schema": {
"fields": [
{"name": "a", "type": {"type": "int64"}, "nullable": False}
]
},
}
)
request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/count_rows/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(str(len(rows)).encode())
elif request.path == "/v1/table/test/query/":
content_len = int(request.headers.get("Content-Length"))
body = json.loads(request.rfile.read(content_len))
if "filter" in body:
match = re.search(r"_rowoffset in \((.*?)\)", body["filter"])
offsets = [int(offset.strip()) for offset in match.group(1).split(",")]
else:
offsets = rows
table = pa.table({"a": [rows[offset] for offset in offsets]})
request.send_response(200)
request.send_header("Content-Type", "application/vnd.apache.arrow.file")
request.end_headers()
with pa.ipc.new_file(request.wfile, schema=table.schema) as writer:
writer.write_table(table)
else:
request.send_response(404)
request.end_headers()
with mock_lancedb_connection(handler) as db:
permutation = Permutation.identity(db.open_table("test"))
restored = pickle.loads(pickle.dumps(permutation))
assert restored.__getitems__([0, 2, 4]) == [{"a": 0}, {"a": 2}, {"a": 4}]
def test_create_table_exist_ok():
def handler(request):
if request.path == "/v1/table/test/create/?mode=exist_ok":
@@ -460,25 +268,6 @@ def test_table_unimplemented_functions():
table.to_pandas()
def test_table_to_pandas_not_supported():
def handler(request):
if request.path == "/v1/table/test/create/?mode=create":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b"{}")
else:
request.send_response(404)
request.end_headers()
with mock_lancedb_connection(handler) as db:
table = db.create_table("test", [{"id": 1}])
with pytest.raises(NotImplementedError):
table.to_pandas()
with pytest.raises(NotImplementedError):
table.to_pandas(blob_mode="bytes", split_blocks=True)
def test_table_add_in_threadpool():
def handler(request):
if request.path == "/v1/table/test/insert/":
@@ -553,22 +342,6 @@ def test_table_create_indices():
schema=dict(
fields=[
dict(name="id", type={"type": "int64"}, nullable=False),
dict(name="text", type={"type": "string"}, nullable=False),
dict(
name="vector",
type={
"type": "fixed_size_list",
"fields": [
dict(
name="item",
type={"type": "float"},
nullable=True,
)
],
"length": 2,
},
nullable=False,
),
]
),
)
@@ -627,25 +400,22 @@ def test_table_create_indices():
# This is a smoke-test.
table = db.create_table("test", [{"id": 1}])
# Test create_scalar_index with custom name (legacy method)
with pytest.warns(DeprecationWarning, match="create_scalar_index"):
table.create_scalar_index(
"id", wait_timeout=timedelta(seconds=2), name="custom_scalar_idx"
)
# Test create_scalar_index with custom name
table.create_scalar_index(
"id", wait_timeout=timedelta(seconds=2), name="custom_scalar_idx"
)
# Test create_fts_index with custom name (legacy method)
with pytest.warns(DeprecationWarning, match="create_fts_index"):
table.create_fts_index(
"text", wait_timeout=timedelta(seconds=2), name="custom_fts_idx"
)
# Test create_fts_index with custom name
table.create_fts_index(
"text", wait_timeout=timedelta(seconds=2), name="custom_fts_idx"
)
# Test create_index with custom name (legacy form: vector_column_name kwarg)
with pytest.warns(DeprecationWarning, match="create_index"):
table.create_index(
vector_column_name="vector",
wait_timeout=timedelta(seconds=10),
name="custom_vector_idx",
)
# Test create_index with custom name
table.create_index(
vector_column_name="vector",
wait_timeout=timedelta(seconds=10),
name="custom_vector_idx",
)
# Validate that the name parameter was passed correctly in requests
assert len(received_requests) == 3
@@ -674,98 +444,6 @@ def test_table_create_indices():
table.drop_index("custom_fts_idx")
def test_remote_create_index_new_api():
received_requests = []
def handler(request):
if request.path == "/v1/table/test/create_index/":
content_len = int(request.headers.get("Content-Length", 0))
body = request.rfile.read(content_len) if content_len > 0 else b""
received_requests.append(json.loads(body) if body else {})
request.send_response(200)
request.end_headers()
elif request.path == "/v1/table/test/create/?mode=create":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b"{}")
elif request.path == "/v1/table/test/describe/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(
json.dumps(
dict(
version=1,
schema=dict(
fields=[
dict(name="id", type={"type": "int64"}, nullable=False),
dict(
name="category",
type={"type": "string"},
nullable=False,
),
dict(
name="text", type={"type": "string"}, nullable=False
),
dict(
name="vector",
type={
"type": "fixed_size_list",
"fields": [
dict(
name="item",
type={"type": "float"},
nullable=True,
)
],
"length": 2,
},
nullable=False,
),
]
),
)
).encode()
)
else:
request.send_response(404)
request.end_headers()
from lancedb.index import BTree, FTS, IvfPq, IvfRq
with mock_lancedb_connection(handler) as db:
table = db.create_table("test", [{"id": 1}])
# New API: column-first, config= kwarg. Should NOT emit DeprecationWarning.
import warnings as _warnings
with _warnings.catch_warnings():
_warnings.simplefilter("error", DeprecationWarning)
table.create_index("vector", config=IvfPq(distance_type="l2"))
table.create_index("category", config=BTree())
table.create_index("text", config=FTS())
# IvfRq via new API
table.create_index("vector", config=IvfRq(distance_type="l2"))
# Legacy index_type="IVF_RQ" routes to IvfRq config under the hood.
with pytest.warns(DeprecationWarning, match="create_index"):
table.create_index(
vector_column_name="vector",
index_type="IVF_RQ",
num_partitions=8,
)
assert len(received_requests) == 5
assert [req["column"] for req in received_requests] == [
"vector",
"category",
"text",
"vector",
"vector",
]
def test_table_wait_for_index_timeout():
def handler(request):
index_stats = dict(
@@ -982,18 +660,6 @@ def test_query_sync_maximal():
"ef": None,
"filter": "id > 0",
"columns": ["id", "name"],
"order_by": [
{
"column_name": "score",
"ascending": False,
"nulls_first": True,
},
{
"column_name": "id",
"ascending": True,
"nulls_first": False,
},
],
"vector_column": "vector2",
"fast_search": True,
"with_row_id": True,
@@ -1011,14 +677,6 @@ def test_query_sync_maximal():
.refine_factor(10)
.nprobes(5)
.where("id > 0", prefilter=True)
.order_by(
[
ColumnOrdering(
column_name="score", ascending=False, nulls_first=True
),
ColumnOrdering(column_name="id", ascending=True, nulls_first=False),
]
)
.with_row_id(True)
.select(["id", "name"])
.to_list()
@@ -1591,10 +1249,6 @@ def _remote_fork_child(port: int, queue) -> None:
queue.put(db.table_names())
def _remote_table_fork_child(table, queue) -> None:
queue.put(table.count_rows())
@pytest.mark.skipif(
sys.platform != "linux",
reason=(
@@ -1657,65 +1311,3 @@ def test_remote_connection_after_fork():
finally:
server.shutdown()
server_thread.join()
@pytest.mark.skipif(
sys.platform != "linux",
reason=(
"fork() is unavailable on Windows and unsafe on macOS "
"(Apple frameworks/TLS are not fork-safe)"
),
)
def test_inherited_remote_table_reopens_after_fork():
def handler(request):
if request.path == "/v1/table/test/describe/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b'{"version": 1, "schema": {"fields": []}}')
elif request.path == "/v1/table/test/count_rows/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b"7")
else:
request.send_response(404)
request.end_headers()
server = http.server.HTTPServer(("localhost", 0), make_mock_http_handler(handler))
port = server.server_address[1]
server_thread = threading.Thread(target=server.serve_forever)
server_thread.start()
try:
db = lancedb.connect(
"db://dev",
api_key="fake",
host_override=f"http://localhost:{port}",
client_config={
"retry_config": {"retries": 0},
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
},
)
table = db.open_table("test")
assert table.count_rows() == 7
ctx = mp.get_context("fork")
queue = ctx.Queue()
proc = ctx.Process(target=_remote_table_fork_child, args=(table, queue))
proc.start()
proc.join(timeout=15)
if proc.is_alive():
proc.terminate()
proc.join(timeout=5)
if proc.is_alive():
proc.kill()
proc.join()
pytest.fail("Remote table hung after fork")
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
assert not queue.empty(), "child produced no result"
assert queue.get() == 7
finally:
server.shutdown()
server_thread.join()

View File

@@ -603,89 +603,3 @@ def test_cross_encoder_reranker_return_all(tmp_path):
assert "_relevance_score" in result.column_names
assert "_score" in result.column_names
assert "_distance" in result.column_names
# ---------------------------------------------------------------------------
# Regression tests for LinearCombinationReranker scoring bugs (issue #3154)
# ---------------------------------------------------------------------------
def test_linear_combination_best_match_ranks_first():
"""
The document that is BOTH the closest vector match AND the only FTS match
must rank first. Previously _combine_score subtracted from 1, inverting
the ranking so the worst document ranked highest.
"""
reranker = LinearCombinationReranker(weight=0.7, return_score="all")
# rowid 0: perfect vector match, sole FTS match → should rank 1st
# rowid 1: mediocre vector, no FTS match
# rowid 2: bad vector, no FTS match
vector_results = pa.Table.from_pydict(
{
"_rowid": [0, 1, 2],
"_distance": [0.0, 0.5, 0.9],
}
)
fts_results = pa.Table.from_pydict(
{
"_rowid": [0],
"_score": [1.0],
}
)
combined = reranker.merge_results(vector_results, fts_results, fill=1.0)
scores = dict(
zip(
combined["_rowid"].to_pylist(),
combined["_relevance_score"].to_pylist(),
)
)
# rowid 0 must have the highest relevance score
assert scores[0] > scores[1], (
f"Best match (rowid 0, score={scores[0]:.4f}) should beat "
f"mid match (rowid 1, score={scores[1]:.4f})"
)
assert scores[1] > scores[2], (
f"Mid match (rowid 1, score={scores[1]:.4f}) should beat "
f"bad match (rowid 2, score={scores[2]:.4f})"
)
def test_linear_combination_missing_fts_is_penalised():
"""
A document with no FTS match must score *lower* than a document that
has a mediocre FTS match, everything else being equal. Previously
missing-FTS entries used fill=1.0 directly, which gave them a reward
(via the 1-(...) inversion) instead of a penalty.
"""
reranker = LinearCombinationReranker(weight=0.5, return_score="all")
vector_results = pa.Table.from_pydict(
{
"_rowid": [0, 1],
"_distance": [0.2, 0.2], # identical vector scores
}
)
fts_results = pa.Table.from_pydict(
{
"_rowid": [0], # rowid 1 has no FTS match
"_score": [0.3], # small FTS score
}
)
combined = reranker.merge_results(vector_results, fts_results, fill=1.0)
scores = dict(
zip(
combined["_rowid"].to_pylist(),
combined["_relevance_score"].to_pylist(),
)
)
# rowid 0 has a small FTS score; rowid 1 has none.
# Even a small FTS contribution should beat having none at all.
assert scores[0] > scores[1], (
f"Document with FTS score (rowid 0, {scores[0]:.4f}) should beat "
f"document with no FTS match (rowid 1, {scores[1]:.4f})"
)

View File

@@ -3,9 +3,7 @@
import os
import pickle
import sys
import warnings
from datetime import date, datetime, timedelta
from time import sleep
from typing import List
@@ -13,7 +11,7 @@ from unittest.mock import patch
import lancedb
from lancedb.dependencies import _PANDAS_AVAILABLE
from lancedb.index import BTree, FTS, HnswFlat, HnswPq, HnswSq, IvfPq
from lancedb.index import HnswFlat, HnswPq, HnswSq, IvfPq
import numpy as np
import polars as pl
import pyarrow as pa
@@ -35,7 +33,7 @@ def test_basic(mem_db: DBConnection):
table = mem_db.create_table("test", data=data)
assert table.name == "test"
assert "LanceTable(name='test', _conn=LanceDBConnection(" in repr(table)
assert "LanceTable(name='test', version=1, _conn=LanceDBConnection(" in repr(table)
expected_schema = pa.schema(
{
"vector": pa.list_(pa.float32(), 2),
@@ -49,115 +47,6 @@ def test_basic(mem_db: DBConnection):
assert table.to_arrow() == expected_data
def test_lance_table_is_picklable(tmp_db: DBConnection):
table = tmp_db.create_table("pickle_table", pa.table({"id": [1, 2, 3]}))
restored = pickle.loads(pickle.dumps(table))
assert restored.name == "pickle_table"
assert restored.count_rows() == 3
assert restored.to_arrow().column("id").to_pylist() == [1, 2, 3]
def test_lance_table_pickle_preserves_checkout(tmp_db: DBConnection):
table = tmp_db.create_table("pickle_checkout", pa.table({"id": [1]}))
table.add(pa.table({"id": [2]}))
table.checkout(1)
restored = pickle.loads(pickle.dumps(table))
assert restored.count_rows() == 1
assert restored.to_arrow().column("id").to_pylist() == [1]
restored.checkout_latest()
assert restored.count_rows() == 2
def test_memory_lance_table_pickle_is_unsupported(mem_db: DBConnection):
table = mem_db.create_table("memory_pickle", pa.table({"id": [1]}))
with pytest.raises(ValueError, match="in-memory LanceTable"):
pickle.dumps(table)
def test_table_to_pandas_default_matches_arrow(tmp_db: DBConnection):
pd = pytest.importorskip("pandas")
data = pa.table({"id": [1, 2], "text": ["one", "two"]})
table = tmp_db.create_table("test_to_pandas_old_call", data=data)
expected = data.to_pandas()
pd.testing.assert_frame_equal(table.to_pandas(), expected)
def test_table_to_pandas_blob_bytes(tmp_db: DBConnection):
pytest.importorskip("lance")
data = pa.table(
{
"id": pa.array([1, 2], pa.int64()),
"blob": pa.array([b"hello", b"world"], pa.large_binary()),
},
schema=pa.schema(
[
pa.field("id", pa.int64()),
pa.field(
"blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"}
),
]
),
)
table = tmp_db.create_table("test_to_pandas_blob_bytes", data=data)
df = table.to_pandas(blob_mode="bytes")
assert df["blob"].tolist() == [b"hello", b"world"]
def test_table_to_pandas_kwargs(tmp_db: DBConnection):
pd = pytest.importorskip("pandas")
data = pa.table({"id": pa.array([1, 2], pa.int64())})
table = tmp_db.create_table("test_to_pandas_kwargs", data=data)
df = table.to_pandas(types_mapper=pd.ArrowDtype)
assert str(df["id"].dtype) == "int64[pyarrow]"
@pytest.mark.asyncio
async def test_async_table_to_pandas_blob_bytes(tmp_db_async: AsyncConnection):
pytest.importorskip("lance")
data = pa.table(
{
"id": pa.array([1, 2], pa.int64()),
"blob": pa.array([b"hello", b"world"], pa.large_binary()),
},
schema=pa.schema(
[
pa.field("id", pa.int64()),
pa.field(
"blob", pa.large_binary(), metadata={"lance-encoding:blob": "true"}
),
]
),
)
table = await tmp_db_async.create_table(
"test_async_to_pandas_blob_bytes", data=data
)
df = await table.to_pandas(blob_mode="bytes")
assert df["blob"].tolist() == [b"hello", b"world"]
@pytest.mark.asyncio
async def test_async_table_to_pandas_kwargs(tmp_db_async: AsyncConnection):
pd = pytest.importorskip("pandas")
data = pa.table({"id": pa.array([1, 2], pa.int64())})
table = await tmp_db_async.create_table("test_async_to_pandas_kwargs", data=data)
df = await table.to_pandas(types_mapper=pd.ArrowDtype)
assert str(df["id"].dtype) == "int64[pyarrow]"
def test_create_table_infers_large_int_vectors(mem_db: DBConnection):
data = [{"vector": [0, 300]}]
@@ -960,12 +849,7 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
num_bits=4,
)
mock_create_index.assert_called_with(
"vector",
replace=True,
config=expected_config,
wait_timeout=None,
name=None,
train=True,
"vector", replace=True, config=expected_config, name=None, train=True
)
# Test with target_partition_size
@@ -985,12 +869,7 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
target_partition_size=8192,
)
mock_create_index.assert_called_with(
"vector",
replace=True,
config=expected_config,
wait_timeout=None,
name=None,
train=True,
"vector", replace=True, config=expected_config, name=None, train=True
)
# target_partition_size has a default value,
@@ -1009,12 +888,7 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
num_bits=4,
)
mock_create_index.assert_called_with(
"vector",
replace=True,
config=expected_config,
wait_timeout=None,
name=None,
train=True,
"vector", replace=True, config=expected_config, name=None, train=True
)
table.create_index(
@@ -1025,12 +899,7 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
)
expected_config = HnswPq(distance_type="dot")
mock_create_index.assert_called_with(
"my_vector",
replace=False,
config=expected_config,
wait_timeout=None,
name=None,
train=True,
"my_vector", replace=False, config=expected_config, name=None, train=True
)
table.create_index(
@@ -1045,12 +914,7 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
distance_type="cosine", sample_rate=0.1, m=29, ef_construction=10
)
mock_create_index.assert_called_with(
"my_vector",
replace=True,
config=expected_config,
wait_timeout=None,
name=None,
train=True,
"my_vector", replace=True, config=expected_config, name=None, train=True
)
table.create_index(
@@ -1065,12 +929,7 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
distance_type="cosine", sample_rate=0.1, m=29, ef_construction=10
)
mock_create_index.assert_called_with(
"my_vector",
replace=True,
config=expected_config,
wait_timeout=None,
name=None,
train=True,
"my_vector", replace=True, config=expected_config, name=None, train=True
)
@@ -1094,7 +953,6 @@ def test_create_index_name_and_train_parameters(
"vector",
replace=True,
config=expected_config,
wait_timeout=None,
name="my_custom_index",
train=True,
)
@@ -1102,82 +960,13 @@ def test_create_index_name_and_train_parameters(
# Test with train=False
table.create_index(vector_column_name="vector", train=False)
mock_create_index.assert_called_with(
"vector",
replace=True,
config=expected_config,
wait_timeout=None,
name=None,
train=False,
"vector", replace=True, config=expected_config, name=None, train=False
)
# Test with both name and train
table.create_index(vector_column_name="vector", name="my_index_name", train=True)
mock_create_index.assert_called_with(
"vector",
replace=True,
config=expected_config,
wait_timeout=None,
name="my_index_name",
train=True,
)
@patch("lancedb.table.AsyncTable.create_index")
def test_create_index_legacy_emits_deprecation_warning(
mock_create_index, mem_db: DBConnection
):
table = mem_db.create_table(
"test",
data=[{"vector": [3.1, 4.1]}, {"vector": [5.9, 26.5]}],
)
with pytest.warns(DeprecationWarning, match="create_index"):
table.create_index(metric="l2", num_partitions=8, vector_column_name="vector")
@patch("lancedb.table.AsyncTable.create_index")
def test_create_index_new_api(mock_create_index, mem_db: DBConnection):
table = mem_db.create_table(
"test",
data=[
{"vector": [3.1, 4.1], "category": "a", "text": "hello world"},
{"vector": [5.9, 26.5], "category": "b", "text": "goodbye"},
],
)
# Vector index via new API should not warn
with warnings.catch_warnings():
warnings.simplefilter("error", DeprecationWarning)
table.create_index("vector", config=IvfPq(distance_type="l2"))
mock_create_index.assert_called_with(
"vector",
replace=True,
config=IvfPq(distance_type="l2"),
wait_timeout=None,
name=None,
train=True,
)
# Scalar index via new API
table.create_index("category", config=BTree())
mock_create_index.assert_called_with(
"category",
replace=True,
config=BTree(),
wait_timeout=None,
name=None,
train=True,
)
# FTS index via new API
table.create_index("text", config=FTS(with_position=True))
mock_create_index.assert_called_with(
"text",
replace=True,
config=FTS(with_position=True),
wait_timeout=None,
name=None,
train=True,
"vector", replace=True, config=expected_config, name="my_index_name", train=True
)
@@ -1993,9 +1782,8 @@ def test_create_scalar_index(mem_db: DBConnection):
"my_table",
data=test_data,
)
# Test with default name; confirm DeprecationWarning fires
with pytest.warns(DeprecationWarning, match="create_scalar_index"):
table.create_scalar_index("x")
# Test with default name
table.create_scalar_index("x")
indices = table.list_indices()
assert len(indices) == 1
scalar_index = indices[0]
@@ -2023,59 +1811,6 @@ def test_create_scalar_index(mem_db: DBConnection):
assert scalar_index.name == "custom_y_index"
def test_create_index_nested_field_paths(mem_db: DBConnection):
schema = pa.schema(
[
pa.field("metadata", pa.struct([pa.field("user_id", pa.int32())])),
pa.field(
"image",
pa.struct([pa.field("embedding", pa.list_(pa.float32(), 2))]),
),
]
)
data = pa.Table.from_pylist(
[
{
"metadata": {"user_id": i},
"image": {"embedding": [float(i), float(i + 1)]},
}
for i in range(256)
],
schema=schema,
)
table = mem_db.create_table("nested_index_paths", data=data)
table.create_scalar_index("metadata.user_id", name="metadata_user_id_idx")
table.create_index(
vector_column_name="image.embedding",
num_partitions=1,
num_sub_vectors=1,
name="image_embedding_idx",
)
indices = sorted(table.list_indices(), key=lambda idx: idx.name)
assert [(idx.name, idx.index_type, idx.columns) for idx in indices] == [
("image_embedding_idx", "IvfPq", ["image.embedding"]),
("metadata_user_id_idx", "BTree", ["metadata.user_id"]),
]
vector_results = (
table.search([0.0, 1.0], vector_column_name="image.embedding")
.limit(1)
.to_list()
)
assert len(vector_results) == 1
assert vector_results[0]["metadata"]["user_id"] == 0
default_vector_results = table.search([0.0, 1.0]).limit(1).to_list()
assert len(default_vector_results) == 1
assert default_vector_results[0]["metadata"]["user_id"] == 0
filtered_results = table.search().where("metadata.user_id = 42").limit(1).to_list()
assert len(filtered_results) == 1
assert filtered_results[0]["metadata"]["user_id"] == 42
def test_empty_query(mem_db: DBConnection):
table = mem_db.create_table(
"my_table",
@@ -2150,74 +1885,6 @@ def test_search_with_schema_inf_multiple_vector(mem_db: DBConnection):
table.search(q).limit(1).to_arrow()
def test_search_infers_single_nested_vector(mem_db: DBConnection):
schema = pa.schema(
[
pa.field("id", pa.int32()),
pa.field(
"image",
pa.struct([pa.field("embedding", pa.list_(pa.float32(), 2))]),
),
]
)
data = pa.Table.from_pylist(
[
{"id": 0, "image": {"embedding": [0.0, 1.0]}},
{"id": 1, "image": {"embedding": [10.0, 11.0]}},
],
schema=schema,
)
table = mem_db.create_table("nested_vector_default_search", data=data)
result = table.search([0.0, 1.0]).limit(1).to_list()
assert result[0]["id"] == 0
def test_search_nested_vector_multiple_candidates(mem_db: DBConnection):
schema = pa.schema(
[
pa.field(
"image",
pa.struct([pa.field("embedding", pa.list_(pa.float32(), 2))]),
),
pa.field(
"text",
pa.struct([pa.field("embedding", pa.list_(pa.float32(), 2))]),
),
]
)
data = pa.Table.from_pylist(
[
{
"image": {"embedding": [0.0, 1.0]},
"text": {"embedding": [2.0, 3.0]},
}
],
schema=schema,
)
table = mem_db.create_table("nested_vector_multiple_candidates", data=data)
with pytest.raises(ValueError, match="image.embedding.*text.embedding"):
table.search([0.0, 1.0]).limit(1).to_arrow()
def test_search_nested_vector_no_candidates(mem_db: DBConnection):
schema = pa.schema(
[
pa.field("id", pa.int32()),
pa.field("metadata", pa.struct([pa.field("label", pa.string())])),
]
)
data = pa.Table.from_pylist(
[{"id": 0, "metadata": {"label": "cat"}}],
schema=schema,
)
table = mem_db.create_table("nested_vector_no_candidates", data=data)
with pytest.raises(ValueError, match="no vector column"):
table.search([0.0, 1.0]).limit(1).to_arrow()
def test_compact_cleanup(tmp_db: DBConnection):
pytest.importorskip("lance")
table = tmp_db.create_table(

View File

@@ -1,15 +1,10 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
import contextlib
import functools
import http.server
import json
import multiprocessing as mp
import pickle
import re
import sys
import threading
import lancedb
import pyarrow as pa
@@ -20,107 +15,6 @@ from lancedb.util import tbl_to_tensor
torch = pytest.importorskip("torch")
REMOTE_ROWS = list(range(100))
def _make_mock_http_handler(handler):
class MockLanceDBHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
handler(self)
def do_POST(self):
handler(self)
return MockLanceDBHandler
def _remote_schema_payload():
return {
"version": 1,
"schema": {
"fields": [
{"name": "a", "type": {"type": "int64"}, "nullable": False},
]
},
}
def _offsets_from_filter(filter_sql: str | None) -> list[int]:
if filter_sql is None:
return REMOTE_ROWS
match = re.search(r"_rowoffset in \((.*?)\)", filter_sql)
if match is None:
return REMOTE_ROWS
raw_offsets = match.group(1).strip()
if raw_offsets == "":
return []
return [int(offset.strip()) for offset in raw_offsets.split(",")]
def _remote_dataset_handler(request):
request.close_connection = True
if request.path == "/v1/table/test/describe/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(json.dumps(_remote_schema_payload()).encode())
elif request.path == "/v1/table/test/count_rows/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(str(len(REMOTE_ROWS)).encode())
elif request.path == "/v1/table/test/query/":
content_len = int(request.headers.get("Content-Length"))
body = json.loads(request.rfile.read(content_len))
offsets = _offsets_from_filter(body.get("filter"))
requested_columns = body.get("columns") or ["a"]
if isinstance(requested_columns, dict):
requested_columns = list(requested_columns)
data = {}
for column in requested_columns:
if column == "a":
data[column] = [REMOTE_ROWS[offset] for offset in offsets]
elif column == "_rowoffset":
data[column] = offsets
elif column == "_rowid":
data[column] = offsets
table = pa.table(data)
request.send_response(200)
request.send_header("Content-Type", "application/vnd.apache.arrow.file")
request.end_headers()
with pa.ipc.new_file(request.wfile, schema=table.schema) as writer:
writer.write_table(table)
else:
request.send_response(404)
request.end_headers()
@contextlib.contextmanager
def _remote_dataset_table():
with http.server.ThreadingHTTPServer(
("localhost", 0), _make_mock_http_handler(_remote_dataset_handler)
) as server:
port = server.server_address[1]
handle = threading.Thread(target=server.serve_forever)
handle.start()
try:
db = lancedb.connect(
"db://dev",
api_key="fake",
host_override=f"http://localhost:{port}",
client_config={
"retry_config": {"retries": 0},
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
},
)
yield db.open_table("test")
finally:
server.shutdown()
handle.join()
def _open_native_table(uri: str, table_name: str):
"""Top-level connection factory used by the explicit-factory pickle test.
@@ -213,39 +107,6 @@ def test_permutation_dataloader_multiprocessing(tmp_db):
assert seen == 1000
def test_remote_table_dataloader_multiprocessing():
with _remote_dataset_table() as table:
dataloader = torch.utils.data.DataLoader(
table,
collate_fn=tbl_to_tensor,
batch_size=10,
num_workers=2,
multiprocessing_context="spawn",
)
seen = 0
for batch in dataloader:
assert batch.size(0) == 1
assert batch.size(1) == 10
seen += batch.size(1)
assert seen == len(REMOTE_ROWS)
def test_remote_permutation_dataloader_multiprocessing():
with _remote_dataset_table() as table:
permutation = Permutation.identity(table)
dataloader = torch.utils.data.DataLoader(
permutation,
batch_size=10,
num_workers=2,
multiprocessing_context="spawn",
)
seen = 0
for batch in dataloader:
assert batch["a"].size(0) == 10
seen += batch["a"].size(0)
assert seen == len(REMOTE_ROWS)
def test_permutation_pickle_with_connection_factory(tmp_path):
"""When the user provides a connection_factory, pickling should round-trip
through that factory rather than introspecting the connection URI. Useful
@@ -310,35 +171,6 @@ def _multiworker_dataloader_target(db_uri: str, result_queue):
result_queue.put(count)
def _remote_multiworker_dataloader_target(port: int, result_queue):
import lancedb
from lancedb.permutation import Permutation
db = lancedb.connect(
"db://dev",
api_key="fake",
host_override=f"http://localhost:{port}",
client_config={
"retry_config": {"retries": 0},
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
},
)
table = db.open_table("test")
permutation = Permutation.identity(table)
dataloader = torch.utils.data.DataLoader(
permutation,
batch_size=10,
num_workers=2,
multiprocessing_context="fork",
)
count = 0
for batch in dataloader:
assert batch["a"].size(0) == 10
count += 1
result_queue.put(count)
@pytest.mark.skipif(
sys.platform != "linux",
reason=(
@@ -376,46 +208,3 @@ def test_permutation_dataloader_fork_workers(tmp_path):
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
assert not queue.empty(), "child produced no batches"
assert queue.get() == 100
@pytest.mark.skipif(
sys.platform != "linux",
reason=(
"fork() is unavailable on Windows and unsafe on macOS "
"(Apple frameworks/TLS are not fork-safe)"
),
)
def test_remote_permutation_dataloader_fork_workers():
with http.server.ThreadingHTTPServer(
("localhost", 0), _make_mock_http_handler(_remote_dataset_handler)
) as server:
port = server.server_address[1]
handle = threading.Thread(target=server.serve_forever)
handle.start()
try:
ctx = mp.get_context("spawn")
queue = ctx.Queue()
proc = ctx.Process(
target=_remote_multiworker_dataloader_target,
args=(port, queue),
)
proc.start()
proc.join(timeout=30)
if proc.is_alive():
proc.terminate()
proc.join(timeout=5)
if proc.is_alive():
proc.kill()
proc.join()
pytest.fail(
"Remote permutation hung when iterated in a fork-based "
"DataLoader worker"
)
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
assert not queue.empty(), "child produced no batches"
assert queue.get() == 10
finally:
server.shutdown()
handle.join()

View File

@@ -395,17 +395,12 @@ impl Connection {
future_into_py(py, async move {
use lance_namespace::models::CreateNamespaceRequest;
// Mode is now a string field
let mode_str = mode
.map(|m| match m.to_lowercase().as_str() {
"create" => Ok("Create".to_string()),
"exist_ok" => Ok("ExistOk".to_string()),
"overwrite" => Ok("Overwrite".to_string()),
_ => Err(PyValueError::new_err(format!(
"Invalid mode {:?}: expected one of 'create', 'exist_ok', 'overwrite'",
m
))),
})
.transpose()?;
let mode_str = mode.and_then(|m| match m.to_lowercase().as_str() {
"create" => Some("Create".to_string()),
"exist_ok" => Some("ExistOk".to_string()),
"overwrite" => Some("Overwrite".to_string()),
_ => None,
});
let request = CreateNamespaceRequest {
id: Some(namespace_path),
mode: mode_str,
@@ -433,26 +428,16 @@ impl Connection {
future_into_py(py, async move {
use lance_namespace::models::DropNamespaceRequest;
// Mode and Behavior are now string fields
let mode_str = mode
.map(|m| match m.to_uppercase().as_str() {
"SKIP" => Ok("Skip".to_string()),
"FAIL" => Ok("Fail".to_string()),
_ => Err(PyValueError::new_err(format!(
"Invalid mode {:?}: expected one of 'skip', 'fail'",
m
))),
})
.transpose()?;
let behavior_str = behavior
.map(|b| match b.to_uppercase().as_str() {
"RESTRICT" => Ok("Restrict".to_string()),
"CASCADE" => Ok("Cascade".to_string()),
_ => Err(PyValueError::new_err(format!(
"Invalid behavior {:?}: expected one of 'restrict', 'cascade'",
b
))),
})
.transpose()?;
let mode_str = mode.and_then(|m| match m.to_uppercase().as_str() {
"SKIP" => Some("Skip".to_string()),
"FAIL" => Some("Fail".to_string()),
_ => None,
});
let behavior_str = behavior.and_then(|b| match b.to_uppercase().as_str() {
"RESTRICT" => Some("Restrict".to_string()),
"CASCADE" => Some("Cascade".to_string()),
_ => None,
});
let request = DropNamespaceRequest {
id: Some(namespace_path),
mode: mode_str,

View File

@@ -8,9 +8,7 @@
//! DataFusion [`Expr`] nodes, bypassing SQL string parsing.
use arrow::{datatypes::DataType, pyarrow::PyArrowType};
use datafusion_common::ScalarValue;
use lancedb::expr::{DfExpr, col as ldb_col, contains, expr_cast, lit as df_lit, lower, upper};
use pyo3::types::PyBytes;
use pyo3::{Bound, PyAny, PyResult, exceptions::PyValueError, prelude::*, pyfunction};
/// A type-safe DataFusion expression.
@@ -143,7 +141,7 @@ pub fn expr_col(name: &str) -> PyExpr {
/// Create a literal value expression.
///
/// Supported Python types: `bool`, `int`, `float`, `str`, `bytes`.
/// Supported Python types: `bool`, `int`, `float`, `str`.
#[pyfunction]
pub fn expr_lit(value: Bound<'_, PyAny>) -> PyResult<PyExpr> {
// bool must be checked before int because bool is a subclass of int in Python
@@ -159,12 +157,8 @@ pub fn expr_lit(value: Bound<'_, PyAny>) -> PyResult<PyExpr> {
if let Ok(s) = value.extract::<String>() {
return Ok(PyExpr(df_lit(s)));
}
if value.is_instance_of::<PyBytes>() {
let bytes = value.extract::<Vec<u8>>()?;
return Ok(PyExpr(df_lit(ScalarValue::Binary(Some(bytes)))));
}
Err(PyValueError::new_err(format!(
"unsupported literal type: {}. Supported: bool, int, float, str, bytes",
"unsupported literal type: {}. Supported: bool, int, float, str",
value.get_type().name()?
)))
}

View File

@@ -15,8 +15,8 @@ use pyo3::{
use query::{FTSQuery, HybridQuery, Query, VectorQuery};
use session::Session;
use table::{
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, LsmWriteSpec,
MergeResult, Table, UpdateResult,
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, MergeResult,
Table, UpdateResult,
};
pub mod arrow;
@@ -52,7 +52,6 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<AlterColumnsResult>()?;
m.add_class::<AddResult>()?;
m.add_class::<MergeResult>()?;
m.add_class::<LsmWriteSpec>()?;
m.add_class::<DeleteResult>()?;
m.add_class::<DropColumnsResult>()?;
m.add_class::<UpdateResult>()?;

View File

@@ -23,7 +23,7 @@ use lancedb::query::QueryBase;
use lancedb::query::QueryExecutionOptions;
use lancedb::query::QueryFilter;
use lancedb::query::{
ColumnOrdering, ExecutableQuery, Query as LanceDbQuery, Select, TakeQuery as LanceDbTakeQuery,
ExecutableQuery, Query as LanceDbQuery, Select, TakeQuery as LanceDbTakeQuery,
VectorQuery as LanceDbVectorQuery,
};
use lancedb::table::AnyQuery;
@@ -207,48 +207,6 @@ impl<'py> IntoPyObject<'py> for PyLanceDB<FtsQuery> {
#[derive(Clone)]
pub struct PyQueryVectors(Vec<Arc<dyn Array>>);
#[derive(Clone, FromPyObject)]
#[pyo3(from_item_all)]
pub struct PyColumnOrdering {
pub column_name: String,
pub ascending: bool,
pub nulls_first: bool,
}
impl From<ColumnOrdering> for PyColumnOrdering {
fn from(ordering: ColumnOrdering) -> Self {
Self {
column_name: ordering.column_name,
ascending: ordering.ascending,
nulls_first: ordering.nulls_first,
}
}
}
impl From<PyColumnOrdering> for ColumnOrdering {
fn from(ordering: PyColumnOrdering) -> Self {
Self {
column_name: ordering.column_name,
ascending: ordering.ascending,
nulls_first: ordering.nulls_first,
}
}
}
impl<'py> IntoPyObject<'py> for PyColumnOrdering {
type Target = PyDict;
type Output = Bound<'py, Self::Target>;
type Error = PyErr;
fn into_pyobject(self, py: pyo3::Python<'py>) -> PyResult<Self::Output> {
let dict = PyDict::new(py);
dict.set_item("column_name", self.column_name)?;
dict.set_item("ascending", self.ascending)?;
dict.set_item("nulls_first", self.nulls_first)?;
Ok(dict)
}
}
impl<'py> IntoPyObject<'py> for PyQueryVectors {
type Target = PyList;
type Output = Bound<'py, Self::Target>;
@@ -288,7 +246,6 @@ pub struct PyQueryRequest {
pub bypass_vector_index: Option<bool>,
pub postfilter: Option<bool>,
pub norm: Option<String>,
pub order_by: Option<Vec<PyColumnOrdering>>,
}
impl From<AnyQuery> for PyQueryRequest {
@@ -316,9 +273,6 @@ impl From<AnyQuery> for PyQueryRequest {
bypass_vector_index: None,
postfilter: None,
norm: None,
order_by: query_request
.order_by
.map(|order_by| order_by.into_iter().map(PyColumnOrdering::from).collect()),
},
AnyQuery::VectorQuery(vector_query) => Self {
limit: vector_query.base.limit,
@@ -343,10 +297,6 @@ impl From<AnyQuery> for PyQueryRequest {
bypass_vector_index: Some(!vector_query.use_index),
postfilter: Some(!vector_query.base.prefilter),
norm: vector_query.base.norm.map(|n| n.to_string()),
order_by: vector_query
.base
.order_by
.map(|order_by| order_by.into_iter().map(PyColumnOrdering::from).collect()),
},
}
}
@@ -525,13 +475,6 @@ impl Query {
})
}
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
let ordering =
ordering.map(|ordering| ordering.into_iter().map(ColumnOrdering::from).collect());
self.inner = self.inner.clone().order_by(ordering);
Ok(())
}
#[pyo3(signature = ())]
pub fn output_schema(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
@@ -704,13 +647,6 @@ impl FTSQuery {
self.inner = self.inner.clone().offset(offset as usize);
}
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
let ordering =
ordering.map(|ordering| ordering.into_iter().map(ColumnOrdering::from).collect());
self.inner = self.inner.clone().order_by(ordering);
Ok(())
}
pub fn fast_search(&mut self) {
self.inner = self.inner.clone().fast_search();
}
@@ -846,13 +782,6 @@ impl VectorQuery {
self.inner = self.inner.clone().offset(offset as usize);
}
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
let ordering =
ordering.map(|ordering| ordering.into_iter().map(ColumnOrdering::from).collect());
self.inner = self.inner.clone().order_by(ordering);
Ok(())
}
pub fn fast_search(&mut self) {
self.inner = self.inner.clone().fast_search();
}
@@ -1025,12 +954,6 @@ impl HybridQuery {
self.inner_fts.offset(offset);
}
pub fn order_by(&mut self, ordering: Option<Vec<PyColumnOrdering>>) -> PyResult<()> {
self.inner_vec.order_by(ordering.clone())?;
self.inner_fts.order_by(ordering)?;
Ok(())
}
pub fn fast_search(&mut self) {
self.inner_vec.fast_search();
self.inner_fts.fast_search();

View File

@@ -143,20 +143,18 @@ pub struct MergeResult {
pub num_inserted_rows: u64,
pub num_deleted_rows: u64,
pub num_attempts: u32,
pub num_rows: u64,
}
#[pymethods]
impl MergeResult {
pub fn __repr__(&self) -> String {
format!(
"MergeResult(version={}, num_updated_rows={}, num_inserted_rows={}, num_deleted_rows={}, num_attempts={}, num_rows={})",
"MergeResult(version={}, num_updated_rows={}, num_inserted_rows={}, num_deleted_rows={}, num_attempts={})",
self.version,
self.num_updated_rows,
self.num_inserted_rows,
self.num_deleted_rows,
self.num_attempts,
self.num_rows
self.num_attempts
)
}
}
@@ -169,152 +167,10 @@ impl From<lancedb::table::MergeResult> for MergeResult {
num_inserted_rows: result.num_inserted_rows,
num_deleted_rows: result.num_deleted_rows,
num_attempts: result.num_attempts,
num_rows: result.num_rows,
}
}
}
/// Specification selecting Lance's MemWAL LSM-style write path for
/// `merge_insert`.
///
/// Constructed via the `bucket(...)`, `identity(...)`, or `unsharded()`
/// classmethods, then optionally chain `with_maintained_indexes(...)` and
/// `with_writer_config_defaults(...)`.
#[pyclass(from_py_object)]
#[derive(Clone, Debug)]
pub struct LsmWriteSpec {
inner: lancedb::table::LsmWriteSpec,
}
#[pymethods]
impl LsmWriteSpec {
/// Hash-bucket sharding by the unenforced primary key column.
#[staticmethod]
pub fn bucket(column: String, num_buckets: u32) -> Self {
Self {
inner: lancedb::table::LsmWriteSpec::bucket(column, num_buckets),
}
}
/// Identity sharding — shard by the raw value of `column`.
///
/// `column` must be a deterministic function of the unenforced primary
/// key: every row with a given primary key must always produce the same
/// `column` value, or upserts of that key can land in different shards
/// and a stale version can win. Typically `column` is the primary key
/// itself or a stable attribute of it.
#[staticmethod]
pub fn identity(column: String) -> Self {
Self {
inner: lancedb::table::LsmWriteSpec::identity(column),
}
}
/// No sharding — every `merge_insert` call writes to a single
/// MemWAL shard.
#[staticmethod]
pub fn unsharded() -> Self {
Self {
inner: lancedb::table::LsmWriteSpec::unsharded(),
}
}
/// Replace the list of indexes the MemWAL should keep up to date as
/// rows are appended. Each name must reference an index that
/// already exists on the table at the time `set_lsm_write_spec`
/// is called.
pub fn with_maintained_indexes(&self, indexes: Vec<String>) -> Self {
Self {
inner: self.inner.clone().with_maintained_indexes(indexes),
}
}
/// Replace the default `ShardWriter` configuration recorded in the
/// MemWAL index, so every writer starts from the same defaults.
pub fn with_writer_config_defaults(&self, defaults: HashMap<String, String>) -> Self {
Self {
inner: self.inner.clone().with_writer_config_defaults(defaults),
}
}
pub fn __repr__(&self) -> String {
match &self.inner {
lancedb::table::LsmWriteSpec::Bucket {
column,
num_buckets,
maintained_indexes,
writer_config_defaults,
} => format!(
"LsmWriteSpec.bucket(column={:?}, num_buckets={}, maintained_indexes={:?}, writer_config_defaults={:?})",
column, num_buckets, maintained_indexes, writer_config_defaults,
),
lancedb::table::LsmWriteSpec::Identity {
column,
maintained_indexes,
writer_config_defaults,
} => format!(
"LsmWriteSpec.identity(column={:?}, maintained_indexes={:?}, writer_config_defaults={:?})",
column, maintained_indexes, writer_config_defaults,
),
lancedb::table::LsmWriteSpec::Unsharded {
maintained_indexes,
writer_config_defaults,
} => format!(
"LsmWriteSpec.unsharded(maintained_indexes={:?}, writer_config_defaults={:?})",
maintained_indexes, writer_config_defaults,
),
}
}
/// Discriminator string identifying the variant ("bucket", "identity",
/// or "unsharded").
#[getter]
pub fn spec_type(&self) -> &'static str {
match &self.inner {
lancedb::table::LsmWriteSpec::Bucket { .. } => "bucket",
lancedb::table::LsmWriteSpec::Identity { .. } => "identity",
lancedb::table::LsmWriteSpec::Unsharded { .. } => "unsharded",
}
}
/// Bucket and identity variants: the sharding column. `None` for unsharded.
#[getter]
pub fn column(&self) -> Option<String> {
match &self.inner {
lancedb::table::LsmWriteSpec::Bucket { column, .. }
| lancedb::table::LsmWriteSpec::Identity { column, .. } => Some(column.clone()),
lancedb::table::LsmWriteSpec::Unsharded { .. } => None,
}
}
/// Bucket variant only: the number of buckets.
#[getter]
pub fn num_buckets(&self) -> Option<u32> {
match &self.inner {
lancedb::table::LsmWriteSpec::Bucket { num_buckets, .. } => Some(*num_buckets),
_ => None,
}
}
/// Names of indexes the MemWAL should keep up to date during writes.
#[getter]
pub fn maintained_indexes(&self) -> Vec<String> {
self.inner.maintained_indexes().to_vec()
}
/// Default `ShardWriter` configuration recorded by this spec.
#[getter]
pub fn writer_config_defaults(&self) -> HashMap<String, String> {
self.inner.writer_config_defaults().clone()
}
}
impl From<LsmWriteSpec> for lancedb::table::LsmWriteSpec {
fn from(spec: LsmWriteSpec) -> Self {
spec.inner
}
}
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct AddColumnsResult {
@@ -755,23 +611,6 @@ impl Table {
})
}
pub fn _table_reopen_state(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let name = inner.name().to_string();
let namespace_path = inner.namespace().to_vec();
let storage_options = inner.initial_storage_options().await;
Python::attach(|py| {
let dict = PyDict::new(py);
dict.set_item("name", name)?;
dict.set_item("namespace_path", namespace_path)?;
dict.set_item("storage_options", storage_options)?;
Ok(dict.unbind())
})
})
}
pub fn __repr__(&self) -> String {
match &self.inner {
None => format!("ClosedTable({})", self.name),
@@ -959,12 +798,6 @@ impl Table {
if let Some(use_index) = parameters.use_index {
builder.use_index(use_index);
}
if let Some(use_lsm_write) = parameters.use_lsm_write {
builder.use_lsm_write(use_lsm_write);
}
if let Some(validate_single_shard) = parameters.validate_single_shard {
builder.validate_single_shard(validate_single_shard);
}
future_into_py(self_.py(), async move {
let res = builder.execute(Box::new(batches)).await.infer_error()?;
@@ -972,44 +805,6 @@ impl Table {
})
}
pub fn set_unenforced_primary_key<'a>(
self_: PyRef<'a, Self>,
columns: Vec<String>,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner
.set_unenforced_primary_key(columns)
.await
.infer_error()
})
}
pub fn set_lsm_write_spec<'a>(
self_: PyRef<'a, Self>,
spec: LsmWriteSpec,
) -> PyResult<Bound<'a, PyAny>> {
let inner = self_.inner_ref()?.clone();
let native_spec = lancedb::table::LsmWriteSpec::from(spec);
future_into_py(self_.py(), async move {
inner.set_lsm_write_spec(native_spec).await.infer_error()
})
}
pub fn unset_lsm_write_spec(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.unset_lsm_write_spec().await.infer_error()
})
}
pub fn close_lsm_writers(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.close_lsm_writers().await.infer_error()
})
}
pub fn uses_v2_manifest_paths(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
@@ -1163,8 +958,6 @@ pub struct MergeInsertParams {
when_not_matched_by_source_condition: Option<String>,
timeout: Option<std::time::Duration>,
use_index: Option<bool>,
use_lsm_write: Option<bool>,
validate_single_shard: Option<bool>,
}
#[pyclass]

View File

@@ -33,14 +33,6 @@ class TestExprConstruction:
e = lit(True)
assert isinstance(e, Expr)
def test_lit_bytes(self):
e = lit(b"\xde\xad\xbe\xef")
assert isinstance(e, Expr)
def test_lit_bytes_empty(self):
e = lit(b"")
assert isinstance(e, Expr)
def test_lit_unsupported_type_raises(self):
with pytest.raises(Exception):
lit([1, 2, 3])
@@ -143,43 +135,6 @@ class TestExprOperators:
assert e.to_sql() == "(name = 'alice')"
class TestExprBytesLiteral:
def test_bytes_to_sql(self):
e = lit(b"\xde\xad\xbe\xef")
assert e.to_sql() == "X'DEADBEEF'"
def test_empty_bytes_to_sql(self):
e = lit(b"")
assert e.to_sql() == "X''"
def test_bytes_repr(self):
e = lit(b"\x01\x02")
assert repr(e) == "Expr(X'0102')"
def test_bytes_equality_expr_sql(self):
e = col("data") == lit(b"\xca\xfe")
assert e.to_sql() == "(data = X'CAFE')"
def test_bytes_ne_expr_sql(self):
e = col("data") != lit(b"\xff")
assert e.to_sql() == "(data <> X'FF')"
def test_bytes_compound_expr_sql(self):
e = (col("data") == lit(b"\x01")) & (col("id") > lit(5))
assert e.to_sql() == "((data = X'01') AND (id > 5))"
def test_bytes_in_function_call(self):
# Regression test: binary literals inside scalar function calls
# used to fail because DataFusion's unparser does not support Binary
# scalars. Now handled via a placeholder-substitution rewrite.
e = func("contains", col("data"), lit(b"\xff"))
assert e.to_sql() == "contains(data, X'FF')"
def test_bytes_in_not(self):
e = ~(col("data") == lit(b"\xff"))
assert e.to_sql() == "NOT (data = X'FF')"
class TestExprStringMethods:
def test_lower(self):
e = col("name").lower()
@@ -430,44 +385,3 @@ class TestColNamingIntegration:
)
assert "upper_name" in result.schema.names
assert sorted(result["upper_name"].to_pylist()) == ["ALICE", "BOB", "CHARLIE"]
# ── bytes / binary column integration tests ───────────────────────────────────
@pytest.fixture
def binary_table(tmp_path):
db = lancedb.connect(str(tmp_path))
data = pa.table(
{
"id": [1, 2, 3],
"payload": pa.array(
[b"\x01\x02", b"\xca\xfe", b"\xff\x00"],
type=pa.binary(),
),
}
)
return db.create_table("binary_test", data)
class TestExprBytesIntegration:
def test_binary_equality_filter(self, binary_table):
result = (
binary_table.search().where(col("payload") == lit(b"\xca\xfe")).to_arrow()
)
assert result.num_rows == 1
assert result["id"][0].as_py() == 2
def test_binary_ne_filter(self, binary_table):
result = (
binary_table.search().where(col("payload") != lit(b"\x01\x02")).to_arrow()
)
assert result.num_rows == 2
def test_binary_compound_filter(self, binary_table):
result = (
binary_table.search()
.where((col("payload") == lit(b"\x01\x02")) | (col("id") == lit(3)))
.to_arrow()
)
assert result.num_rows == 2

View File

@@ -1,2 +1,2 @@
[toolchain]
channel = "1.95.0"
channel = "1.94.0"

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.30.1-beta.0"
version = "0.28.0-beta.11"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true
@@ -75,7 +75,7 @@ reqwest = { version = "0.12.0", default-features = false, features = [
"stream",
], optional = true }
http = { version = "1", optional = true } # Matching what is in reqwest
uuid = { version = "1.7.0", features = ["v4", "v5"] }
uuid = { version = "1.7.0", features = ["v4"] }
polars-arrow = { version = ">=0.37,<0.40.0", optional = true }
polars = { version = ">=0.37,<0.40.0", optional = true }
hf-hub = { version = "0.4.1", optional = true, default-features = false, features = [
@@ -104,7 +104,6 @@ datafusion.workspace = true
http-body = "1" # Matching reqwest
rstest = "0.23.0"
test-log = "0.2"
serial_test = "3"
[features]

Some files were not shown because too many files have changed in this diff Show More