mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-31 19:00:39 +00:00
Compare commits
31 Commits
dependabot
...
sdk-parity
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b5e521c4a1 | ||
|
|
5638907fa5 | ||
|
|
048f52c2aa | ||
|
|
458dcabbd2 | ||
|
|
60ac5c9a7c | ||
|
|
d05fe8ec44 | ||
|
|
ab982d7f65 | ||
|
|
a3339b7bdd | ||
|
|
b20cdc4f93 | ||
|
|
e77a62e35a | ||
|
|
a9f49c8150 | ||
|
|
a7d9f2e99d | ||
|
|
7dba793629 | ||
|
|
87bd6694b6 | ||
|
|
15e75804c4 | ||
|
|
df2b6d3dd4 | ||
|
|
ccec91d957 | ||
|
|
ec82e36317 | ||
|
|
da2a1c4a2c | ||
|
|
8463a10ebe | ||
|
|
7168d64af1 | ||
|
|
403c33dff0 | ||
|
|
a0001043b6 | ||
|
|
1bb7acb74f | ||
|
|
4ce175276c | ||
|
|
4bccb43e56 | ||
|
|
d5dc4c0f06 | ||
|
|
55ae6197c1 | ||
|
|
15bd821825 | ||
|
|
cf162c8a10 | ||
|
|
2eba7ebd02 |
@@ -1,5 +1,5 @@
|
||||
[tool.bumpversion]
|
||||
current_version = "0.29.1-beta.0"
|
||||
current_version = "0.30.0-beta.1"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
|
||||
@@ -29,7 +29,3 @@ runs:
|
||||
args: ${{ inputs.args }}
|
||||
docker-options: "-e PIP_EXTRA_INDEX_URL='https://pypi.fury.io/lance-format/ https://pypi.fury.io/lancedb/'"
|
||||
working-directory: python
|
||||
- uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: windows-wheels
|
||||
path: python\target\wheels
|
||||
|
||||
110
.github/workflows/pypi-publish.yml
vendored
110
.github/workflows/pypi-publish.yml
vendored
@@ -8,6 +8,9 @@ on:
|
||||
# This should trigger a dry run (we skip the final publish step)
|
||||
paths:
|
||||
- .github/workflows/pypi-publish.yml
|
||||
- .github/workflows/build_linux_wheel/action.yml
|
||||
- .github/workflows/build_mac_wheel/action.yml
|
||||
- .github/workflows/build_windows_wheel/action.yml
|
||||
- Cargo.toml # Change in dependency frequently breaks builds
|
||||
- Cargo.lock
|
||||
|
||||
@@ -21,32 +24,21 @@ jobs:
|
||||
linux:
|
||||
name: Python ${{ matrix.config.platform }} manylinux${{ matrix.config.manylinux }}
|
||||
timeout-minutes: 60
|
||||
permissions:
|
||||
id-token: write
|
||||
contents: read
|
||||
strategy:
|
||||
matrix:
|
||||
config:
|
||||
- platform: x86_64
|
||||
manylinux: "2_17"
|
||||
extra_args: ""
|
||||
runner: ubuntu-22.04
|
||||
- platform: x86_64
|
||||
manylinux: "2_28"
|
||||
extra_args: "--features fp16kernels"
|
||||
runner: ubuntu-22.04
|
||||
- platform: aarch64
|
||||
manylinux: "2_17"
|
||||
extra_args: ""
|
||||
# For successful fat LTO builds, we need a large runner to avoid OOM errors.
|
||||
runner: ubuntu-2404-8x-arm64
|
||||
# For successful fat LTO builds, we need a large runner to avoid OOM errors.
|
||||
- platform: aarch64
|
||||
manylinux: "2_28"
|
||||
extra_args: "--features fp16kernels"
|
||||
runner: ubuntu-2404-8x-arm64
|
||||
runs-on: ${{ matrix.config.runner }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v6
|
||||
with:
|
||||
fetch-depth: 0
|
||||
lfs: true
|
||||
@@ -60,15 +52,14 @@ jobs:
|
||||
args: "--release --strip ${{ matrix.config.extra_args }}"
|
||||
arm-build: ${{ matrix.config.platform == 'aarch64' }}
|
||||
manylinux: ${{ matrix.config.manylinux }}
|
||||
- uses: ./.github/workflows/upload_wheel
|
||||
- uses: actions/upload-artifact@v7
|
||||
if: startsWith(github.ref, 'refs/tags/python-v')
|
||||
with:
|
||||
fury_token: ${{ secrets.FURY_TOKEN }}
|
||||
name: wheels-linux-${{ matrix.config.platform }}-${{ matrix.config.manylinux }}
|
||||
path: target/wheels/lancedb-*.whl
|
||||
if-no-files-found: error
|
||||
mac:
|
||||
timeout-minutes: 90
|
||||
permissions:
|
||||
id-token: write
|
||||
contents: read
|
||||
runs-on: ${{ matrix.config.runner }}
|
||||
strategy:
|
||||
matrix:
|
||||
@@ -78,7 +69,7 @@ jobs:
|
||||
env:
|
||||
MACOSX_DEPLOYMENT_TARGET: 10.15
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v6
|
||||
with:
|
||||
fetch-depth: 0
|
||||
lfs: true
|
||||
@@ -90,18 +81,21 @@ jobs:
|
||||
with:
|
||||
python-minor-version: 10
|
||||
args: "--release --strip --target ${{ matrix.config.target }} --features fp16kernels"
|
||||
- uses: ./.github/workflows/upload_wheel
|
||||
- uses: actions/upload-artifact@v7
|
||||
if: startsWith(github.ref, 'refs/tags/python-v')
|
||||
with:
|
||||
fury_token: ${{ secrets.FURY_TOKEN }}
|
||||
name: wheels-mac-${{ matrix.config.target }}
|
||||
path: target/wheels/lancedb-*.whl
|
||||
if-no-files-found: error
|
||||
windows:
|
||||
timeout-minutes: 60
|
||||
permissions:
|
||||
id-token: write
|
||||
contents: read
|
||||
timeout-minutes: 90
|
||||
runs-on: windows-latest
|
||||
env:
|
||||
# link.exe is single-threaded and the long pole on Windows builds. Use
|
||||
# rustc's bundled lld-link instead.
|
||||
CARGO_TARGET_X86_64_PC_WINDOWS_MSVC_LINKER: rust-lld
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v6
|
||||
with:
|
||||
fetch-depth: 0
|
||||
lfs: true
|
||||
@@ -113,18 +107,70 @@ jobs:
|
||||
with:
|
||||
python-minor-version: 10
|
||||
args: "--release --strip"
|
||||
vcpkg_token: ${{ secrets.VCPKG_GITHUB_PACKAGES }}
|
||||
- uses: ./.github/workflows/upload_wheel
|
||||
- uses: actions/upload-artifact@v7
|
||||
if: startsWith(github.ref, 'refs/tags/python-v')
|
||||
with:
|
||||
fury_token: ${{ secrets.FURY_TOKEN }}
|
||||
name: wheels-windows
|
||||
path: target/wheels/lancedb-*.whl
|
||||
if-no-files-found: error
|
||||
publish:
|
||||
name: Publish wheels
|
||||
if: startsWith(github.ref, 'refs/tags/python-v')
|
||||
needs: [linux, mac, windows]
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
id-token: write
|
||||
contents: read
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- name: Download wheel artifacts
|
||||
uses: actions/download-artifact@v8
|
||||
with:
|
||||
pattern: wheels-*
|
||||
path: target/wheels
|
||||
merge-multiple: true
|
||||
- name: List wheels
|
||||
run: ls -la target/wheels
|
||||
- name: Choose repo
|
||||
id: choose_repo
|
||||
run: |
|
||||
if [[ ${{ github.ref }} == *beta* ]]; then
|
||||
echo "repo=fury" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "repo=pypi" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
- name: Publish to Fury
|
||||
if: steps.choose_repo.outputs.repo == 'fury'
|
||||
env:
|
||||
FURY_TOKEN: ${{ secrets.FURY_TOKEN }}
|
||||
run: |
|
||||
shopt -s nullglob
|
||||
WHEELS=(target/wheels/lancedb-*.whl)
|
||||
if [[ ${#WHEELS[@]} -eq 0 ]]; then
|
||||
echo "No wheels found in target/wheels/" >&2
|
||||
exit 1
|
||||
fi
|
||||
for WHEEL in "${WHEELS[@]}"; do
|
||||
echo "Uploading $WHEEL to Fury"
|
||||
curl -f -F package=@"$WHEEL" "https://$FURY_TOKEN@push.fury.io/lancedb/"
|
||||
done
|
||||
# NOTE: pypa/gh-action-pypi-publish must be invoked directly from a
|
||||
# workflow file, not from inside a composite action. When called from a
|
||||
# composite, `github.action_repository` is empty (actions/runner#2473)
|
||||
# and the action falls back to `github.repository`, producing a bogus
|
||||
# `docker://ghcr.io/<repo>:<ref>` image reference that GHA tries to pull.
|
||||
- name: Publish to PyPI
|
||||
if: steps.choose_repo.outputs.repo == 'pypi'
|
||||
uses: pypa/gh-action-pypi-publish@release/v1
|
||||
with:
|
||||
packages-dir: target/wheels/
|
||||
gh-release:
|
||||
if: startsWith(github.ref, 'refs/tags/python-v')
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v6
|
||||
with:
|
||||
fetch-depth: 0
|
||||
lfs: true
|
||||
@@ -187,13 +233,13 @@ jobs:
|
||||
report-failure:
|
||||
name: Report Workflow Failure
|
||||
runs-on: ubuntu-latest
|
||||
needs: [linux, mac, windows]
|
||||
needs: [linux, mac, windows, publish]
|
||||
permissions:
|
||||
contents: read
|
||||
issues: write
|
||||
if: always() && failure() && startsWith(github.ref, 'refs/tags/python-v')
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/checkout@v6
|
||||
- uses: ./.github/actions/create-failure-issue
|
||||
with:
|
||||
job-results: ${{ toJSON(needs) }}
|
||||
|
||||
34
.github/workflows/upload_wheel/action.yml
vendored
34
.github/workflows/upload_wheel/action.yml
vendored
@@ -1,34 +0,0 @@
|
||||
name: upload-wheel
|
||||
|
||||
description: "Upload wheels to Pypi"
|
||||
inputs:
|
||||
fury_token:
|
||||
required: true
|
||||
description: "release token for the fury repo"
|
||||
|
||||
runs:
|
||||
using: "composite"
|
||||
steps:
|
||||
- name: Choose repo
|
||||
shell: bash
|
||||
id: choose_repo
|
||||
run: |
|
||||
if [[ ${{ github.ref }} == *beta* ]]; then
|
||||
echo "repo=fury" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "repo=pypi" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
- name: Publish to Fury
|
||||
if: steps.choose_repo.outputs.repo == 'fury'
|
||||
shell: bash
|
||||
env:
|
||||
FURY_TOKEN: ${{ inputs.fury_token }}
|
||||
run: |
|
||||
WHEEL=$(ls target/wheels/lancedb-*.whl 2> /dev/null | head -n 1)
|
||||
echo "Uploading $WHEEL to Fury"
|
||||
curl -f -F package=@$WHEEL https://$FURY_TOKEN@push.fury.io/lancedb/
|
||||
- name: Publish to PyPI
|
||||
if: steps.choose_repo.outputs.repo == 'pypi'
|
||||
uses: pypa/gh-action-pypi-publish@release/v1
|
||||
with:
|
||||
packages-dir: target/wheels/
|
||||
11
AGENTS.md
11
AGENTS.md
@@ -37,10 +37,13 @@ Before committing changes, run formatting for every language you touched. At min
|
||||
and run targeted tests through `cd python && uv run ...`.
|
||||
* TypeScript changes: run the relevant `npm`/`pnpm` lint, format, build, and docs commands in `nodejs`.
|
||||
|
||||
Before creating a PR, make sure the PR title follows Conventional Commits, such as
|
||||
`fix: support nested field paths in native index creation` or
|
||||
`feat(python): add dataset multiprocessing support`. The semantic-release check uses the
|
||||
PR title and body as the merge commit message, so a non-conventional PR title will fail CI.
|
||||
Before creating a PR, the exact value passed to `gh pr create --title` must follow
|
||||
Conventional Commits, such as `fix: support nested field paths in native index creation`
|
||||
or `feat(python): add dataset multiprocessing support`. Do not use a plain natural
|
||||
language summary like `Support nested field paths in native index creation` as the PR
|
||||
title. The semantic-release check uses the PR title and body as the merge commit message,
|
||||
so a non-conventional PR title will fail CI. After creating a PR, read the remote PR title
|
||||
back and fix it immediately if it is not conventional.
|
||||
|
||||
## Coding tips
|
||||
|
||||
|
||||
242
Cargo.lock
generated
242
Cargo.lock
generated
@@ -1399,6 +1399,12 @@ dependencies = [
|
||||
"syn 2.0.117",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bytecount"
|
||||
version = "0.6.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e"
|
||||
|
||||
[[package]]
|
||||
name = "bytemuck"
|
||||
version = "1.25.0"
|
||||
@@ -1522,9 +1528,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cedarwood"
|
||||
version = "0.4.6"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6d910bedd62c24733263d0bed247460853c9d22e8956bd4cd964302095e04e90"
|
||||
checksum = "c0524a528a6a0288df1863c3c20fe92c301875b4941e7b6c4b394ab08c5a4c55"
|
||||
dependencies = [
|
||||
"smallvec",
|
||||
]
|
||||
@@ -3284,8 +3290,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
|
||||
|
||||
[[package]]
|
||||
name = "fsst"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"rand 0.9.4",
|
||||
@@ -4077,6 +4083,21 @@ dependencies = [
|
||||
"zerovec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "icu_locale"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d5a396343c7208121dc86e35623d3dfe19814a7613cfd14964994cdc9c9a2e26"
|
||||
dependencies = [
|
||||
"icu_collections",
|
||||
"icu_locale_core",
|
||||
"icu_locale_data",
|
||||
"icu_provider",
|
||||
"potential_utf",
|
||||
"tinystr",
|
||||
"zerovec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "icu_locale_core"
|
||||
version = "2.2.0"
|
||||
@@ -4085,11 +4106,18 @@ checksum = "92219b62b3e2b4d88ac5119f8904c10f8f61bf7e95b640d25ba3075e6cac2c29"
|
||||
dependencies = [
|
||||
"displaydoc",
|
||||
"litemap",
|
||||
"serde",
|
||||
"tinystr",
|
||||
"writeable",
|
||||
"zerovec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "icu_locale_data"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d5fdcc9ac77c6d74ff5cf6e65ef3181d6af32003b16fce3a77fb451d2f695993"
|
||||
|
||||
[[package]]
|
||||
name = "icu_normalizer"
|
||||
version = "2.2.0"
|
||||
@@ -4138,6 +4166,8 @@ checksum = "139c4cf31c8b5f33d7e199446eff9c1e02decfc2f0eec2c8d71f65befa45b421"
|
||||
dependencies = [
|
||||
"displaydoc",
|
||||
"icu_locale_core",
|
||||
"serde",
|
||||
"stable_deref_trait",
|
||||
"writeable",
|
||||
"yoke",
|
||||
"zerofrom",
|
||||
@@ -4145,6 +4175,27 @@ dependencies = [
|
||||
"zerovec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "icu_segmenter"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5c0794db0b1a86193ac9c48768d0e6c52c54448e0870ad87907d456ee0dac964"
|
||||
dependencies = [
|
||||
"icu_collections",
|
||||
"icu_locale",
|
||||
"icu_provider",
|
||||
"icu_segmenter_data",
|
||||
"potential_utf",
|
||||
"utf8_iter",
|
||||
"zerovec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "icu_segmenter_data"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e4a2c462a4d927d512f5f882a033ddd62f33a05bb9f230d98f736ac3dc85938f"
|
||||
|
||||
[[package]]
|
||||
name = "id-arena"
|
||||
version = "2.3.0"
|
||||
@@ -4306,19 +4357,20 @@ checksum = "9028f49264629065d057f340a86acb84867925865f73bbf8d47b4d149a7e88b8"
|
||||
|
||||
[[package]]
|
||||
name = "jieba-macros"
|
||||
version = "0.9.0"
|
||||
version = "0.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a29cfc5dcd898604c6f80363411fa6b6b08e27d1d253d6225b9cb6702ea02fc0"
|
||||
checksum = "46adade69b634535a8f495cf87710ed893cff53e1dbc9dd750c2ab81c5defb82"
|
||||
dependencies = [
|
||||
"phf_codegen 0.13.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jieba-rs"
|
||||
version = "0.9.0"
|
||||
version = "0.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3245d6e9d1d5facbd6a23848d6b67e3439738ccbb4fa5a3d65da315ba1a910a2"
|
||||
checksum = "11b53580aaa8ec8b713da271da434f8947409242c537a9ab3f7b76bdbb19e8a9"
|
||||
dependencies = [
|
||||
"bytecount",
|
||||
"cedarwood",
|
||||
"jieba-macros",
|
||||
"phf 0.13.1",
|
||||
@@ -4506,8 +4558,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a"
|
||||
|
||||
[[package]]
|
||||
name = "lance"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"arrow",
|
||||
@@ -4525,6 +4577,7 @@ dependencies = [
|
||||
"async_cell",
|
||||
"aws-credential-types",
|
||||
"aws-sdk-dynamodb",
|
||||
"bitpacking",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"chrono",
|
||||
@@ -4551,9 +4604,11 @@ dependencies = [
|
||||
"lance-io",
|
||||
"lance-linalg",
|
||||
"lance-namespace",
|
||||
"lance-select",
|
||||
"lance-table",
|
||||
"lance-tokenizer",
|
||||
"log",
|
||||
"moka",
|
||||
"object_store",
|
||||
"permutation",
|
||||
"pin-project",
|
||||
@@ -4577,8 +4632,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-arrow"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4596,10 +4651,34 @@ dependencies = [
|
||||
"rand 0.9.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lance-arrow-scalar"
|
||||
version = "58.0.0"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
"arrow-cast",
|
||||
"arrow-data",
|
||||
"arrow-row",
|
||||
"arrow-schema",
|
||||
"half",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lance-arrow-stats"
|
||||
version = "58.0.0"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-schema",
|
||||
"lance-arrow-scalar",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lance-bitpacking"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arrayref",
|
||||
"paste",
|
||||
@@ -4608,8 +4687,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-core"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4644,8 +4723,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-datafusion"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4675,8 +4754,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-datagen"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4694,8 +4773,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-encoding"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arrow-arith",
|
||||
"arrow-array",
|
||||
@@ -4730,8 +4809,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-file"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arrow-arith",
|
||||
"arrow-array",
|
||||
@@ -4762,8 +4841,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-index"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"arrow",
|
||||
@@ -4793,6 +4872,7 @@ dependencies = [
|
||||
"jieba-rs",
|
||||
"jsonb",
|
||||
"lance-arrow",
|
||||
"lance-arrow-stats",
|
||||
"lance-core",
|
||||
"lance-datafusion",
|
||||
"lance-datagen",
|
||||
@@ -4800,6 +4880,7 @@ dependencies = [
|
||||
"lance-file",
|
||||
"lance-io",
|
||||
"lance-linalg",
|
||||
"lance-select",
|
||||
"lance-table",
|
||||
"lance-tokenizer",
|
||||
"libm",
|
||||
@@ -4827,8 +4908,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-io"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-arith",
|
||||
@@ -4870,8 +4951,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-linalg"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4887,8 +4968,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"async-trait",
|
||||
@@ -4900,8 +4981,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace-impls"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-ipc",
|
||||
@@ -4936,9 +5017,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace-reqwest-client"
|
||||
version = "0.7.6"
|
||||
version = "0.7.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f65e31bdaa13e01dab6e7cf566da31df243c34a542f0d915d3601ec0e01e61d2"
|
||||
checksum = "6369eee4682fb11edf538388b43c61ce288b8302fe89bb40944d7daa7faaae99"
|
||||
dependencies = [
|
||||
"reqwest 0.12.28",
|
||||
"serde",
|
||||
@@ -4948,10 +5029,25 @@ dependencies = [
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lance-select"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"deepsize",
|
||||
"itertools 0.13.0",
|
||||
"lance-core",
|
||||
"roaring",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lance-table"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4970,6 +5066,7 @@ dependencies = [
|
||||
"lance-core",
|
||||
"lance-file",
|
||||
"lance-io",
|
||||
"lance-select",
|
||||
"log",
|
||||
"object_store",
|
||||
"prost",
|
||||
@@ -4990,8 +5087,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-testing"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-schema",
|
||||
@@ -5002,9 +5099,10 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-tokenizer"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.2.0-beta.1"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
|
||||
dependencies = [
|
||||
"icu_segmenter",
|
||||
"jieba-rs",
|
||||
"lindera",
|
||||
"rust-stemmers",
|
||||
@@ -5014,7 +5112,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb"
|
||||
version = "0.29.1-beta.0"
|
||||
version = "0.30.0-beta.1"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"anyhow",
|
||||
@@ -5084,6 +5182,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"serial_test",
|
||||
"snafu 0.8.9",
|
||||
"tempfile",
|
||||
"test-log",
|
||||
@@ -5096,7 +5195,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-nodejs"
|
||||
version = "0.29.1-beta.0"
|
||||
version = "0.30.0-beta.1"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -5119,7 +5218,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-python"
|
||||
version = "0.32.1-beta.0"
|
||||
version = "0.33.0-beta.1"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"async-trait",
|
||||
@@ -6934,6 +7033,8 @@ version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0103b1cef7ec0cf76490e969665504990193874ea05c85ff9bab8b911d0a0564"
|
||||
dependencies = [
|
||||
"serde_core",
|
||||
"writeable",
|
||||
"zerovec",
|
||||
]
|
||||
|
||||
@@ -8128,6 +8229,15 @@ dependencies = [
|
||||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scc"
|
||||
version = "2.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "46e6f046b7fef48e2660c57ed794263155d713de679057f2d0c169bfc6e756cc"
|
||||
dependencies = [
|
||||
"sdd",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "schannel"
|
||||
version = "0.1.29"
|
||||
@@ -8194,6 +8304,12 @@ dependencies = [
|
||||
"untrusted 0.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sdd"
|
||||
version = "3.0.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca"
|
||||
|
||||
[[package]]
|
||||
name = "sec1"
|
||||
version = "0.3.0"
|
||||
@@ -8384,6 +8500,32 @@ dependencies = [
|
||||
"unsafe-libyaml",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serial_test"
|
||||
version = "3.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "911bd979bf1070a3f3aa7b691a3b3e9968f339ceeec89e08c280a8a22207a32f"
|
||||
dependencies = [
|
||||
"futures-executor",
|
||||
"futures-util",
|
||||
"log",
|
||||
"once_cell",
|
||||
"parking_lot",
|
||||
"scc",
|
||||
"serial_test_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serial_test_derive"
|
||||
version = "3.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0a7d91949b85b0d2fb687445e448b40d322b6b3e4af6b44a29b21d9a5f33e6d9"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.117",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha1"
|
||||
version = "0.10.6"
|
||||
@@ -8406,6 +8548,12 @@ dependencies = [
|
||||
"digest 0.11.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha1_smol"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
|
||||
|
||||
[[package]]
|
||||
name = "sha2"
|
||||
version = "0.10.9"
|
||||
@@ -9125,6 +9273,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c8323304221c2a851516f22236c5722a72eaa19749016521d6dff0824447d96d"
|
||||
dependencies = [
|
||||
"displaydoc",
|
||||
"serde_core",
|
||||
"zerovec",
|
||||
]
|
||||
|
||||
@@ -9629,6 +9778,7 @@ dependencies = [
|
||||
"getrandom 0.4.2",
|
||||
"js-sys",
|
||||
"serde_core",
|
||||
"sha1_smol",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
@@ -10592,6 +10742,7 @@ dependencies = [
|
||||
"displaydoc",
|
||||
"yoke",
|
||||
"zerofrom",
|
||||
"zerovec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -10600,6 +10751,7 @@ version = "0.11.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "90f911cbc359ab6af17377d242225f4d75119aec87ea711a880987b18cd7b239"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"yoke",
|
||||
"zerofrom",
|
||||
"zerovec-derive",
|
||||
|
||||
28
Cargo.toml
28
Cargo.toml
@@ -13,20 +13,20 @@ categories = ["database-implementations"]
|
||||
rust-version = "1.91.0"
|
||||
|
||||
[workspace.dependencies]
|
||||
lance = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance = { "version" = "=7.2.0-beta.1", default-features = false, "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.2.0-beta.1", default-features = false, "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.2.0-beta.1", default-features = false, "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
ahash = "0.8"
|
||||
# Note that this one does not include pyarrow
|
||||
arrow = { version = "58.0.0", optional = false }
|
||||
|
||||
26
REVIEW.md
Normal file
26
REVIEW.md
Normal file
@@ -0,0 +1,26 @@
|
||||
# Code review guidelines
|
||||
|
||||
Repo-specific guidance for automated PR reviews.
|
||||
|
||||
## Cross-SDK parity
|
||||
|
||||
LanceDB exposes the same core (`rust/lancedb`) through Python, TypeScript (`nodejs`),
|
||||
and Java bindings. Behavioral drift between SDKs is a recurring problem, so watch for
|
||||
parity gaps when reviewing — but only flag real ones:
|
||||
|
||||
* If the change adds or modifies user-facing API or behavior in the shared core
|
||||
(`rust/lancedb`), check whether each binding that should expose it (`python`,
|
||||
`nodejs`) does. A core change with no corresponding binding update is worth a note.
|
||||
* If the change adds or modifies a public API in one SDK but not the other, open the
|
||||
sibling SDK's corresponding module and state whether an equivalent exists. If not,
|
||||
note it as a possible parity gap and suggest a follow-up issue.
|
||||
* For bug fixes, first read the sibling SDK's analogous code path to check whether the
|
||||
same bug exists there. Only raise parity if it actually does. Do not ask to "port" a
|
||||
fix for a bug that only ever existed in one binding.
|
||||
* Stay silent on internal-only refactors, tests, docs, and changes with no cross-SDK
|
||||
surface.
|
||||
* Parity expectations apply to the Python and TypeScript (`nodejs`) SDKs. Java currently
|
||||
implements only the remote table, not the local/embedded backend, so it is expected to
|
||||
be partial — do not flag Java for missing local-only functionality.
|
||||
* Keep parity feedback to a short, clearly-labeled note (e.g. "Possible SDK parity
|
||||
gap: …"). It is advisory, not a merge blocker.
|
||||
@@ -112,25 +112,25 @@ def fetch_remote_tags() -> List[TagInfo]:
|
||||
"api",
|
||||
"-X",
|
||||
"GET",
|
||||
f"repos/{LANCE_REPO}/git/refs/tags",
|
||||
"--paginate",
|
||||
f"repos/{LANCE_REPO}/releases",
|
||||
"--jq",
|
||||
".[].ref",
|
||||
".[].tag_name",
|
||||
"-F",
|
||||
"per_page=20",
|
||||
]
|
||||
)
|
||||
tags: List[TagInfo] = []
|
||||
for line in output.splitlines():
|
||||
ref = line.strip()
|
||||
if not ref.startswith("refs/tags/v"):
|
||||
tag = line.strip()
|
||||
if not tag.startswith("v"):
|
||||
continue
|
||||
tag = ref.split("refs/tags/")[-1]
|
||||
version = tag.lstrip("v")
|
||||
try:
|
||||
tags.append(TagInfo(tag=tag, version=version, semver=parse_semver(version)))
|
||||
except ValueError:
|
||||
continue
|
||||
if not tags:
|
||||
raise RuntimeError("No Lance tags could be parsed from GitHub API output")
|
||||
raise RuntimeError("No Lance releases could be parsed from GitHub API output")
|
||||
return tags
|
||||
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
|
||||
<dependency>
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-core</artifactId>
|
||||
<version>0.29.1-beta.0</version>
|
||||
<version>0.30.0-beta.1</version>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
|
||||
@@ -76,6 +76,57 @@ the query optimizer chooses a suboptimal path.
|
||||
|
||||
***
|
||||
|
||||
### useLsmWrite()
|
||||
|
||||
```ts
|
||||
useLsmWrite(useLsmWrite): MergeInsertBuilder
|
||||
```
|
||||
|
||||
Controls whether the merge uses the MemWAL LSM write path.
|
||||
|
||||
By default (unset), a `mergeInsert` on a table with an LSM write spec is
|
||||
routed through Lance's MemWAL shard writer, and a table without one uses
|
||||
the standard path. Pass `false` to force the standard path even when a
|
||||
spec is set. Pass `true` to require a spec — `mergeInsert` rejects if none
|
||||
is installed.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **useLsmWrite**: `boolean`
|
||||
Whether to use the LSM write path.
|
||||
|
||||
#### Returns
|
||||
|
||||
[`MergeInsertBuilder`](MergeInsertBuilder.md)
|
||||
|
||||
***
|
||||
|
||||
### validateSingleShard()
|
||||
|
||||
```ts
|
||||
validateSingleShard(validateSingleShard): MergeInsertBuilder
|
||||
```
|
||||
|
||||
Controls how an LSM merge checks that its input targets a single shard.
|
||||
|
||||
When a table has an LSM write spec, every row in a `mergeInsert` call must
|
||||
route to the same shard. When `true` (the default), every row is inspected
|
||||
to verify this. When `false`, only the first row is inspected and the
|
||||
shard it routes to is used for the whole input — a faster path for callers
|
||||
that have already pre-sharded their input. Has no effect on tables without
|
||||
an LSM write spec.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **validateSingleShard**: `boolean`
|
||||
Whether to check every row routes to one shard. Defaults to `true`.
|
||||
|
||||
#### Returns
|
||||
|
||||
[`MergeInsertBuilder`](MergeInsertBuilder.md)
|
||||
|
||||
***
|
||||
|
||||
### whenMatchedUpdateAll()
|
||||
|
||||
```ts
|
||||
|
||||
@@ -187,6 +187,25 @@ Any attempt to use the table after it is closed will result in an error.
|
||||
|
||||
***
|
||||
|
||||
### closeLsmWriters()
|
||||
|
||||
```ts
|
||||
abstract closeLsmWriters(): Promise<void>
|
||||
```
|
||||
|
||||
Drain and close any cached MemWAL shard writers held for this table.
|
||||
|
||||
When an [LsmWriteSpec](../interfaces/LsmWriteSpec.md) is installed, `mergeInsert` opens MemWAL
|
||||
shard writers and caches them for reuse across calls. This closes them,
|
||||
flushing pending data; writers reopen lazily on the next `mergeInsert`.
|
||||
It is a no-op when no writers are cached.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`void`>
|
||||
|
||||
***
|
||||
|
||||
### countRows()
|
||||
|
||||
```ts
|
||||
|
||||
@@ -70,16 +70,20 @@ client used by manifest-enabled native connections.
|
||||
optional readConsistencyInterval: number;
|
||||
```
|
||||
|
||||
(For LanceDB OSS only): The interval, in seconds, at which to check for
|
||||
updates to the table from other processes. If None, then consistency is not
|
||||
checked. For performance reasons, this is the default. For strong
|
||||
consistency, set this to zero seconds. Then every read will check for
|
||||
updates from other processes. As a compromise, you can set this to a
|
||||
non-zero value for eventual consistency. If more than that interval
|
||||
has passed since the last check, then the table will be checked for updates.
|
||||
Note: this consistency only applies to read operations. Write operations are
|
||||
The interval, in seconds, at which to check for updates to the table
|
||||
from other processes. If None, then consistency is not checked. For
|
||||
performance reasons, this is the default. For strong consistency, set
|
||||
this to zero seconds. Then every read will check for updates from other
|
||||
processes. As a compromise, you can set this to a non-zero value for
|
||||
eventual consistency. If more than that interval has passed since the
|
||||
last check, then the table will be checked for updates. Note: this
|
||||
consistency only applies to read operations. Write operations are
|
||||
always consistent.
|
||||
|
||||
Stronger consistency is not free. The smaller the interval, the more
|
||||
often each read pays the cost of checking for updates against object
|
||||
storage, raising per-read latency and cost.
|
||||
|
||||
***
|
||||
|
||||
### region?
|
||||
|
||||
@@ -11,7 +11,10 @@ Specification selecting Lance's MemWAL LSM-style write path for
|
||||
|
||||
`specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`,
|
||||
`column` and `numBuckets` are required; for `"identity"`, `column` is
|
||||
required.
|
||||
required and must be a deterministic function of the unenforced primary
|
||||
key (every row with a given primary key must always produce the same
|
||||
`column` value, or upserts of that key can land in different shards and a
|
||||
stale version can win).
|
||||
|
||||
## Properties
|
||||
|
||||
|
||||
@@ -32,6 +32,14 @@ numInsertedRows: number;
|
||||
|
||||
***
|
||||
|
||||
### numRows
|
||||
|
||||
```ts
|
||||
numRows: number;
|
||||
```
|
||||
|
||||
***
|
||||
|
||||
### numUpdatedRows
|
||||
|
||||
```ts
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
<parent>
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-parent</artifactId>
|
||||
<version>0.29.1-beta.0</version>
|
||||
<version>0.30.0-beta.1</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-parent</artifactId>
|
||||
<version>0.29.1-beta.0</version>
|
||||
<version>0.30.0-beta.1</version>
|
||||
<packaging>pom</packaging>
|
||||
<name>${project.artifactId}</name>
|
||||
<description>LanceDB Java SDK Parent POM</description>
|
||||
@@ -28,7 +28,7 @@
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<arrow.version>15.0.0</arrow.version>
|
||||
<lance-core.version>7.0.0-beta.13</lance-core.version>
|
||||
<lance-core.version>7.2.0-beta.1</lance-core.version>
|
||||
<spotless.skip>false</spotless.skip>
|
||||
<spotless.version>2.30.0</spotless.version>
|
||||
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "lancedb-nodejs"
|
||||
edition.workspace = true
|
||||
version = "0.29.1-beta.0"
|
||||
version = "0.30.0-beta.1"
|
||||
publish = false
|
||||
license.workspace = true
|
||||
description.workspace = true
|
||||
|
||||
@@ -171,18 +171,22 @@ describe("given a connection", () => {
|
||||
|
||||
let manifestDir =
|
||||
tmpDir.name + "/test_manifest_paths_v2_empty.lance/_versions";
|
||||
readdirSync(manifestDir).forEach((file) => {
|
||||
expect(file).toMatch(/^\d{20}\.manifest$/);
|
||||
});
|
||||
readdirSync(manifestDir)
|
||||
.filter((f) => f.endsWith(".manifest"))
|
||||
.forEach((file) => {
|
||||
expect(file).toMatch(/^\d{20}\.manifest$/);
|
||||
});
|
||||
|
||||
table = (await db.createTable("test_manifest_paths_v2", [{ id: 1 }], {
|
||||
enableV2ManifestPaths: true,
|
||||
})) as LocalTable;
|
||||
expect(await table.usesV2ManifestPaths()).toBe(true);
|
||||
manifestDir = tmpDir.name + "/test_manifest_paths_v2.lance/_versions";
|
||||
readdirSync(manifestDir).forEach((file) => {
|
||||
expect(file).toMatch(/^\d{20}\.manifest$/);
|
||||
});
|
||||
readdirSync(manifestDir)
|
||||
.filter((f) => f.endsWith(".manifest"))
|
||||
.forEach((file) => {
|
||||
expect(file).toMatch(/^\d{20}\.manifest$/);
|
||||
});
|
||||
});
|
||||
|
||||
it("should be able to migrate tables to the V2 manifest paths", async () => {
|
||||
@@ -199,16 +203,20 @@ describe("given a connection", () => {
|
||||
|
||||
const manifestDir =
|
||||
tmpDir.name + "/test_manifest_path_migration.lance/_versions";
|
||||
readdirSync(manifestDir).forEach((file) => {
|
||||
expect(file).toMatch(/^\d\.manifest$/);
|
||||
});
|
||||
readdirSync(manifestDir)
|
||||
.filter((f) => f.endsWith(".manifest"))
|
||||
.forEach((file) => {
|
||||
expect(file).toMatch(/^\d\.manifest$/);
|
||||
});
|
||||
|
||||
await table.migrateManifestPathsV2();
|
||||
expect(await table.usesV2ManifestPaths()).toBe(true);
|
||||
|
||||
readdirSync(manifestDir).forEach((file) => {
|
||||
expect(file).toMatch(/^\d{20}\.manifest$/);
|
||||
});
|
||||
readdirSync(manifestDir)
|
||||
.filter((f) => f.endsWith(".manifest"))
|
||||
.forEach((file) => {
|
||||
expect(file).toMatch(/^\d{20}\.manifest$/);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ import {
|
||||
List,
|
||||
Schema,
|
||||
SchemaLike,
|
||||
Struct,
|
||||
Type,
|
||||
Uint8,
|
||||
Utf8,
|
||||
@@ -780,6 +781,113 @@ describe("When creating an index", () => {
|
||||
expect(indices2.length).toBe(0);
|
||||
});
|
||||
|
||||
it("should create and search a nested vector index", async () => {
|
||||
const db = await connect(tmpDir.name);
|
||||
const nestedSchema = new Schema([
|
||||
new Field("id", new Int32(), true),
|
||||
new Field(
|
||||
"image",
|
||||
new Struct([
|
||||
new Field(
|
||||
"embedding",
|
||||
new FixedSizeList(2, new Field("item", new Float32(), true)),
|
||||
true,
|
||||
),
|
||||
]),
|
||||
true,
|
||||
),
|
||||
]);
|
||||
const nestedTable = await db.createTable(
|
||||
"nested_vector",
|
||||
makeArrowTable(
|
||||
Array.from({ length: 300 }, (_, id) => ({
|
||||
id,
|
||||
image: { embedding: [id, id + 1] },
|
||||
})),
|
||||
{ schema: nestedSchema },
|
||||
),
|
||||
);
|
||||
|
||||
await nestedTable.createIndex("image.embedding", {
|
||||
name: "image_embedding_idx",
|
||||
});
|
||||
const indices = await nestedTable.listIndices();
|
||||
expect(indices).toContainEqual({
|
||||
name: "image_embedding_idx",
|
||||
indexType: "IvfPq",
|
||||
columns: ["image.embedding"],
|
||||
});
|
||||
|
||||
const explicit = await nestedTable
|
||||
.query()
|
||||
.nearestTo([0.0, 1.0])
|
||||
.column("image.embedding")
|
||||
.limit(1)
|
||||
.toArray();
|
||||
const inferred = await nestedTable
|
||||
.query()
|
||||
.nearestTo([0.0, 1.0])
|
||||
.limit(1)
|
||||
.toArray();
|
||||
expect(inferred[0].id).toEqual(explicit[0].id);
|
||||
});
|
||||
|
||||
it("should report multiple nested vector candidates", async () => {
|
||||
const db = await connect(tmpDir.name);
|
||||
const nestedSchema = new Schema([
|
||||
new Field(
|
||||
"image",
|
||||
new Struct([
|
||||
new Field(
|
||||
"embedding",
|
||||
new FixedSizeList(2, new Field("item", new Float32(), true)),
|
||||
true,
|
||||
),
|
||||
]),
|
||||
true,
|
||||
),
|
||||
new Field(
|
||||
"text",
|
||||
new Struct([
|
||||
new Field(
|
||||
"embedding",
|
||||
new FixedSizeList(2, new Field("item", new Float32(), true)),
|
||||
true,
|
||||
),
|
||||
]),
|
||||
true,
|
||||
),
|
||||
]);
|
||||
const nestedTable = await db.createTable(
|
||||
"multiple_nested_vectors",
|
||||
makeArrowTable(
|
||||
[
|
||||
{
|
||||
image: { embedding: [0.0, 1.0] },
|
||||
text: { embedding: [2.0, 3.0] },
|
||||
},
|
||||
],
|
||||
{ schema: nestedSchema },
|
||||
),
|
||||
);
|
||||
|
||||
await expect(
|
||||
nestedTable.query().nearestTo([0.0, 1.0]).limit(1).toArray(),
|
||||
).rejects.toThrow(/image\.embedding.*text\.embedding/);
|
||||
});
|
||||
|
||||
it("should report when no default vector column exists", async () => {
|
||||
const db = await connect(tmpDir.name);
|
||||
const noVectorTable = await db.createTable(
|
||||
"no_vector",
|
||||
makeArrowTable([{ id: 0, label: "cat" }]),
|
||||
);
|
||||
|
||||
await expect(
|
||||
noVectorTable.query().nearestTo([0.0, 1.0]).limit(1).toArray(),
|
||||
).rejects.toThrow(/No vector column/);
|
||||
});
|
||||
|
||||
it("should wait for index readiness", async () => {
|
||||
// Create an index and then wait for it to be ready
|
||||
await tbl.createIndex("vec");
|
||||
@@ -2517,3 +2625,97 @@ describe("setLsmWriteSpec / unsetLsmWriteSpec", () => {
|
||||
).rejects.toThrow();
|
||||
});
|
||||
});
|
||||
|
||||
describe("LSM merge insert", () => {
|
||||
let tmpDir: tmp.DirResult;
|
||||
|
||||
beforeEach(() => {
|
||||
tmpDir = tmp.dirSync({ unsafeCleanup: true });
|
||||
});
|
||||
afterEach(() => tmpDir.removeCallback());
|
||||
|
||||
async function bucketTable(conn: Connection): Promise<Table> {
|
||||
// The primary key column must be non-nullable.
|
||||
const table = await conn.createEmptyTable(
|
||||
"t",
|
||||
new arrow.Schema([
|
||||
new arrow.Field("id", new arrow.Utf8(), false),
|
||||
new arrow.Field("value", new arrow.Float64(), true),
|
||||
]),
|
||||
);
|
||||
await table.add([
|
||||
{ id: "a", value: 1 },
|
||||
{ id: "b", value: 2 },
|
||||
]);
|
||||
await table.setUnenforcedPrimaryKey("id");
|
||||
// numBuckets = 1: every row routes to the single bucket.
|
||||
await table.setLsmWriteSpec({
|
||||
specType: "bucket",
|
||||
column: "id",
|
||||
numBuckets: 1,
|
||||
});
|
||||
return table;
|
||||
}
|
||||
|
||||
it("routes merge_insert through the shard writer", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const table = await bucketTable(conn);
|
||||
|
||||
const res = await table
|
||||
.mergeInsert("id")
|
||||
.whenMatchedUpdateAll()
|
||||
.whenNotMatchedInsertAll()
|
||||
.execute([
|
||||
{ id: "c", value: 3 },
|
||||
{ id: "d", value: 4 },
|
||||
]);
|
||||
// LSM path: rows go to the MemWAL, so only numRows is populated.
|
||||
expect(res.numRows).toBe(2);
|
||||
expect(res.version).toBe(0);
|
||||
expect(res.numInsertedRows).toBe(0);
|
||||
|
||||
await table.closeLsmWriters();
|
||||
});
|
||||
|
||||
it("falls back to the standard path with useLsmWrite(false)", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const table = await bucketTable(conn);
|
||||
|
||||
const res = await table
|
||||
.mergeInsert("id")
|
||||
.whenNotMatchedInsertAll()
|
||||
.useLsmWrite(false)
|
||||
.execute([
|
||||
{ id: "b", value: 9 },
|
||||
{ id: "e", value: 5 },
|
||||
]);
|
||||
// Standard path commits: id="e" inserted ("b" already exists).
|
||||
expect(res.numInsertedRows).toBe(1);
|
||||
expect(await table.countRows()).toBe(3);
|
||||
});
|
||||
|
||||
it("supports validateSingleShard(false)", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const table = await bucketTable(conn);
|
||||
|
||||
const res = await table
|
||||
.mergeInsert("id")
|
||||
.whenMatchedUpdateAll()
|
||||
.whenNotMatchedInsertAll()
|
||||
.validateSingleShard(false)
|
||||
.execute([{ id: "f", value: 6 }]);
|
||||
expect(res.numRows).toBe(1);
|
||||
});
|
||||
|
||||
it("rejects a non-upsert merge under an LSM spec", async () => {
|
||||
const conn = await connect(tmpDir.name);
|
||||
const table = await bucketTable(conn);
|
||||
|
||||
await expect(
|
||||
table
|
||||
.mergeInsert("id")
|
||||
.whenNotMatchedInsertAll()
|
||||
.execute([{ id: "g", value: 7 }]),
|
||||
).rejects.toThrow();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -87,6 +87,41 @@ export class MergeInsertBuilder {
|
||||
this.#schema,
|
||||
);
|
||||
}
|
||||
/**
|
||||
* Controls whether the merge uses the MemWAL LSM write path.
|
||||
*
|
||||
* By default (unset), a `mergeInsert` on a table with an LSM write spec is
|
||||
* routed through Lance's MemWAL shard writer, and a table without one uses
|
||||
* the standard path. Pass `false` to force the standard path even when a
|
||||
* spec is set. Pass `true` to require a spec — `mergeInsert` rejects if none
|
||||
* is installed.
|
||||
*
|
||||
* @param useLsmWrite - Whether to use the LSM write path.
|
||||
*/
|
||||
useLsmWrite(useLsmWrite: boolean): MergeInsertBuilder {
|
||||
return new MergeInsertBuilder(
|
||||
this.#native.useLsmWrite(useLsmWrite),
|
||||
this.#schema,
|
||||
);
|
||||
}
|
||||
/**
|
||||
* Controls how an LSM merge checks that its input targets a single shard.
|
||||
*
|
||||
* When a table has an LSM write spec, every row in a `mergeInsert` call must
|
||||
* route to the same shard. When `true` (the default), every row is inspected
|
||||
* to verify this. When `false`, only the first row is inspected and the
|
||||
* shard it routes to is used for the whole input — a faster path for callers
|
||||
* that have already pre-sharded their input. Has no effect on tables without
|
||||
* an LSM write spec.
|
||||
*
|
||||
* @param validateSingleShard - Whether to check every row routes to one shard. Defaults to `true`.
|
||||
*/
|
||||
validateSingleShard(validateSingleShard: boolean): MergeInsertBuilder {
|
||||
return new MergeInsertBuilder(
|
||||
this.#native.validateSingleShard(validateSingleShard),
|
||||
this.#schema,
|
||||
);
|
||||
}
|
||||
/**
|
||||
* Executes the merge insert operation
|
||||
*
|
||||
|
||||
@@ -161,7 +161,10 @@ export interface Version {
|
||||
*
|
||||
* `specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`,
|
||||
* `column` and `numBuckets` are required; for `"identity"`, `column` is
|
||||
* required.
|
||||
* required and must be a deterministic function of the unenforced primary
|
||||
* key (every row with a given primary key must always produce the same
|
||||
* `column` value, or upserts of that key can land in different shards and a
|
||||
* stale version can win).
|
||||
*/
|
||||
export interface LsmWriteSpec {
|
||||
/** One of `"bucket"`, `"identity"`, or `"unsharded"`. */
|
||||
@@ -567,6 +570,16 @@ export abstract class Table {
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
abstract unsetLsmWriteSpec(): Promise<void>;
|
||||
/**
|
||||
* Drain and close any cached MemWAL shard writers held for this table.
|
||||
*
|
||||
* When an {@link LsmWriteSpec} is installed, `mergeInsert` opens MemWAL
|
||||
* shard writers and caches them for reuse across calls. This closes them,
|
||||
* flushing pending data; writers reopen lazily on the next `mergeInsert`.
|
||||
* It is a no-op when no writers are cached.
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
abstract closeLsmWriters(): Promise<void>;
|
||||
/** Retrieve the version of the table */
|
||||
|
||||
abstract version(): Promise<number>;
|
||||
@@ -1041,6 +1054,10 @@ export class LocalTable extends Table {
|
||||
return await this.inner.unsetLsmWriteSpec();
|
||||
}
|
||||
|
||||
async closeLsmWriters(): Promise<void> {
|
||||
return await this.inner.closeLsmWriters();
|
||||
}
|
||||
|
||||
async version(): Promise<number> {
|
||||
return await this.inner.version();
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-darwin-arm64",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.1",
|
||||
"os": ["darwin"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.darwin-arm64.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-arm64-gnu",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.1",
|
||||
"os": ["linux"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.linux-arm64-gnu.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-arm64-musl",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.1",
|
||||
"os": ["linux"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.linux-arm64-musl.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-x64-gnu",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.1",
|
||||
"os": ["linux"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.linux-x64-gnu.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-x64-musl",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.1",
|
||||
"os": ["linux"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.linux-x64-musl.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-win32-arm64-msvc",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.1",
|
||||
"os": [
|
||||
"win32"
|
||||
],
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-win32-x64-msvc",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.1",
|
||||
"os": ["win32"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.win32-x64-msvc.node",
|
||||
|
||||
4
nodejs/package-lock.json
generated
4
nodejs/package-lock.json
generated
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.1",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.1",
|
||||
"cpu": [
|
||||
"x64",
|
||||
"arm64"
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
"ann"
|
||||
],
|
||||
"private": false,
|
||||
"version": "0.29.1-beta.0",
|
||||
"version": "0.30.0-beta.1",
|
||||
"main": "dist/index.js",
|
||||
"exports": {
|
||||
".": "./dist/index.js",
|
||||
|
||||
@@ -24,15 +24,19 @@ mod util;
|
||||
#[napi(object)]
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionOptions {
|
||||
/// (For LanceDB OSS only): The interval, in seconds, at which to check for
|
||||
/// updates to the table from other processes. If None, then consistency is not
|
||||
/// checked. For performance reasons, this is the default. For strong
|
||||
/// consistency, set this to zero seconds. Then every read will check for
|
||||
/// updates from other processes. As a compromise, you can set this to a
|
||||
/// non-zero value for eventual consistency. If more than that interval
|
||||
/// has passed since the last check, then the table will be checked for updates.
|
||||
/// Note: this consistency only applies to read operations. Write operations are
|
||||
/// The interval, in seconds, at which to check for updates to the table
|
||||
/// from other processes. If None, then consistency is not checked. For
|
||||
/// performance reasons, this is the default. For strong consistency, set
|
||||
/// this to zero seconds. Then every read will check for updates from other
|
||||
/// processes. As a compromise, you can set this to a non-zero value for
|
||||
/// eventual consistency. If more than that interval has passed since the
|
||||
/// last check, then the table will be checked for updates. Note: this
|
||||
/// consistency only applies to read operations. Write operations are
|
||||
/// always consistent.
|
||||
///
|
||||
/// Stronger consistency is not free. The smaller the interval, the more
|
||||
/// often each read pays the cost of checking for updates against object
|
||||
/// storage, raising per-read latency and cost.
|
||||
pub read_consistency_interval: Option<f64>,
|
||||
/// (For LanceDB OSS only): configuration for object storage.
|
||||
///
|
||||
|
||||
@@ -50,6 +50,20 @@ impl NativeMergeInsertBuilder {
|
||||
this
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn use_lsm_write(&self, use_lsm_write: bool) -> Self {
|
||||
let mut this = self.clone();
|
||||
this.inner.use_lsm_write(use_lsm_write);
|
||||
this
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub fn validate_single_shard(&self, validate_single_shard: bool) -> Self {
|
||||
let mut this = self.clone();
|
||||
this.inner.validate_single_shard(validate_single_shard);
|
||||
this
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn execute(&self, buf: Buffer) -> napi::Result<MergeResult> {
|
||||
let data = ipc_file_to_batches(buf.to_vec())
|
||||
|
||||
@@ -391,6 +391,11 @@ impl Table {
|
||||
.default_error()
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn close_lsm_writers(&self) -> napi::Result<()> {
|
||||
self.inner_ref()?.close_lsm_writers().await.default_error()
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn version(&self) -> napi::Result<i64> {
|
||||
self.inner_ref()?
|
||||
@@ -940,6 +945,7 @@ pub struct MergeResult {
|
||||
pub num_updated_rows: i64,
|
||||
pub num_deleted_rows: i64,
|
||||
pub num_attempts: i64,
|
||||
pub num_rows: i64,
|
||||
}
|
||||
|
||||
impl From<lancedb::table::MergeResult> for MergeResult {
|
||||
@@ -950,6 +956,7 @@ impl From<lancedb::table::MergeResult> for MergeResult {
|
||||
num_updated_rows: value.num_updated_rows as i64,
|
||||
num_deleted_rows: value.num_deleted_rows as i64,
|
||||
num_attempts: value.num_attempts as i64,
|
||||
num_rows: value.num_rows as i64,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[tool.bumpversion]
|
||||
current_version = "0.32.1-beta.0"
|
||||
current_version = "0.33.0-beta.1"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb-python"
|
||||
version = "0.32.1-beta.0"
|
||||
version = "0.33.0-beta.1"
|
||||
publish = false
|
||||
edition.workspace = true
|
||||
description = "Python bindings for LanceDB"
|
||||
|
||||
@@ -94,7 +94,6 @@ def connect(
|
||||
host_override: str, optional
|
||||
The override url for LanceDB Cloud.
|
||||
read_consistency_interval: timedelta, default None
|
||||
(For LanceDB OSS only)
|
||||
The interval at which to check for updates to the table from other
|
||||
processes. If None, then consistency is not checked. For performance
|
||||
reasons, this is the default. For strong consistency, set this to
|
||||
@@ -104,6 +103,10 @@ def connect(
|
||||
the last check, then the table will be checked for updates. Note: this
|
||||
consistency only applies to read operations. Write operations are
|
||||
always consistent.
|
||||
|
||||
Stronger consistency is not free. The smaller the interval, the more
|
||||
often each read pays the cost of checking for updates against object
|
||||
storage, raising per-read latency and cost.
|
||||
client_config: ClientConfig or dict, optional
|
||||
Configuration options for the LanceDB Cloud HTTP client. If a dict, then
|
||||
the keys are the attributes of the ClientConfig class. If None, then the
|
||||
@@ -147,6 +150,13 @@ def connect(
|
||||
>>> db = lancedb.connect("s3://my-bucket/lancedb",
|
||||
... storage_options={"aws_access_key_id": "***"})
|
||||
|
||||
For tests and temporary data, use an in-memory database:
|
||||
|
||||
>>> db = lancedb.connect("memory://")
|
||||
|
||||
In-memory databases are not persisted. Tables are dropped when the last
|
||||
connection or table handle referencing them is closed.
|
||||
|
||||
Connect to LanceDB cloud:
|
||||
|
||||
>>> db = lancedb.connect("db://my_database", api_key="ldb_...",
|
||||
@@ -210,6 +220,7 @@ def connect(
|
||||
request_thread_pool=request_thread_pool,
|
||||
client_config=client_config,
|
||||
storage_options=storage_options,
|
||||
read_consistency_interval=read_consistency_interval,
|
||||
**kwargs,
|
||||
)
|
||||
_check_s3_bucket_with_dots(str(uri), storage_options)
|
||||
@@ -336,7 +347,6 @@ async def connect_async(
|
||||
host_override: str, optional
|
||||
The override url for LanceDB Cloud.
|
||||
read_consistency_interval: timedelta, default None
|
||||
(For LanceDB OSS only)
|
||||
The interval at which to check for updates to the table from other
|
||||
processes. If None, then consistency is not checked. For performance
|
||||
reasons, this is the default. For strong consistency, set this to
|
||||
@@ -346,6 +356,10 @@ async def connect_async(
|
||||
the last check, then the table will be checked for updates. Note: this
|
||||
consistency only applies to read operations. Write operations are
|
||||
always consistent.
|
||||
|
||||
Stronger consistency is not free. The smaller the interval, the more
|
||||
often each read pays the cost of checking for updates against object
|
||||
storage, raising per-read latency and cost.
|
||||
client_config: ClientConfig or dict, optional
|
||||
Configuration options for the LanceDB Cloud HTTP client. If a dict, then
|
||||
the keys are the attributes of the ClientConfig class. If None, then the
|
||||
@@ -378,6 +392,8 @@ async def connect_async(
|
||||
... db = await lancedb.connect_async("s3://my-bucket/lancedb",
|
||||
... storage_options={
|
||||
... "aws_access_key_id": "***"})
|
||||
... # For tests and temporary data, use an in-memory database
|
||||
... db = await lancedb.connect_async("memory://")
|
||||
... # Connect to LanceDB cloud
|
||||
... db = await lancedb.connect_async("db://my_database", api_key="ldb_...",
|
||||
... client_config={
|
||||
|
||||
@@ -220,6 +220,7 @@ class Table:
|
||||
async def set_unenforced_primary_key(self, columns: List[str]) -> None: ...
|
||||
async def set_lsm_write_spec(self, spec: LsmWriteSpec) -> None: ...
|
||||
async def unset_lsm_write_spec(self) -> None: ...
|
||||
async def close_lsm_writers(self) -> None: ...
|
||||
@property
|
||||
def tags(self) -> Tags: ...
|
||||
def query(self) -> Query: ...
|
||||
@@ -420,6 +421,7 @@ class MergeResult:
|
||||
num_inserted_rows: int
|
||||
num_deleted_rows: int
|
||||
num_attempts: int
|
||||
num_rows: int
|
||||
|
||||
class LsmWriteSpec:
|
||||
"""Specification selecting Lance's MemWAL LSM-style write path for
|
||||
|
||||
@@ -8,7 +8,17 @@ from abc import abstractmethod
|
||||
from datetime import timedelta
|
||||
from pathlib import Path
|
||||
import sys
|
||||
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Literal, Optional, Union
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Dict,
|
||||
Generator,
|
||||
Iterable,
|
||||
List,
|
||||
Literal,
|
||||
Optional,
|
||||
Union,
|
||||
)
|
||||
|
||||
if sys.version_info >= (3, 12):
|
||||
from typing import override
|
||||
@@ -313,7 +323,7 @@ class DBConnection(EnforceOverrides):
|
||||
>>> data = [{"vector": [1.1, 1.2], "lat": 45.5, "long": -122.7},
|
||||
... {"vector": [0.2, 1.8], "lat": 40.1, "long": -74.1}]
|
||||
>>> db.create_table("my_table", data)
|
||||
LanceTable(name='my_table', version=1, ...)
|
||||
LanceTable(name='my_table', ...)
|
||||
>>> db["my_table"].head()
|
||||
pyarrow.Table
|
||||
vector: fixed_size_list<item: float>[2]
|
||||
@@ -334,7 +344,7 @@ class DBConnection(EnforceOverrides):
|
||||
... "long": [-122.7, -74.1]
|
||||
... })
|
||||
>>> db.create_table("table2", data)
|
||||
LanceTable(name='table2', version=1, ...)
|
||||
LanceTable(name='table2', ...)
|
||||
>>> db["table2"].head()
|
||||
pyarrow.Table
|
||||
vector: fixed_size_list<item: float>[2]
|
||||
@@ -357,7 +367,7 @@ class DBConnection(EnforceOverrides):
|
||||
... pa.field("long", pa.float32())
|
||||
... ])
|
||||
>>> db.create_table("table3", data, schema = custom_schema)
|
||||
LanceTable(name='table3', version=1, ...)
|
||||
LanceTable(name='table3', ...)
|
||||
>>> db["table3"].head()
|
||||
pyarrow.Table
|
||||
vector: fixed_size_list<item: float>[2]
|
||||
@@ -391,7 +401,7 @@ class DBConnection(EnforceOverrides):
|
||||
... pa.field("price", pa.float32()),
|
||||
... ])
|
||||
>>> db.create_table("table4", make_batches(), schema=schema)
|
||||
LanceTable(name='table4', version=1, ...)
|
||||
LanceTable(name='table4', ...)
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
@@ -568,15 +578,15 @@ class LanceDBConnection(DBConnection):
|
||||
>>> db = lancedb.connect("./.lancedb")
|
||||
>>> db.create_table("my_table", data=[{"vector": [1.1, 1.2], "b": 2},
|
||||
... {"vector": [0.5, 1.3], "b": 4}])
|
||||
LanceTable(name='my_table', version=1, ...)
|
||||
LanceTable(name='my_table', ...)
|
||||
>>> db.create_table("another_table", data=[{"vector": [0.4, 0.4], "b": 6}])
|
||||
LanceTable(name='another_table', version=1, ...)
|
||||
LanceTable(name='another_table', ...)
|
||||
>>> sorted(db.table_names())
|
||||
['another_table', 'my_table']
|
||||
>>> len(db)
|
||||
2
|
||||
>>> db["my_table"]
|
||||
LanceTable(name='my_table', version=1, ...)
|
||||
LanceTable(name='my_table', ...)
|
||||
>>> "my_table" in db
|
||||
True
|
||||
>>> db.drop_table("my_table")
|
||||
@@ -847,11 +857,20 @@ class LanceDBConnection(DBConnection):
|
||||
)
|
||||
)
|
||||
|
||||
def _all_table_names(self) -> Generator[str, None, None]:
|
||||
page_token = None
|
||||
while True:
|
||||
response = self.list_tables(page_token=page_token)
|
||||
yield from response.tables
|
||||
page_token = response.page_token
|
||||
if not page_token:
|
||||
return
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self.table_names())
|
||||
return sum(1 for _ in self._all_table_names())
|
||||
|
||||
def __contains__(self, name: str) -> bool:
|
||||
return name in self.table_names()
|
||||
return name in self._all_table_names()
|
||||
|
||||
@override
|
||||
def create_table(
|
||||
|
||||
@@ -281,6 +281,9 @@ class HnswPq:
|
||||
m: int = 20
|
||||
ef_construction: int = 300
|
||||
target_partition_size: Optional[int] = None
|
||||
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
|
||||
# create_index() dispatches to pylance to build the index on the accelerator.
|
||||
accelerator: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -386,6 +389,9 @@ class HnswSq:
|
||||
m: int = 20
|
||||
ef_construction: int = 300
|
||||
target_partition_size: Optional[int] = None
|
||||
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
|
||||
# create_index() dispatches to pylance to build the index on the accelerator.
|
||||
accelerator: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -579,6 +585,9 @@ class IvfFlat:
|
||||
max_iterations: int = 50
|
||||
sample_rate: int = 256
|
||||
target_partition_size: Optional[int] = None
|
||||
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
|
||||
# create_index() dispatches to pylance to build the index on the accelerator.
|
||||
accelerator: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -609,6 +618,9 @@ class IvfSq:
|
||||
max_iterations: int = 50
|
||||
sample_rate: int = 256
|
||||
target_partition_size: Optional[int] = None
|
||||
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
|
||||
# create_index() dispatches to pylance to build the index on the accelerator.
|
||||
accelerator: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -739,6 +751,9 @@ class IvfPq:
|
||||
max_iterations: int = 50
|
||||
sample_rate: int = 256
|
||||
target_partition_size: Optional[int] = None
|
||||
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
|
||||
# create_index() dispatches to pylance to build the index on the accelerator.
|
||||
accelerator: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -792,6 +807,9 @@ class IvfRq:
|
||||
max_iterations: int = 50
|
||||
sample_rate: int = 256
|
||||
target_partition_size: Optional[int] = None
|
||||
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
|
||||
# create_index() dispatches to pylance to build the index on the accelerator.
|
||||
accelerator: Optional[str] = None
|
||||
|
||||
|
||||
__all__ = [
|
||||
|
||||
@@ -34,6 +34,8 @@ class LanceMergeInsertBuilder(object):
|
||||
self._when_not_matched_by_source_condition = None
|
||||
self._timeout = None
|
||||
self._use_index = True
|
||||
self._use_lsm_write = None
|
||||
self._validate_single_shard = None
|
||||
|
||||
def when_matched_update_all(
|
||||
self, *, where: Optional[str] = None
|
||||
@@ -96,6 +98,46 @@ class LanceMergeInsertBuilder(object):
|
||||
self._use_index = use_index
|
||||
return self
|
||||
|
||||
def use_lsm_write(self, use_lsm_write: bool) -> LanceMergeInsertBuilder:
|
||||
"""
|
||||
Controls whether the merge uses the MemWAL LSM write path.
|
||||
|
||||
By default (unset), a `merge_insert` on a table with an LSM write spec
|
||||
is routed through Lance's MemWAL shard writer, and a table without one
|
||||
uses the standard path. Pass `False` to force the standard path even
|
||||
when a spec is set. Pass `True` to require a spec — `merge_insert`
|
||||
raises an error if none is installed.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
use_lsm_write: bool
|
||||
Whether to use the LSM write path.
|
||||
"""
|
||||
self._use_lsm_write = use_lsm_write
|
||||
return self
|
||||
|
||||
def validate_single_shard(
|
||||
self, validate_single_shard: bool
|
||||
) -> LanceMergeInsertBuilder:
|
||||
"""
|
||||
Controls how an LSM merge checks that its input targets a single shard.
|
||||
|
||||
When a table has an LSM write spec, every row in a `merge_insert` call
|
||||
must route to the same shard. When `True` (the default), every row is
|
||||
inspected to verify this. When `False`, only the first row is inspected
|
||||
and the shard it routes to is used for the whole input — a faster path
|
||||
for callers that have already pre-sharded their input.
|
||||
|
||||
Has no effect on tables without an LSM write spec.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
validate_single_shard: bool
|
||||
Whether to check every row routes to one shard. Defaults to `True`.
|
||||
"""
|
||||
self._validate_single_shard = validate_single_shard
|
||||
return self
|
||||
|
||||
def execute(
|
||||
self,
|
||||
new_data: DATA,
|
||||
|
||||
@@ -3,12 +3,14 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from abc import ABC, abstractmethod
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from enum import Enum
|
||||
from datetime import timedelta
|
||||
from enum import Enum
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Dict,
|
||||
List,
|
||||
Literal,
|
||||
@@ -17,41 +19,40 @@ from typing import (
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
Any,
|
||||
)
|
||||
|
||||
import asyncio
|
||||
import deprecation
|
||||
import numpy as np
|
||||
import pyarrow as pa
|
||||
import pyarrow.compute as pc
|
||||
import pydantic
|
||||
from typing_extensions import Annotated
|
||||
|
||||
from lancedb.pydantic import PYDANTIC_VERSION
|
||||
from lancedb._lancedb import fts_query_to_json
|
||||
from lancedb.background_loop import LOOP
|
||||
from lancedb.pydantic import PYDANTIC_VERSION
|
||||
|
||||
from . import __version__
|
||||
from .arrow import AsyncRecordBatchReader
|
||||
from .dependencies import pandas as pd
|
||||
from .expr import Expr
|
||||
from .rerankers.base import Reranker
|
||||
from .rerankers.rrf import RRFReranker
|
||||
from .rerankers.util import check_reranker_result
|
||||
from .util import flatten_columns
|
||||
from .expr import Expr
|
||||
from lancedb._lancedb import fts_query_to_json
|
||||
from typing_extensions import Annotated
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import sys
|
||||
|
||||
import PIL
|
||||
import polars as pl
|
||||
|
||||
from ._lancedb import Query as LanceQuery
|
||||
from ._lancedb import FTSQuery as LanceFTSQuery
|
||||
from ._lancedb import HybridQuery as LanceHybridQuery
|
||||
from ._lancedb import VectorQuery as LanceVectorQuery
|
||||
from ._lancedb import TakeQuery as LanceTakeQuery
|
||||
from ._lancedb import PyQueryRequest
|
||||
from ._lancedb import Query as LanceQuery
|
||||
from ._lancedb import TakeQuery as LanceTakeQuery
|
||||
from ._lancedb import VectorQuery as LanceVectorQuery
|
||||
from .common import VEC
|
||||
from .pydantic import LanceModel
|
||||
from .table import Table
|
||||
@@ -3348,16 +3349,18 @@ class BaseQueryBuilder(object):
|
||||
If not specified, no timeout is applied. If the query does not
|
||||
complete within the specified time, an error will be raised.
|
||||
"""
|
||||
async_iter = LOOP.run(self._inner.execute(max_batch_length, timeout))
|
||||
async_reader = LOOP.run(
|
||||
self._inner.to_batches(max_batch_length=max_batch_length, timeout=timeout)
|
||||
)
|
||||
|
||||
def iter_sync():
|
||||
try:
|
||||
while True:
|
||||
yield LOOP.run(async_iter.__anext__())
|
||||
yield LOOP.run(async_reader.__anext__())
|
||||
except StopAsyncIteration:
|
||||
return
|
||||
|
||||
return pa.RecordBatchReader.from_batches(async_iter.schema, iter_sync())
|
||||
return pa.RecordBatchReader.from_batches(async_reader.schema, iter_sync())
|
||||
|
||||
def to_arrow(self, timeout: Optional[timedelta] = None) -> pa.Table:
|
||||
"""
|
||||
|
||||
@@ -50,6 +50,7 @@ class RemoteDBConnection(DBConnection):
|
||||
connection_timeout: Optional[float] = None,
|
||||
read_timeout: Optional[float] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
read_consistency_interval: Optional[timedelta] = None,
|
||||
):
|
||||
"""Connect to a remote LanceDB database."""
|
||||
if isinstance(client_config, dict):
|
||||
@@ -103,6 +104,7 @@ class RemoteDBConnection(DBConnection):
|
||||
host_override=host_override,
|
||||
client_config=client_config,
|
||||
storage_options=storage_options,
|
||||
read_consistency_interval=read_consistency_interval,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -2,11 +2,24 @@
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
from datetime import timedelta
|
||||
import deprecation
|
||||
import logging
|
||||
from functools import cached_property
|
||||
from typing import Any, Callable, Dict, Iterable, List, Optional, Union, Literal
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Dict,
|
||||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Union,
|
||||
Literal,
|
||||
overload,
|
||||
)
|
||||
import warnings
|
||||
|
||||
from lancedb import __version__
|
||||
|
||||
from lancedb._lancedb import (
|
||||
AddColumnsResult,
|
||||
AddResult,
|
||||
@@ -32,6 +45,7 @@ from lancedb.index import (
|
||||
LabelList,
|
||||
)
|
||||
from lancedb.remote.db import LOOP
|
||||
from lancedb.table import IndexConfigType, KNOWN_METRICS
|
||||
import pyarrow as pa
|
||||
|
||||
from lancedb.common import DATA, VEC, VECTOR_COLUMN_NAME
|
||||
@@ -122,6 +136,11 @@ class RemoteTable(Table):
|
||||
"""List all the stats of a specified index"""
|
||||
return LOOP.run(self._table.index_stats(index_uuid))
|
||||
|
||||
@deprecation.deprecated(
|
||||
deprecated_in="0.25.0",
|
||||
current_version=__version__,
|
||||
details="Use create_index() with config=BTree()/Bitmap()/LabelList() instead.",
|
||||
)
|
||||
def create_scalar_index(
|
||||
self,
|
||||
column: str,
|
||||
@@ -131,7 +150,12 @@ class RemoteTable(Table):
|
||||
wait_timeout: Optional[timedelta] = None,
|
||||
name: Optional[str] = None,
|
||||
):
|
||||
"""Creates a scalar index
|
||||
"""Creates a scalar index.
|
||||
|
||||
.. deprecated:: 0.25.0
|
||||
Use :meth:`create_index` with a BTree, Bitmap, or LabelList config instead.
|
||||
Example: ``table.create_index("column", config=BTree())``
|
||||
|
||||
Parameters
|
||||
----------
|
||||
column : str
|
||||
@@ -162,6 +186,11 @@ class RemoteTable(Table):
|
||||
)
|
||||
)
|
||||
|
||||
@deprecation.deprecated(
|
||||
deprecated_in="0.25.0",
|
||||
current_version=__version__,
|
||||
details="Use create_index() with config=FTS() instead.",
|
||||
)
|
||||
def create_fts_index(
|
||||
self,
|
||||
column: str,
|
||||
@@ -182,6 +211,12 @@ class RemoteTable(Table):
|
||||
prefix_only: bool = False,
|
||||
name: Optional[str] = None,
|
||||
):
|
||||
"""Create a full-text search index on a column.
|
||||
|
||||
.. deprecated:: 0.25.0
|
||||
Use :meth:`create_index` with an FTS config instead.
|
||||
Example: ``table.create_index("text_column", config=FTS())``
|
||||
"""
|
||||
config = FTS(
|
||||
with_position=with_position,
|
||||
base_tokenizer=base_tokenizer,
|
||||
@@ -205,9 +240,43 @@ class RemoteTable(Table):
|
||||
)
|
||||
)
|
||||
|
||||
# New unified API overload
|
||||
@overload
|
||||
def create_index(
|
||||
self,
|
||||
metric="l2",
|
||||
column: str,
|
||||
/,
|
||||
*,
|
||||
config: IndexConfigType,
|
||||
wait_timeout: Optional[timedelta] = ...,
|
||||
name: Optional[str] = ...,
|
||||
train: bool = ...,
|
||||
) -> None: ...
|
||||
|
||||
# Legacy API overload (deprecated)
|
||||
@overload
|
||||
def create_index(
|
||||
self,
|
||||
metric: Literal["l2", "cosine", "dot", "hamming"] = ...,
|
||||
vector_column_name: str = ...,
|
||||
index_cache_size: Optional[int] = ...,
|
||||
num_partitions: Optional[int] = ...,
|
||||
num_sub_vectors: Optional[int] = ...,
|
||||
replace: Optional[bool] = ...,
|
||||
accelerator: Optional[str] = ...,
|
||||
index_type: Literal[
|
||||
"VECTOR", "IVF_FLAT", "IVF_SQ", "IVF_PQ", "IVF_HNSW_SQ", "IVF_HNSW_PQ"
|
||||
] = ...,
|
||||
wait_timeout: Optional[timedelta] = ...,
|
||||
*,
|
||||
num_bits: int = ...,
|
||||
name: Optional[str] = ...,
|
||||
train: bool = ...,
|
||||
) -> None: ...
|
||||
|
||||
def create_index(
|
||||
self,
|
||||
metric: str = "l2",
|
||||
vector_column_name: str = VECTOR_COLUMN_NAME,
|
||||
index_cache_size: Optional[int] = None,
|
||||
num_partitions: Optional[int] = None,
|
||||
@@ -218,89 +287,113 @@ class RemoteTable(Table):
|
||||
wait_timeout: Optional[timedelta] = None,
|
||||
*,
|
||||
num_bits: int = 8,
|
||||
config: Optional[IndexConfigType] = None,
|
||||
name: Optional[str] = None,
|
||||
train: bool = True,
|
||||
):
|
||||
"""Create an index on the table.
|
||||
"""Create an index on a column.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
metric : str
|
||||
The metric to use for the index. Default is "l2".
|
||||
vector_column_name : str
|
||||
The name of the vector column. Default is "vector".
|
||||
This method supports both the new unified API and the legacy API
|
||||
for backwards compatibility. The new API takes the column name as the
|
||||
first positional argument and an index configuration object via
|
||||
``config``; the legacy API takes the distance metric as the first
|
||||
argument plus separate ``vector_column_name`` / ``num_partitions`` /
|
||||
etc. parameters, and emits a ``DeprecationWarning``.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> import lancedb
|
||||
>>> import uuid
|
||||
>>> from lancedb.schema import vector
|
||||
>>> db = lancedb.connect("db://...", api_key="...", # doctest: +SKIP
|
||||
... region="...") # doctest: +SKIP
|
||||
>>> table_name = uuid.uuid4().hex
|
||||
>>> schema = pa.schema(
|
||||
... [
|
||||
... pa.field("id", pa.uint32(), False),
|
||||
... pa.field("vector", vector(128), False),
|
||||
... pa.field("s", pa.string(), False),
|
||||
... ]
|
||||
New API (recommended):
|
||||
|
||||
>>> table.create_index( # doctest: +SKIP
|
||||
... "vector", config=IvfPq(distance_type="l2")
|
||||
... )
|
||||
>>> table = db.create_table( # doctest: +SKIP
|
||||
... table_name, # doctest: +SKIP
|
||||
... schema=schema, # doctest: +SKIP
|
||||
>>> table.create_index("category", config=BTree()) # doctest: +SKIP
|
||||
>>> table.create_index("content", config=FTS()) # doctest: +SKIP
|
||||
|
||||
Legacy API (deprecated):
|
||||
|
||||
>>> table.create_index( # doctest: +SKIP
|
||||
... "l2", vector_column_name="vector"
|
||||
... )
|
||||
>>> table.create_index("l2", "vector") # doctest: +SKIP
|
||||
"""
|
||||
# Detect whether this is a legacy API call
|
||||
is_legacy = self._is_legacy_create_index_call(
|
||||
metric,
|
||||
config,
|
||||
num_partitions,
|
||||
num_sub_vectors,
|
||||
vector_column_name,
|
||||
accelerator,
|
||||
index_cache_size,
|
||||
replace,
|
||||
)
|
||||
|
||||
if accelerator is not None:
|
||||
logging.warning(
|
||||
"GPU accelerator is not yet supported on LanceDB cloud."
|
||||
"If you have 100M+ vectors to index,"
|
||||
"please contact us at contact@lancedb.com"
|
||||
)
|
||||
if replace is not None:
|
||||
logging.warning(
|
||||
"replace is not supported on LanceDB cloud."
|
||||
"Existing indexes will always be replaced."
|
||||
if is_legacy:
|
||||
warnings.warn(
|
||||
"The create_index() API with metric/num_partitions parameters is "
|
||||
"deprecated and will be removed in a future version. "
|
||||
"Please migrate to the new unified API:\n"
|
||||
" # Old (deprecated):\n"
|
||||
" table.create_index('l2', vector_column_name='my_vector')\n"
|
||||
" # New (recommended):\n"
|
||||
" table.create_index('my_vector', config=IvfPq(distance_type='l2'))",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
index_type = index_type.upper()
|
||||
if index_type == "VECTOR" or index_type == "IVF_PQ":
|
||||
config = IvfPq(
|
||||
distance_type=metric,
|
||||
num_partitions=num_partitions,
|
||||
num_sub_vectors=num_sub_vectors,
|
||||
num_bits=num_bits,
|
||||
)
|
||||
elif index_type == "IVF_RQ":
|
||||
config = IvfRq(
|
||||
distance_type=metric,
|
||||
num_partitions=num_partitions,
|
||||
num_bits=num_bits,
|
||||
)
|
||||
elif index_type == "IVF_SQ":
|
||||
config = IvfSq(distance_type=metric, num_partitions=num_partitions)
|
||||
elif index_type == "IVF_HNSW_PQ":
|
||||
raise ValueError(
|
||||
"IVF_HNSW_PQ is not supported on LanceDB cloud."
|
||||
"Please use IVF_HNSW_SQ instead."
|
||||
)
|
||||
elif index_type == "IVF_HNSW_SQ":
|
||||
config = HnswSq(distance_type=metric, num_partitions=num_partitions)
|
||||
elif index_type == "IVF_HNSW_FLAT":
|
||||
config = HnswFlat(distance_type=metric, num_partitions=num_partitions)
|
||||
elif index_type == "IVF_FLAT":
|
||||
config = IvfFlat(distance_type=metric, num_partitions=num_partitions)
|
||||
column = vector_column_name
|
||||
|
||||
if accelerator is not None:
|
||||
logging.warning(
|
||||
"GPU accelerator is not yet supported on LanceDB cloud."
|
||||
"If you have 100M+ vectors to index,"
|
||||
"please contact us at contact@lancedb.com"
|
||||
)
|
||||
if replace is not None:
|
||||
logging.warning(
|
||||
"replace is not supported on LanceDB cloud."
|
||||
"Existing indexes will always be replaced."
|
||||
)
|
||||
|
||||
idx_type = index_type.upper()
|
||||
if idx_type == "VECTOR" or idx_type == "IVF_PQ":
|
||||
config = IvfPq(
|
||||
distance_type=metric,
|
||||
num_partitions=num_partitions,
|
||||
num_sub_vectors=num_sub_vectors,
|
||||
num_bits=num_bits,
|
||||
)
|
||||
elif idx_type == "IVF_RQ":
|
||||
config = IvfRq(
|
||||
distance_type=metric,
|
||||
num_partitions=num_partitions,
|
||||
num_bits=num_bits,
|
||||
)
|
||||
elif idx_type == "IVF_SQ":
|
||||
config = IvfSq(distance_type=metric, num_partitions=num_partitions)
|
||||
elif idx_type == "IVF_HNSW_PQ":
|
||||
raise ValueError(
|
||||
"IVF_HNSW_PQ is not supported on LanceDB cloud."
|
||||
"Please use IVF_HNSW_SQ instead."
|
||||
)
|
||||
elif idx_type == "IVF_HNSW_SQ":
|
||||
config = HnswSq(distance_type=metric, num_partitions=num_partitions)
|
||||
elif idx_type == "IVF_HNSW_FLAT":
|
||||
config = HnswFlat(distance_type=metric, num_partitions=num_partitions)
|
||||
elif idx_type == "IVF_FLAT":
|
||||
config = IvfFlat(distance_type=metric, num_partitions=num_partitions)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Unknown vector index type: {idx_type}. Valid options are"
|
||||
" 'IVF_FLAT', 'IVF_PQ', 'IVF_RQ', 'IVF_SQ',"
|
||||
" 'IVF_HNSW_PQ', 'IVF_HNSW_SQ', 'IVF_HNSW_FLAT'"
|
||||
)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Unknown vector index type: {index_type}. Valid options are"
|
||||
" 'IVF_FLAT', 'IVF_PQ', 'IVF_RQ', 'IVF_SQ',"
|
||||
" 'IVF_HNSW_PQ', 'IVF_HNSW_SQ', 'IVF_HNSW_FLAT'"
|
||||
)
|
||||
column = metric
|
||||
|
||||
LOOP.run(
|
||||
self._table.create_index(
|
||||
vector_column_name,
|
||||
column,
|
||||
config=config,
|
||||
wait_timeout=wait_timeout,
|
||||
name=name,
|
||||
@@ -308,6 +401,37 @@ class RemoteTable(Table):
|
||||
)
|
||||
)
|
||||
|
||||
def _is_legacy_create_index_call(
|
||||
self,
|
||||
first_arg: str,
|
||||
config: Optional[IndexConfigType],
|
||||
num_partitions: Optional[int],
|
||||
num_sub_vectors: Optional[int],
|
||||
vector_column_name: str,
|
||||
accelerator: Optional[str],
|
||||
index_cache_size: Optional[int],
|
||||
replace: Optional[bool],
|
||||
) -> bool:
|
||||
"""Detect if this is a legacy create_index call."""
|
||||
if config is not None:
|
||||
return False
|
||||
if any(
|
||||
x is not None
|
||||
for x in (
|
||||
num_partitions,
|
||||
num_sub_vectors,
|
||||
accelerator,
|
||||
index_cache_size,
|
||||
replace,
|
||||
)
|
||||
):
|
||||
return True
|
||||
if vector_column_name != VECTOR_COLUMN_NAME:
|
||||
return True
|
||||
if first_arg.lower() in KNOWN_METRICS:
|
||||
return True
|
||||
return False
|
||||
|
||||
def add(
|
||||
self,
|
||||
data: DATA,
|
||||
@@ -668,6 +792,10 @@ class RemoteTable(Table):
|
||||
"""Not supported on LanceDB Cloud."""
|
||||
return LOOP.run(self._table.unset_lsm_write_spec())
|
||||
|
||||
def close_lsm_writers(self) -> None:
|
||||
"""No-op on LanceDB Cloud (no local shard writers)."""
|
||||
return LOOP.run(self._table.close_lsm_writers())
|
||||
|
||||
def drop_index(self, index_name: str):
|
||||
return LOOP.run(self._table.drop_index(index_name))
|
||||
|
||||
|
||||
@@ -102,8 +102,15 @@ class LinearCombinationReranker(Reranker):
|
||||
|
||||
combined_list = []
|
||||
for row_id, result in results.items():
|
||||
# Convert vector distance to a relevance score in [0, 1] where
|
||||
# higher is better. Missing vector entries are penalised with
|
||||
# `_invert_score(fill)` = 1 - fill (= 0.0 for the default fill=1).
|
||||
vector_score = self._invert_score(result.get("_distance", fill))
|
||||
fts_score = result.get("_score", fill)
|
||||
# FTS scores (BM25) are already in a "higher = more relevant" space.
|
||||
# Missing FTS entries are penalised symmetrically: we use
|
||||
# `1 - fill` so that the same `fill` value drives both missing-vector
|
||||
# and missing-FTS penalties in the same direction.
|
||||
fts_score = result.get("_score", 1 - fill)
|
||||
result["_relevance_score"] = self._combine_score(vector_score, fts_score)
|
||||
combined_list.append(result)
|
||||
|
||||
@@ -123,8 +130,12 @@ class LinearCombinationReranker(Reranker):
|
||||
return tbl
|
||||
|
||||
def _combine_score(self, vector_score, fts_score):
|
||||
# these scores represent distance
|
||||
return 1 - (self.weight * vector_score + (1 - self.weight) * fts_score)
|
||||
# Both vector_score (inverted distance) and fts_score are in a
|
||||
# "higher = more relevant" space. A straight weighted average gives
|
||||
# higher _relevance_score to better matches, as expected.
|
||||
# Previously this returned `1 - (...)` which inverted the final
|
||||
# ranking so that the *least* relevant document ranked first.
|
||||
return self.weight * vector_score + (1 - self.weight) * fts_score
|
||||
|
||||
def _invert_score(self, dist: float):
|
||||
# Invert the score between relevance and distance
|
||||
|
||||
@@ -174,6 +174,24 @@ if TYPE_CHECKING:
|
||||
DistanceType,
|
||||
)
|
||||
|
||||
# Type alias for index configuration objects
|
||||
IndexConfigType = Union[
|
||||
IvfFlat,
|
||||
IvfPq,
|
||||
IvfSq,
|
||||
IvfRq,
|
||||
HnswFlat,
|
||||
HnswPq,
|
||||
HnswSq,
|
||||
BTree,
|
||||
Bitmap,
|
||||
LabelList,
|
||||
FTS,
|
||||
]
|
||||
|
||||
# Known distance metrics for legacy API detection
|
||||
KNOWN_METRICS = {"l2", "cosine", "dot", "hamming"}
|
||||
|
||||
|
||||
def _into_pyarrow_reader(
|
||||
data, schema: Optional[pa.Schema] = None
|
||||
@@ -807,11 +825,49 @@ class Table(ABC):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
# New unified API overload
|
||||
@overload
|
||||
def create_index(
|
||||
self,
|
||||
metric="l2",
|
||||
num_partitions=256,
|
||||
num_sub_vectors=96,
|
||||
column: str,
|
||||
/,
|
||||
*,
|
||||
config: IndexConfigType,
|
||||
replace: bool = ...,
|
||||
wait_timeout: Optional[timedelta] = ...,
|
||||
name: Optional[str] = ...,
|
||||
train: bool = ...,
|
||||
) -> None: ...
|
||||
|
||||
# Legacy API overload (deprecated)
|
||||
@overload
|
||||
def create_index(
|
||||
self,
|
||||
metric: Literal["l2", "cosine", "dot", "hamming"] = ...,
|
||||
num_partitions: Optional[int] = ...,
|
||||
num_sub_vectors: Optional[int] = ...,
|
||||
vector_column_name: str = ...,
|
||||
replace: bool = ...,
|
||||
accelerator: Optional[str] = ...,
|
||||
index_cache_size: Optional[int] = ...,
|
||||
*,
|
||||
index_type: VectorIndexType = ...,
|
||||
wait_timeout: Optional[timedelta] = ...,
|
||||
num_bits: int = ...,
|
||||
max_iterations: int = ...,
|
||||
sample_rate: int = ...,
|
||||
m: int = ...,
|
||||
ef_construction: int = ...,
|
||||
name: Optional[str] = ...,
|
||||
train: bool = ...,
|
||||
target_partition_size: Optional[int] = ...,
|
||||
) -> None: ...
|
||||
|
||||
def create_index(
|
||||
self,
|
||||
metric: DistanceType = "l2",
|
||||
num_partitions: Optional[int] = None,
|
||||
num_sub_vectors: Optional[int] = None,
|
||||
vector_column_name: str = VECTOR_COLUMN_NAME,
|
||||
replace: bool = True,
|
||||
accelerator: Optional[str] = None,
|
||||
@@ -824,46 +880,53 @@ class Table(ABC):
|
||||
sample_rate: int = 256,
|
||||
m: int = 20,
|
||||
ef_construction: int = 300,
|
||||
config: Optional[IndexConfigType] = None,
|
||||
name: Optional[str] = None,
|
||||
train: bool = True,
|
||||
target_partition_size: Optional[int] = None,
|
||||
):
|
||||
"""Create an index on the table.
|
||||
"""Create an index on a column.
|
||||
|
||||
This method supports both the new unified API and the legacy API
|
||||
for backwards compatibility. The new API takes the column name as the
|
||||
first positional argument and an index configuration object via
|
||||
``config``; the legacy API takes the distance metric as the first
|
||||
argument plus separate ``vector_column_name`` / ``num_partitions`` /
|
||||
etc. parameters, and emits a ``DeprecationWarning``.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
metric: str, default "l2"
|
||||
The distance metric to use when creating the index.
|
||||
Valid values are "l2", "cosine", "dot", or "hamming".
|
||||
l2 is euclidean distance.
|
||||
Hamming is available only for binary vectors.
|
||||
num_partitions: int, default 256
|
||||
The number of IVF partitions to use when creating the index.
|
||||
Default is 256.
|
||||
num_sub_vectors: int, default 96
|
||||
The number of PQ sub-vectors to use when creating the index.
|
||||
Default is 96.
|
||||
vector_column_name: str, default "vector"
|
||||
The vector column name to create the index.
|
||||
replace: bool, default True
|
||||
- If True, replace the existing index if it exists.
|
||||
metric : str
|
||||
For new API: the column name to index.
|
||||
For legacy API: the distance metric ("l2", "cosine", "dot", "hamming").
|
||||
config : IndexConfigType, optional
|
||||
The index configuration object. If provided, uses the new unified API.
|
||||
Can be one of: IvfFlat, IvfPq, IvfSq, IvfRq, HnswPq, HnswSq,
|
||||
BTree, Bitmap, LabelList, FTS.
|
||||
replace : bool, default True
|
||||
Whether to replace an existing index on this column.
|
||||
wait_timeout : timedelta, optional
|
||||
Timeout to wait for async indexing to complete.
|
||||
name : str, optional
|
||||
Custom name for the index.
|
||||
train : bool, default True
|
||||
Whether to train the index with existing data.
|
||||
|
||||
- If False, raise an error if duplicate index exists.
|
||||
accelerator: str, default None
|
||||
If set, use the given accelerator to create the index.
|
||||
Only support "cuda" for now.
|
||||
index_cache_size : int, optional
|
||||
The size of the index cache in number of entries. Default value is 256.
|
||||
num_bits: int
|
||||
The number of bits to encode sub-vectors. Only used with the IVF_PQ index.
|
||||
Only 4 and 8 are supported.
|
||||
wait_timeout: timedelta, optional
|
||||
The timeout to wait if indexing is asynchronous.
|
||||
name: str, optional
|
||||
The name of the index. If not provided, a default name will be generated.
|
||||
train: bool, default True
|
||||
Whether to train the index with existing data. Vector indices always train
|
||||
with existing data.
|
||||
Examples
|
||||
--------
|
||||
New API (recommended):
|
||||
|
||||
>>> table.create_index( # doctest: +SKIP
|
||||
... "vector", config=IvfPq(distance_type="l2")
|
||||
... )
|
||||
>>> table.create_index("category", config=BTree()) # doctest: +SKIP
|
||||
>>> table.create_index("content", config=FTS()) # doctest: +SKIP
|
||||
|
||||
Legacy API (deprecated):
|
||||
|
||||
>>> table.create_index( # doctest: +SKIP
|
||||
... "l2", vector_column_name="vector"
|
||||
... )
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -1188,7 +1251,7 @@ class Table(ABC):
|
||||
... .when_not_matched_insert_all() \\
|
||||
... .execute(new_data)
|
||||
>>> res
|
||||
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1)
|
||||
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1, num_rows=3)
|
||||
>>> # The order of new rows is non-deterministic since we use
|
||||
>>> # a hash-join as part of this operation and so we sort here
|
||||
>>> table.to_arrow().sort_by("a").to_pandas()
|
||||
@@ -2178,7 +2241,7 @@ class LanceTable(Table):
|
||||
return LOOP.run(self._table.count_rows(filter))
|
||||
|
||||
def __repr__(self) -> str:
|
||||
val = f"{self.__class__.__name__}(name={self.name!r}, version={self.version}"
|
||||
val = f"{self.__class__.__name__}(name={self.name!r}"
|
||||
if self._conn.read_consistency_interval is not None:
|
||||
val += ", read_consistency_interval={!r}".format(
|
||||
self._conn.read_consistency_interval
|
||||
@@ -2250,11 +2313,51 @@ class LanceTable(Table):
|
||||
dataset, allow_pyarrow_filter=False, batch_size=batch_size
|
||||
)
|
||||
|
||||
# New unified API overload
|
||||
@overload
|
||||
def create_index(
|
||||
self,
|
||||
metric: DistanceType = "l2",
|
||||
num_partitions=None,
|
||||
num_sub_vectors=None,
|
||||
column: str,
|
||||
/,
|
||||
*,
|
||||
config: IndexConfigType,
|
||||
replace: bool = ...,
|
||||
wait_timeout: Optional[timedelta] = ...,
|
||||
name: Optional[str] = ...,
|
||||
train: bool = ...,
|
||||
) -> None: ...
|
||||
|
||||
# Legacy API overload (deprecated)
|
||||
@overload
|
||||
def create_index(
|
||||
self,
|
||||
metric: Literal["l2", "cosine", "dot", "hamming"] = ...,
|
||||
num_partitions: Optional[int] = ...,
|
||||
num_sub_vectors: Optional[int] = ...,
|
||||
vector_column_name: str = ...,
|
||||
replace: bool = ...,
|
||||
accelerator: Optional[str] = ...,
|
||||
index_cache_size: Optional[int] = ...,
|
||||
num_bits: int = ...,
|
||||
index_type: Literal[
|
||||
"IVF_FLAT", "IVF_SQ", "IVF_PQ", "IVF_RQ", "IVF_HNSW_SQ", "IVF_HNSW_PQ"
|
||||
] = ...,
|
||||
max_iterations: int = ...,
|
||||
sample_rate: int = ...,
|
||||
m: int = ...,
|
||||
ef_construction: int = ...,
|
||||
*,
|
||||
wait_timeout: Optional[timedelta] = ...,
|
||||
name: Optional[str] = ...,
|
||||
train: bool = ...,
|
||||
target_partition_size: Optional[int] = ...,
|
||||
) -> None: ...
|
||||
|
||||
def create_index(
|
||||
self,
|
||||
metric: str = "l2",
|
||||
num_partitions: Optional[int] = None,
|
||||
num_sub_vectors: Optional[int] = None,
|
||||
vector_column_name: str = VECTOR_COLUMN_NAME,
|
||||
replace: bool = True,
|
||||
accelerator: Optional[str] = None,
|
||||
@@ -2274,47 +2377,232 @@ class LanceTable(Table):
|
||||
m: int = 20,
|
||||
ef_construction: int = 300,
|
||||
*,
|
||||
config: Optional[IndexConfigType] = None,
|
||||
wait_timeout: Optional[timedelta] = None,
|
||||
name: Optional[str] = None,
|
||||
train: bool = True,
|
||||
target_partition_size: Optional[int] = None,
|
||||
):
|
||||
"""Create an index on the table."""
|
||||
if accelerator is not None:
|
||||
# accelerator is only supported through pylance.
|
||||
self.to_lance().create_index(
|
||||
column=vector_column_name,
|
||||
index_type=index_type,
|
||||
"""Create an index on a column.
|
||||
|
||||
This method supports both the new unified API and the legacy API
|
||||
for backwards compatibility. The new API takes the column name as the
|
||||
first positional argument and an index configuration object via
|
||||
``config``; the legacy API takes the distance metric as the first
|
||||
argument plus separate ``vector_column_name`` / ``num_partitions`` /
|
||||
etc. parameters, and emits a ``DeprecationWarning``.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
metric : str
|
||||
For new API: the column name to index.
|
||||
For legacy API: the distance metric ("l2", "cosine", "dot", "hamming").
|
||||
config : IndexConfigType, optional
|
||||
The index configuration object. If provided, uses the new unified API.
|
||||
Can be one of: IvfFlat, IvfPq, IvfSq, IvfRq, HnswPq, HnswSq,
|
||||
BTree, Bitmap, LabelList, FTS.
|
||||
replace : bool, default True
|
||||
Whether to replace an existing index on this column.
|
||||
wait_timeout : timedelta, optional
|
||||
Timeout to wait for async indexing to complete.
|
||||
name : str, optional
|
||||
Custom name for the index.
|
||||
train : bool, default True
|
||||
Whether to train the index with existing data.
|
||||
|
||||
Examples
|
||||
--------
|
||||
New API (recommended):
|
||||
|
||||
>>> table.create_index( # doctest: +SKIP
|
||||
... "vector", config=IvfPq(distance_type="l2")
|
||||
... )
|
||||
>>> table.create_index("category", config=BTree()) # doctest: +SKIP
|
||||
>>> table.create_index("content", config=FTS()) # doctest: +SKIP
|
||||
|
||||
Legacy API (deprecated):
|
||||
|
||||
>>> table.create_index( # doctest: +SKIP
|
||||
... "l2", vector_column_name="vector"
|
||||
... )
|
||||
"""
|
||||
# Detect whether this is a legacy API call
|
||||
is_legacy = self._is_legacy_create_index_call(
|
||||
metric,
|
||||
config,
|
||||
num_partitions,
|
||||
num_sub_vectors,
|
||||
vector_column_name,
|
||||
accelerator,
|
||||
index_cache_size,
|
||||
)
|
||||
|
||||
if is_legacy:
|
||||
warnings.warn(
|
||||
"The create_index() API with metric/num_partitions parameters is "
|
||||
"deprecated and will be removed in a future version. "
|
||||
"Please migrate to the new unified API:\n"
|
||||
" # Old (deprecated):\n"
|
||||
" table.create_index('l2', vector_column_name='my_vector')\n"
|
||||
" # New (recommended):\n"
|
||||
" table.create_index('my_vector', config=IvfPq(distance_type='l2'))",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
# Legacy API: first arg is the distance metric
|
||||
column = vector_column_name
|
||||
|
||||
# Build config from legacy parameters
|
||||
config = self._build_vector_config_from_legacy_params(
|
||||
metric=metric,
|
||||
index_type=index_type,
|
||||
num_partitions=num_partitions,
|
||||
num_sub_vectors=num_sub_vectors,
|
||||
replace=replace,
|
||||
accelerator=accelerator,
|
||||
index_cache_size=index_cache_size,
|
||||
num_bits=num_bits,
|
||||
max_iterations=max_iterations,
|
||||
sample_rate=sample_rate,
|
||||
m=m,
|
||||
ef_construction=ef_construction,
|
||||
target_partition_size=target_partition_size,
|
||||
accelerator=accelerator,
|
||||
)
|
||||
self.checkout_latest()
|
||||
return
|
||||
elif index_type == "IVF_FLAT":
|
||||
config = IvfFlat(
|
||||
|
||||
# Handle accelerator through pylance
|
||||
if accelerator is not None:
|
||||
self.to_lance().create_index(
|
||||
column=column,
|
||||
index_type=index_type,
|
||||
metric=metric,
|
||||
num_partitions=num_partitions,
|
||||
num_sub_vectors=num_sub_vectors,
|
||||
replace=replace,
|
||||
accelerator=accelerator,
|
||||
index_cache_size=index_cache_size,
|
||||
num_bits=num_bits,
|
||||
m=m,
|
||||
ef_construction=ef_construction,
|
||||
target_partition_size=target_partition_size,
|
||||
)
|
||||
self.checkout_latest()
|
||||
return
|
||||
else:
|
||||
# New API: metric is the column name
|
||||
column = metric
|
||||
|
||||
# Check if config has accelerator set and dispatch to pylance
|
||||
if config is not None and hasattr(config, "accelerator"):
|
||||
acc = getattr(config, "accelerator", None)
|
||||
if acc is not None:
|
||||
# Dispatch to pylance for GPU acceleration
|
||||
index_type_map = {
|
||||
"IvfFlat": "IVF_FLAT",
|
||||
"IvfSq": "IVF_SQ",
|
||||
"IvfPq": "IVF_PQ",
|
||||
"IvfRq": "IVF_RQ",
|
||||
"HnswPq": "IVF_HNSW_PQ",
|
||||
"HnswSq": "IVF_HNSW_SQ",
|
||||
}
|
||||
cfg_type = type(config).__name__
|
||||
lance_index_type = index_type_map.get(cfg_type, "IVF_PQ")
|
||||
|
||||
self.to_lance().create_index(
|
||||
column=column,
|
||||
index_type=lance_index_type,
|
||||
metric=getattr(config, "distance_type", "l2"),
|
||||
num_partitions=getattr(config, "num_partitions", None),
|
||||
num_sub_vectors=getattr(config, "num_sub_vectors", None),
|
||||
replace=replace,
|
||||
accelerator=acc,
|
||||
num_bits=getattr(config, "num_bits", 8),
|
||||
m=getattr(config, "m", 20),
|
||||
ef_construction=getattr(config, "ef_construction", 300),
|
||||
target_partition_size=getattr(
|
||||
config, "target_partition_size", None
|
||||
),
|
||||
)
|
||||
self.checkout_latest()
|
||||
return
|
||||
|
||||
return LOOP.run(
|
||||
self._table.create_index(
|
||||
column,
|
||||
replace=replace,
|
||||
config=config,
|
||||
wait_timeout=wait_timeout,
|
||||
name=name,
|
||||
train=train,
|
||||
)
|
||||
)
|
||||
|
||||
def _is_legacy_create_index_call(
|
||||
self,
|
||||
first_arg: str,
|
||||
config: Optional[IndexConfigType],
|
||||
num_partitions: Optional[int],
|
||||
num_sub_vectors: Optional[int],
|
||||
vector_column_name: str,
|
||||
accelerator: Optional[str],
|
||||
index_cache_size: Optional[int],
|
||||
) -> bool:
|
||||
"""Detect if this is a legacy create_index call."""
|
||||
# If config is provided, it's definitely the new API
|
||||
if config is not None:
|
||||
return False
|
||||
|
||||
# If old-style parameters were explicitly set, it's legacy
|
||||
if any(
|
||||
x is not None
|
||||
for x in (num_partitions, num_sub_vectors, accelerator, index_cache_size)
|
||||
):
|
||||
return True
|
||||
|
||||
# If vector_column_name differs from default, it's legacy
|
||||
if vector_column_name != VECTOR_COLUMN_NAME:
|
||||
return True
|
||||
|
||||
# If first arg is a known metric, assume legacy
|
||||
if first_arg.lower() in KNOWN_METRICS:
|
||||
return True
|
||||
|
||||
# Otherwise assume new API
|
||||
return False
|
||||
|
||||
def _build_vector_config_from_legacy_params(
|
||||
self,
|
||||
metric: str,
|
||||
index_type: str,
|
||||
num_partitions: Optional[int],
|
||||
num_sub_vectors: Optional[int],
|
||||
num_bits: int,
|
||||
max_iterations: int,
|
||||
sample_rate: int,
|
||||
m: int,
|
||||
ef_construction: int,
|
||||
target_partition_size: Optional[int],
|
||||
accelerator: Optional[str],
|
||||
) -> IndexConfigType:
|
||||
"""Build an index config object from legacy parameters."""
|
||||
if index_type == "IVF_FLAT":
|
||||
return IvfFlat(
|
||||
distance_type=metric,
|
||||
num_partitions=num_partitions,
|
||||
max_iterations=max_iterations,
|
||||
sample_rate=sample_rate,
|
||||
target_partition_size=target_partition_size,
|
||||
accelerator=accelerator,
|
||||
)
|
||||
elif index_type == "IVF_SQ":
|
||||
config = IvfSq(
|
||||
return IvfSq(
|
||||
distance_type=metric,
|
||||
num_partitions=num_partitions,
|
||||
max_iterations=max_iterations,
|
||||
sample_rate=sample_rate,
|
||||
target_partition_size=target_partition_size,
|
||||
accelerator=accelerator,
|
||||
)
|
||||
elif index_type == "IVF_PQ":
|
||||
config = IvfPq(
|
||||
return IvfPq(
|
||||
distance_type=metric,
|
||||
num_partitions=num_partitions,
|
||||
num_sub_vectors=num_sub_vectors,
|
||||
@@ -2322,18 +2610,20 @@ class LanceTable(Table):
|
||||
max_iterations=max_iterations,
|
||||
sample_rate=sample_rate,
|
||||
target_partition_size=target_partition_size,
|
||||
accelerator=accelerator,
|
||||
)
|
||||
elif index_type == "IVF_RQ":
|
||||
config = IvfRq(
|
||||
return IvfRq(
|
||||
distance_type=metric,
|
||||
num_partitions=num_partitions,
|
||||
num_bits=num_bits,
|
||||
max_iterations=max_iterations,
|
||||
sample_rate=sample_rate,
|
||||
target_partition_size=target_partition_size,
|
||||
accelerator=accelerator,
|
||||
)
|
||||
elif index_type == "IVF_HNSW_PQ":
|
||||
config = HnswPq(
|
||||
return HnswPq(
|
||||
distance_type=metric,
|
||||
num_partitions=num_partitions,
|
||||
num_sub_vectors=num_sub_vectors,
|
||||
@@ -2343,9 +2633,10 @@ class LanceTable(Table):
|
||||
m=m,
|
||||
ef_construction=ef_construction,
|
||||
target_partition_size=target_partition_size,
|
||||
accelerator=accelerator,
|
||||
)
|
||||
elif index_type == "IVF_HNSW_SQ":
|
||||
config = HnswSq(
|
||||
return HnswSq(
|
||||
distance_type=metric,
|
||||
num_partitions=num_partitions,
|
||||
max_iterations=max_iterations,
|
||||
@@ -2353,9 +2644,10 @@ class LanceTable(Table):
|
||||
m=m,
|
||||
ef_construction=ef_construction,
|
||||
target_partition_size=target_partition_size,
|
||||
accelerator=accelerator,
|
||||
)
|
||||
elif index_type == "IVF_HNSW_FLAT":
|
||||
config = HnswFlat(
|
||||
return HnswFlat(
|
||||
distance_type=metric,
|
||||
num_partitions=num_partitions,
|
||||
max_iterations=max_iterations,
|
||||
@@ -2367,16 +2659,6 @@ class LanceTable(Table):
|
||||
else:
|
||||
raise ValueError(f"Unknown index type {index_type}")
|
||||
|
||||
return LOOP.run(
|
||||
self._table.create_index(
|
||||
vector_column_name,
|
||||
replace=replace,
|
||||
config=config,
|
||||
name=name,
|
||||
train=train,
|
||||
)
|
||||
)
|
||||
|
||||
def drop_index(self, name: str) -> None:
|
||||
"""
|
||||
Drops an index from the table
|
||||
@@ -2476,6 +2758,11 @@ class LanceTable(Table):
|
||||
"""
|
||||
return LOOP.run(self._table.latest_storage_options())
|
||||
|
||||
@deprecation.deprecated(
|
||||
deprecated_in="0.25.0",
|
||||
current_version=__version__,
|
||||
details="Use create_index() with config=BTree()/Bitmap()/LabelList() instead.",
|
||||
)
|
||||
def create_scalar_index(
|
||||
self,
|
||||
column: str,
|
||||
@@ -2484,6 +2771,12 @@ class LanceTable(Table):
|
||||
index_type: ScalarIndexType = "BTREE",
|
||||
name: Optional[str] = None,
|
||||
):
|
||||
"""Create a scalar index on a column.
|
||||
|
||||
.. deprecated:: 0.25.0
|
||||
Use :meth:`create_index` with a BTree, Bitmap, or LabelList config instead.
|
||||
Example: ``table.create_index("column", config=BTree())``
|
||||
"""
|
||||
if index_type == "BTREE":
|
||||
config = BTree()
|
||||
elif index_type == "BITMAP":
|
||||
@@ -2496,6 +2789,11 @@ class LanceTable(Table):
|
||||
self._table.create_index(column, replace=replace, config=config, name=name)
|
||||
)
|
||||
|
||||
@deprecation.deprecated(
|
||||
deprecated_in="0.25.0",
|
||||
current_version=__version__,
|
||||
details="Use create_index() with config=FTS() instead.",
|
||||
)
|
||||
def create_fts_index(
|
||||
self,
|
||||
field_names: Union[str, List[str]],
|
||||
@@ -2519,6 +2817,12 @@ class LanceTable(Table):
|
||||
prefix_only: bool = False,
|
||||
name: Optional[str] = None,
|
||||
):
|
||||
"""Create a full-text search index on a column.
|
||||
|
||||
.. deprecated:: 0.25.0
|
||||
Use :meth:`create_index` with an FTS config instead.
|
||||
Example: ``table.create_index("text_column", config=FTS())``
|
||||
"""
|
||||
self._ensure_no_legacy_fts_index()
|
||||
|
||||
if use_tantivy:
|
||||
@@ -3297,6 +3601,11 @@ class LanceTable(Table):
|
||||
[`AsyncTable.unset_lsm_write_spec`][lancedb.AsyncTable.unset_lsm_write_spec]."""
|
||||
return LOOP.run(self._table.unset_lsm_write_spec())
|
||||
|
||||
def close_lsm_writers(self) -> None:
|
||||
"""Close cached MemWAL shard writers. See
|
||||
[`AsyncTable.close_lsm_writers`][lancedb.AsyncTable.close_lsm_writers]."""
|
||||
return LOOP.run(self._table.close_lsm_writers())
|
||||
|
||||
def uses_v2_manifest_paths(self) -> bool:
|
||||
"""
|
||||
Check if the table is using the new v2 manifest paths.
|
||||
@@ -3905,6 +4214,16 @@ class AsyncTable:
|
||||
"""
|
||||
await self._inner.unset_lsm_write_spec()
|
||||
|
||||
async def close_lsm_writers(self) -> None:
|
||||
"""Drain and close any cached MemWAL shard writers for this table.
|
||||
|
||||
When an LSM write spec is installed, `merge_insert` opens MemWAL shard
|
||||
writers and caches them for reuse across calls. This closes them,
|
||||
flushing pending data; writers reopen lazily on the next
|
||||
`merge_insert`. It is a no-op when no writers are cached.
|
||||
"""
|
||||
await self._inner.close_lsm_writers()
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""The name of the table."""
|
||||
@@ -4355,7 +4674,7 @@ class AsyncTable:
|
||||
... .when_not_matched_insert_all() \\
|
||||
... .execute(new_data)
|
||||
>>> res
|
||||
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1)
|
||||
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1, num_rows=3)
|
||||
>>> # The order of new rows is non-deterministic since we use
|
||||
>>> # a hash-join as part of this operation and so we sort here
|
||||
>>> table.to_arrow().sort_by("a").to_pandas()
|
||||
@@ -4735,6 +5054,8 @@ class AsyncTable:
|
||||
when_not_matched_by_source_condition=merge._when_not_matched_by_source_condition,
|
||||
timeout=merge._timeout,
|
||||
use_index=merge._use_index,
|
||||
use_lsm_write=merge._use_lsm_write,
|
||||
validate_single_shard=merge._validate_single_shard,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ import pathlib
|
||||
import warnings
|
||||
from datetime import date, datetime
|
||||
from functools import singledispatch
|
||||
from typing import Tuple, Union, Optional, Any
|
||||
from typing import Tuple, Union, Optional, Any, List
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import numpy as np
|
||||
@@ -189,7 +189,33 @@ def flatten_columns(tbl: pa.Table, flatten: Optional[Union[int, bool]] = None):
|
||||
return tbl
|
||||
|
||||
|
||||
def inf_vector_column_query(schema: pa.Schema) -> str:
|
||||
def _format_field_path(path: List[str]) -> str:
|
||||
def format_segment(segment: str) -> str:
|
||||
if all(char.isalnum() or char == "_" for char in segment):
|
||||
return segment
|
||||
return f"`{segment.replace('`', '``')}`"
|
||||
|
||||
return ".".join(format_segment(segment) for segment in path)
|
||||
|
||||
|
||||
def _iter_vector_columns(
|
||||
field: pa.Field, path: List[str], dim: Optional[int] = None
|
||||
) -> List[str]:
|
||||
field_path = [*path, field.name]
|
||||
if is_vector_column(field.type):
|
||||
vector_dim = infer_vector_column_dim(field.type)
|
||||
if dim is None or vector_dim == dim:
|
||||
return [_format_field_path(field_path)]
|
||||
return []
|
||||
if pa.types.is_struct(field.type):
|
||||
columns = []
|
||||
for idx in range(field.type.num_fields):
|
||||
columns.extend(_iter_vector_columns(field.type.field(idx), field_path, dim))
|
||||
return columns
|
||||
return []
|
||||
|
||||
|
||||
def inf_vector_column_query(schema: pa.Schema, dim: Optional[int] = None) -> str:
|
||||
"""
|
||||
Get the vector column name
|
||||
|
||||
@@ -202,26 +228,21 @@ def inf_vector_column_query(schema: pa.Schema) -> str:
|
||||
-------
|
||||
str: the vector column name.
|
||||
"""
|
||||
vector_col_name = ""
|
||||
vector_col_count = 0
|
||||
for field_name in schema.names:
|
||||
field = schema.field(field_name)
|
||||
if is_vector_column(field.type):
|
||||
vector_col_count += 1
|
||||
if vector_col_count > 1:
|
||||
raise ValueError(
|
||||
"Schema has more than one vector column. "
|
||||
"Please specify the vector column name "
|
||||
"for vector search"
|
||||
)
|
||||
elif vector_col_count == 1:
|
||||
vector_col_name = field_name
|
||||
if vector_col_count == 0:
|
||||
vector_col_names = []
|
||||
for field in schema:
|
||||
vector_col_names.extend(_iter_vector_columns(field, [], dim))
|
||||
if len(vector_col_names) > 1:
|
||||
raise ValueError(
|
||||
"Schema has more than one vector column. "
|
||||
"Please specify the vector column name "
|
||||
f"for vector search. Candidates: {vector_col_names}"
|
||||
)
|
||||
if len(vector_col_names) == 0:
|
||||
raise ValueError(
|
||||
"There is no vector column in the data. "
|
||||
"Please specify the vector column name for vector search"
|
||||
)
|
||||
return vector_col_name
|
||||
return vector_col_names[0]
|
||||
|
||||
|
||||
def is_vector_column(data_type: pa.DataType) -> bool:
|
||||
@@ -247,6 +268,29 @@ def is_vector_column(data_type: pa.DataType) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def infer_vector_column_dim(data_type: pa.DataType) -> Optional[int]:
|
||||
if pa.types.is_fixed_size_list(data_type):
|
||||
return data_type.list_size
|
||||
if pa.types.is_list(data_type):
|
||||
return infer_vector_column_dim(data_type.value_type)
|
||||
return None
|
||||
|
||||
|
||||
def _query_vector_dim(query: Optional[Any]) -> Optional[int]:
|
||||
if query is None:
|
||||
return None
|
||||
if isinstance(query, np.ndarray):
|
||||
if query.ndim == 0:
|
||||
return None
|
||||
return query.shape[-1]
|
||||
if isinstance(query, list) and query:
|
||||
first = query[0]
|
||||
if isinstance(first, (list, tuple, np.ndarray)):
|
||||
return len(first)
|
||||
return len(query)
|
||||
return None
|
||||
|
||||
|
||||
def infer_vector_column_name(
|
||||
schema: pa.Schema,
|
||||
query_type: str,
|
||||
@@ -262,7 +306,9 @@ def infer_vector_column_name(
|
||||
|
||||
if query is not None or query_type == "hybrid":
|
||||
try:
|
||||
vector_column_name = inf_vector_column_query(schema)
|
||||
vector_column_name = inf_vector_column_query(
|
||||
schema, dim=_query_vector_dim(query)
|
||||
)
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
|
||||
@@ -57,7 +57,7 @@ async def test_upsert_async(mem_db_async):
|
||||
await table.count_rows() # 3
|
||||
res
|
||||
# MergeResult(version=2, num_updated_rows=1,
|
||||
# num_inserted_rows=1, num_deleted_rows=0)
|
||||
# num_inserted_rows=1, num_deleted_rows=0, num_rows=2)
|
||||
# --8<-- [end:upsert_basic_async]
|
||||
assert await table.count_rows() == 3
|
||||
assert res.version == 2
|
||||
@@ -86,7 +86,7 @@ def test_insert_if_not_exists(mem_db):
|
||||
table.count_rows() # 3
|
||||
res
|
||||
# MergeResult(version=2, num_updated_rows=0,
|
||||
# num_inserted_rows=1, num_deleted_rows=0)
|
||||
# num_inserted_rows=1, num_deleted_rows=0, num_rows=1)
|
||||
# --8<-- [end:insert_if_not_exists]
|
||||
assert table.count_rows() == 3
|
||||
assert res.version == 2
|
||||
@@ -116,7 +116,7 @@ async def test_insert_if_not_exists_async(mem_db_async):
|
||||
await table.count_rows() # 3
|
||||
res
|
||||
# MergeResult(version=2, num_updated_rows=0,
|
||||
# num_inserted_rows=1, num_deleted_rows=0)
|
||||
# num_inserted_rows=1, num_deleted_rows=0, num_rows=1)
|
||||
# --8<-- [end:insert_if_not_exists]
|
||||
assert await table.count_rows() == 3
|
||||
assert res.version == 2
|
||||
@@ -150,7 +150,7 @@ def test_replace_range(mem_db):
|
||||
table.count_rows("doc_id = 1") # 1
|
||||
res
|
||||
# MergeResult(version=2, num_updated_rows=1,
|
||||
# num_inserted_rows=0, num_deleted_rows=1)
|
||||
# num_inserted_rows=0, num_deleted_rows=1, num_rows=1)
|
||||
# --8<-- [end:insert_if_not_exists]
|
||||
assert table.count_rows("doc_id = 1") == 1
|
||||
assert res.version == 2
|
||||
@@ -185,7 +185,7 @@ async def test_replace_range_async(mem_db_async):
|
||||
await table.count_rows("doc_id = 1") # 1
|
||||
res
|
||||
# MergeResult(version=2, num_updated_rows=1,
|
||||
# num_inserted_rows=0, num_deleted_rows=1)
|
||||
# num_inserted_rows=0, num_deleted_rows=1, num_rows=1)
|
||||
# --8<-- [end:insert_if_not_exists]
|
||||
assert await table.count_rows("doc_id = 1") == 1
|
||||
assert res.version == 2
|
||||
|
||||
@@ -6,6 +6,7 @@ import re
|
||||
import sys
|
||||
from datetime import timedelta
|
||||
import os
|
||||
from types import SimpleNamespace
|
||||
|
||||
import lancedb
|
||||
import numpy as np
|
||||
@@ -188,6 +189,43 @@ def test_table_names(tmp_db: lancedb.DBConnection):
|
||||
assert len(result) == 3
|
||||
|
||||
|
||||
def test_db_contains_and_len_include_all_table_name_pages(tmp_db: lancedb.DBConnection):
|
||||
for idx in range(20):
|
||||
tmp_db.create_table(f"table_{idx}", data=[{"id": idx}])
|
||||
|
||||
assert len(tmp_db) == 20
|
||||
for idx in range(20):
|
||||
assert f"table_{idx}" in tmp_db
|
||||
assert "does_not_exist" not in tmp_db
|
||||
|
||||
|
||||
def test_db_contains_stops_after_matching_table_page(
|
||||
tmp_db: lancedb.DBConnection, monkeypatch
|
||||
):
|
||||
calls = []
|
||||
pages = {
|
||||
None: SimpleNamespace(tables=["table_0", "table_1"], page_token="next"),
|
||||
"next": SimpleNamespace(tables=["table_2"], page_token=None),
|
||||
}
|
||||
|
||||
def list_tables(*, page_token=None, **_kwargs):
|
||||
calls.append(page_token)
|
||||
return pages[page_token]
|
||||
|
||||
monkeypatch.setattr(tmp_db, "list_tables", list_tables)
|
||||
|
||||
assert "table_1" in tmp_db
|
||||
assert calls == [None]
|
||||
|
||||
calls.clear()
|
||||
assert "table_2" in tmp_db
|
||||
assert calls == [None, "next"]
|
||||
|
||||
calls.clear()
|
||||
assert len(tmp_db) == 3
|
||||
assert calls == [None, "next"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_table_names_async(tmp_path):
|
||||
db = lancedb.connect(tmp_path)
|
||||
@@ -428,7 +466,8 @@ async def test_create_table_v2_manifest_paths_async(tmp_path):
|
||||
assert await tbl.uses_v2_manifest_paths()
|
||||
manifests_dir = tmp_path / "test_v2_manifest_paths.lance" / "_versions"
|
||||
for manifest in os.listdir(manifests_dir):
|
||||
assert re.match(r"\d{20}\.manifest", manifest)
|
||||
if manifest.endswith(".manifest"):
|
||||
assert re.match(r"\d{20}\.manifest", manifest)
|
||||
|
||||
# Start a table in V1 mode then migrate
|
||||
tbl = await db_no_v2_paths.create_table(
|
||||
@@ -438,13 +477,15 @@ async def test_create_table_v2_manifest_paths_async(tmp_path):
|
||||
assert not await tbl.uses_v2_manifest_paths()
|
||||
manifests_dir = tmp_path / "test_v2_migration.lance" / "_versions"
|
||||
for manifest in os.listdir(manifests_dir):
|
||||
assert re.match(r"\d\.manifest", manifest)
|
||||
if manifest.endswith(".manifest"):
|
||||
assert re.match(r"\d\.manifest", manifest)
|
||||
|
||||
await tbl.migrate_manifest_paths_v2()
|
||||
assert await tbl.uses_v2_manifest_paths()
|
||||
|
||||
for manifest in os.listdir(manifests_dir):
|
||||
assert re.match(r"\d{20}\.manifest", manifest)
|
||||
if manifest.endswith(".manifest"):
|
||||
assert re.match(r"\d{20}\.manifest", manifest)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
@@ -215,11 +215,12 @@ def test_reject_legacy_tantivy_index(table):
|
||||
|
||||
@pytest.mark.parametrize("with_position", [True, False])
|
||||
def test_create_inverted_index(table, with_position):
|
||||
table.create_fts_index(
|
||||
"text",
|
||||
with_position=with_position,
|
||||
name="custom_fts_index",
|
||||
)
|
||||
with pytest.warns(DeprecationWarning, match="create_fts_index"):
|
||||
table.create_fts_index(
|
||||
"text",
|
||||
with_position=with_position,
|
||||
name="custom_fts_index",
|
||||
)
|
||||
indices = table.list_indices()
|
||||
fts_indices = [i for i in indices if i.index_type == "FTS"]
|
||||
assert any(i.name == "custom_fts_index" for i in fts_indices)
|
||||
@@ -563,7 +564,7 @@ def test_create_index_multiple_columns(tmp_path, table):
|
||||
|
||||
|
||||
def test_nested_schema(tmp_path, table):
|
||||
table.create_fts_index("nested.text")
|
||||
table.create_fts_index("nested.text", with_position=True)
|
||||
indices = table.list_indices()
|
||||
assert len(indices) == 1
|
||||
assert indices[0].index_type == "FTS"
|
||||
@@ -577,6 +578,98 @@ def test_nested_schema(tmp_path, table):
|
||||
assert len(results) > 0
|
||||
assert all("puppy" in row["nested"]["text"] for row in results)
|
||||
|
||||
results = table.search(MatchQuery("puppy", "nested.text")).limit(5).to_list()
|
||||
assert len(results) > 0
|
||||
assert all("puppy" in row["nested"]["text"] for row in results)
|
||||
|
||||
phrase_results = (
|
||||
table.search(PhraseQuery("puppy runs", "nested.text")).limit(5).to_list()
|
||||
)
|
||||
assert len(phrase_results) > 0
|
||||
assert all("puppy runs" in row["nested"]["text"] for row in phrase_results)
|
||||
|
||||
hybrid_results = (
|
||||
table.search(query_type="hybrid", fts_columns="nested.text")
|
||||
.vector([0 for _ in range(128)])
|
||||
.text("puppy")
|
||||
.limit(5)
|
||||
.to_list()
|
||||
)
|
||||
assert len(hybrid_results) > 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_nested_schema_async(async_table):
|
||||
await async_table.create_index("nested.text", config=FTS(with_position=True))
|
||||
indices = await async_table.list_indices()
|
||||
assert len(indices) == 1
|
||||
assert indices[0].index_type == "FTS"
|
||||
assert indices[0].columns == ["nested.text"]
|
||||
|
||||
results = await (
|
||||
async_table.query()
|
||||
.nearest_to_text("puppy", columns="nested.text")
|
||||
.limit(5)
|
||||
.to_list()
|
||||
)
|
||||
assert len(results) > 0
|
||||
assert all("puppy" in row["nested"]["text"] for row in results)
|
||||
|
||||
results = await (
|
||||
async_table.query()
|
||||
.nearest_to_text(MatchQuery("puppy", "nested.text"))
|
||||
.limit(5)
|
||||
.to_list()
|
||||
)
|
||||
assert len(results) > 0
|
||||
assert all("puppy" in row["nested"]["text"] for row in results)
|
||||
|
||||
phrase_results = await (
|
||||
async_table.query()
|
||||
.nearest_to_text(PhraseQuery("puppy runs", "nested.text"))
|
||||
.limit(5)
|
||||
.to_list()
|
||||
)
|
||||
assert len(phrase_results) > 0
|
||||
assert all("puppy runs" in row["nested"]["text"] for row in phrase_results)
|
||||
|
||||
hybrid_results = await (
|
||||
async_table.query()
|
||||
.nearest_to([0 for _ in range(128)])
|
||||
.nearest_to_text("puppy", columns="nested.text")
|
||||
.limit(5)
|
||||
.to_list()
|
||||
)
|
||||
assert len(hybrid_results) > 0
|
||||
|
||||
|
||||
def test_nested_schema_rejects_invalid_fts_fields(tmp_path):
|
||||
db = ldb.connect(tmp_path)
|
||||
data = pa.table(
|
||||
{
|
||||
"payload": pa.array(
|
||||
[
|
||||
{"text": "puppy runs", "count": 1},
|
||||
{"text": "car drives", "count": 2},
|
||||
]
|
||||
),
|
||||
"vector": pa.array(
|
||||
[[0.1, 0.1], [0.2, 0.2]],
|
||||
type=pa.list_(pa.float32(), list_size=2),
|
||||
),
|
||||
}
|
||||
)
|
||||
table = db.create_table("test", data=data)
|
||||
|
||||
with pytest.raises(ValueError, match="FTS index cannot be created.*payload"):
|
||||
table.create_fts_index("payload")
|
||||
|
||||
with pytest.raises(ValueError, match="FTS index cannot be created.*count"):
|
||||
table.create_fts_index("payload.count")
|
||||
|
||||
with pytest.raises(ValueError, match="Field path `payload.missing` not found"):
|
||||
table.create_fts_index("payload.missing")
|
||||
|
||||
|
||||
def test_search_index_with_filter(table):
|
||||
table.create_fts_index("text")
|
||||
|
||||
@@ -105,6 +105,46 @@ async def test_create_scalar_index(some_table: AsyncTable):
|
||||
assert len(indices) == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_nested_scalar_index_lists_canonical_paths(db_async):
|
||||
metadata_type = pa.struct(
|
||||
[
|
||||
pa.field("user_id", pa.int32()),
|
||||
pa.field("user.id", pa.int32()),
|
||||
]
|
||||
)
|
||||
data = pa.Table.from_arrays(
|
||||
[
|
||||
pa.array([1, 2, 3], type=pa.int32()),
|
||||
pa.array(
|
||||
[
|
||||
{"user_id": 10, "user.id": 100},
|
||||
{"user_id": 20, "user.id": 200},
|
||||
{"user_id": 30, "user.id": 300},
|
||||
],
|
||||
type=metadata_type,
|
||||
),
|
||||
],
|
||||
names=["user_id", "metadata"],
|
||||
)
|
||||
table = await db_async.create_table("nested_scalar_index", data)
|
||||
|
||||
await table.create_index("user_id", config=BTree(), name="top_user_id_idx")
|
||||
await table.create_index(
|
||||
"metadata.user_id", config=BTree(), name="nested_user_id_idx"
|
||||
)
|
||||
await table.create_index(
|
||||
"metadata.`user.id`", config=BTree(), name="escaped_user_id_idx"
|
||||
)
|
||||
|
||||
columns_by_name = {
|
||||
index.name: index.columns for index in await table.list_indices()
|
||||
}
|
||||
assert columns_by_name["top_user_id_idx"] == ["user_id"]
|
||||
assert columns_by_name["nested_user_id_idx"] == ["metadata.user_id"]
|
||||
assert columns_by_name["escaped_user_id_idx"] == ["metadata.`user.id`"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_fixed_size_binary_index(some_table: AsyncTable):
|
||||
await some_table.create_index("fsb", config=BTree())
|
||||
@@ -122,12 +162,13 @@ async def test_create_bitmap_index(some_table: AsyncTable):
|
||||
await some_table.create_index("data", config=Bitmap())
|
||||
indices = await some_table.list_indices()
|
||||
assert len(indices) == 3
|
||||
# list_indices returns indices in alphabetical order by name
|
||||
assert indices[0].index_type == "Bitmap"
|
||||
assert indices[0].columns == ["id"]
|
||||
assert indices[0].columns == ["data"]
|
||||
assert indices[1].index_type == "Bitmap"
|
||||
assert indices[1].columns == ["is_active"]
|
||||
assert indices[1].columns == ["id"]
|
||||
assert indices[2].index_type == "Bitmap"
|
||||
assert indices[2].columns == ["data"]
|
||||
assert indices[2].columns == ["is_active"]
|
||||
|
||||
index_name = indices[0].name
|
||||
stats = await some_table.index_stats(index_name)
|
||||
|
||||
@@ -40,16 +40,6 @@ def _make_table(tmp_path):
|
||||
def test_set_lsm_write_spec_validates(tmp_path):
|
||||
_db, table = _make_table(tmp_path)
|
||||
|
||||
# No PK set yet.
|
||||
with pytest.raises(Exception, match="primary key"):
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
|
||||
|
||||
table.set_unenforced_primary_key("id")
|
||||
|
||||
# Column mismatch.
|
||||
with pytest.raises(Exception, match="match"):
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("v", 4))
|
||||
|
||||
# Out-of-range num_buckets.
|
||||
with pytest.raises(Exception, match="num_buckets"):
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 0))
|
||||
@@ -70,7 +60,6 @@ def test_unset_lsm_write_spec(tmp_path):
|
||||
table.unset_lsm_write_spec()
|
||||
|
||||
# Install a spec, then remove it; afterwards a fresh spec can be set.
|
||||
table.set_unenforced_primary_key("id")
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
|
||||
table.unset_lsm_write_spec()
|
||||
# A second unset errors — there is no spec left to remove.
|
||||
|
||||
196
python/python/tests/test_merge_insert_lsm.py
Normal file
196
python/python/tests/test_merge_insert_lsm.py
Normal file
@@ -0,0 +1,196 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
"""Tests for the MemWAL LSM ``merge_insert`` dispatch."""
|
||||
|
||||
from datetime import timedelta
|
||||
|
||||
import lancedb
|
||||
import pyarrow as pa
|
||||
import pytest
|
||||
from lancedb._lancedb import LsmWriteSpec
|
||||
|
||||
SCHEMA = pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int64(), nullable=False),
|
||||
pa.field("value", pa.int64(), nullable=False),
|
||||
]
|
||||
)
|
||||
|
||||
REGION_SCHEMA = pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int64(), nullable=False),
|
||||
pa.field("region", pa.utf8(), nullable=False),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def _reader(ids):
|
||||
batch = pa.RecordBatch.from_arrays(
|
||||
[
|
||||
pa.array(ids, type=pa.int64()),
|
||||
pa.array(list(range(len(ids))), type=pa.int64()),
|
||||
],
|
||||
schema=SCHEMA,
|
||||
)
|
||||
return pa.RecordBatchReader.from_batches(SCHEMA, [batch])
|
||||
|
||||
|
||||
def _region_reader(rows):
|
||||
batch = pa.RecordBatch.from_arrays(
|
||||
[
|
||||
pa.array([row[0] for row in rows], type=pa.int64()),
|
||||
pa.array([row[1] for row in rows], type=pa.utf8()),
|
||||
],
|
||||
schema=REGION_SCHEMA,
|
||||
)
|
||||
return pa.RecordBatchReader.from_batches(REGION_SCHEMA, [batch])
|
||||
|
||||
|
||||
def _bucket_table(tmp_path):
|
||||
"""A table with ``id`` as the primary key and a single-bucket LSM spec."""
|
||||
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0))
|
||||
table = db.create_table("t", _reader([1, 2, 3]))
|
||||
table.set_unenforced_primary_key("id")
|
||||
# num_buckets = 1: every row routes to the single bucket.
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 1))
|
||||
return table
|
||||
|
||||
|
||||
def test_lsm_merge_insert_bucket(tmp_path):
|
||||
table = _bucket_table(tmp_path)
|
||||
# Empty `on` defaults to the primary key.
|
||||
result = (
|
||||
table.merge_insert([])
|
||||
.when_matched_update_all()
|
||||
.when_not_matched_insert_all()
|
||||
.execute(_reader([3, 4, 5]))
|
||||
)
|
||||
# LSM path: rows go to the MemWAL, so only num_rows is populated.
|
||||
assert result.num_rows == 3
|
||||
assert result.version == 0
|
||||
assert result.num_inserted_rows == 0
|
||||
assert result.num_updated_rows == 0
|
||||
|
||||
|
||||
def test_lsm_merge_insert_unsharded(tmp_path):
|
||||
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0))
|
||||
table = db.create_table("t", _reader([1, 2, 3]))
|
||||
table.set_unenforced_primary_key("id")
|
||||
table.set_lsm_write_spec(LsmWriteSpec.unsharded())
|
||||
result = (
|
||||
table.merge_insert("id")
|
||||
.when_matched_update_all()
|
||||
.when_not_matched_insert_all()
|
||||
.execute(_reader([10, 11, 12, 13]))
|
||||
)
|
||||
assert result.num_rows == 4
|
||||
|
||||
|
||||
def test_lsm_merge_insert_identity(tmp_path):
|
||||
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0))
|
||||
table = db.create_table("t", _region_reader([(1, "us"), (2, "us")]))
|
||||
table.set_unenforced_primary_key("id")
|
||||
table.set_lsm_write_spec(LsmWriteSpec.identity("region"))
|
||||
# All rows share one identity value, so they route to one shard.
|
||||
result = (
|
||||
table.merge_insert([])
|
||||
.when_matched_update_all()
|
||||
.when_not_matched_insert_all()
|
||||
.execute(_region_reader([(3, "us"), (4, "us")]))
|
||||
)
|
||||
assert result.num_rows == 2
|
||||
|
||||
|
||||
def test_lsm_merge_insert_use_lsm_write_false(tmp_path):
|
||||
table = _bucket_table(tmp_path) # rows id = 1, 2, 3
|
||||
# use_lsm_write(False) opts out: the standard path runs and commits.
|
||||
result = (
|
||||
table.merge_insert("id")
|
||||
.when_not_matched_insert_all()
|
||||
.use_lsm_write(False)
|
||||
.execute(_reader([3, 4, 5]))
|
||||
)
|
||||
assert result.num_inserted_rows == 2
|
||||
assert table.count_rows() == 5
|
||||
|
||||
|
||||
def test_lsm_merge_insert_validate_single_shard_off(tmp_path):
|
||||
table = _bucket_table(tmp_path)
|
||||
result = (
|
||||
table.merge_insert([])
|
||||
.when_matched_update_all()
|
||||
.when_not_matched_insert_all()
|
||||
.validate_single_shard(False)
|
||||
.execute(_reader([6, 7, 8]))
|
||||
)
|
||||
assert result.num_rows == 3
|
||||
|
||||
|
||||
def test_lsm_merge_insert_use_lsm_write_true_requires_spec(tmp_path):
|
||||
# A table with a primary key but no LSM write spec installed.
|
||||
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0))
|
||||
table = db.create_table("t", _reader([1, 2, 3]))
|
||||
table.set_unenforced_primary_key("id")
|
||||
with pytest.raises(Exception, match="use_lsm_write"):
|
||||
(
|
||||
table.merge_insert("id")
|
||||
.when_matched_update_all()
|
||||
.when_not_matched_insert_all()
|
||||
.use_lsm_write(True)
|
||||
.execute(_reader([4]))
|
||||
)
|
||||
|
||||
|
||||
def test_lsm_merge_insert_rejects_on_not_primary_key(tmp_path):
|
||||
table = _bucket_table(tmp_path)
|
||||
with pytest.raises(Exception, match="primary key"):
|
||||
(
|
||||
table.merge_insert("value")
|
||||
.when_matched_update_all()
|
||||
.when_not_matched_insert_all()
|
||||
.execute(_reader([1]))
|
||||
)
|
||||
|
||||
|
||||
def test_lsm_merge_insert_rejects_non_upsert(tmp_path):
|
||||
table = _bucket_table(tmp_path)
|
||||
# Insert-only (no when_matched_update_all) is not the upsert shape.
|
||||
with pytest.raises(Exception, match="upsert"):
|
||||
table.merge_insert([]).when_not_matched_insert_all().execute(_reader([4]))
|
||||
|
||||
|
||||
def test_lsm_close_writers(tmp_path):
|
||||
table = _bucket_table(tmp_path)
|
||||
(
|
||||
table.merge_insert([])
|
||||
.when_matched_update_all()
|
||||
.when_not_matched_insert_all()
|
||||
.execute(_reader([7, 8]))
|
||||
)
|
||||
table.close_lsm_writers()
|
||||
# The writer reopens lazily on the next merge_insert.
|
||||
result = (
|
||||
table.merge_insert([])
|
||||
.when_matched_update_all()
|
||||
.when_not_matched_insert_all()
|
||||
.execute(_reader([9]))
|
||||
)
|
||||
assert result.num_rows == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_lsm_merge_insert(tmp_path):
|
||||
db = await lancedb.connect_async(
|
||||
tmp_path, read_consistency_interval=timedelta(seconds=0)
|
||||
)
|
||||
table = await db.create_table("t", _reader([1, 2, 3]))
|
||||
await table.set_unenforced_primary_key("id")
|
||||
await table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 1))
|
||||
|
||||
builder = (
|
||||
table.merge_insert([]).when_matched_update_all().when_not_matched_insert_all()
|
||||
)
|
||||
result = await builder.execute(_reader([3, 4, 5]))
|
||||
assert result.num_rows == 3
|
||||
await table.close_lsm_writers()
|
||||
@@ -1512,6 +1512,37 @@ def test_take_queries(tmp_path):
|
||||
]
|
||||
|
||||
|
||||
def test_take_queries_to_batches(tmp_path):
|
||||
# Regression test for the sync take-query path: `to_batches` previously
|
||||
# raised ``AttributeError: 'AsyncTakeQuery' object has no attribute
|
||||
# 'execute'`` because the inherited ``BaseQueryBuilder.to_batches`` called
|
||||
# ``execute`` on the async wrapper instead of the native query.
|
||||
db = lancedb.connect(tmp_path)
|
||||
data = pa.table({"idx": list(range(100)), "label": [str(i) for i in range(100)]})
|
||||
table = db.create_table("test", data)
|
||||
|
||||
# Take by offset → to_batches
|
||||
rs = list(table.take_offsets([5, 2, 17]).to_batches())
|
||||
assert all(isinstance(b, pa.RecordBatch) for b in rs)
|
||||
assert sum(b.num_rows for b in rs) == 3
|
||||
assert sorted(v for b in rs for v in b.column("idx").to_pylist()) == [2, 5, 17]
|
||||
|
||||
# Take by row id → to_batches
|
||||
rs = list(table.take_row_ids([5, 2, 17]).to_batches())
|
||||
assert all(isinstance(b, pa.RecordBatch) for b in rs)
|
||||
assert sum(b.num_rows for b in rs) == 3
|
||||
assert sorted(v for b in rs for v in b.column("idx").to_pylist()) == [2, 5, 17]
|
||||
|
||||
# Take with select projection → to_batches preserves the projection
|
||||
rs = list(table.take_row_ids([5, 2, 17]).select(["label"]).to_batches())
|
||||
assert all(b.schema.names == ["label"] for b in rs)
|
||||
assert sorted(v for b in rs for v in b.column("label").to_pylist()) == [
|
||||
"17",
|
||||
"2",
|
||||
"5",
|
||||
]
|
||||
|
||||
|
||||
def test_getitems(tmp_path):
|
||||
db = lancedb.connect(tmp_path)
|
||||
data = pa.table(
|
||||
|
||||
@@ -362,6 +362,22 @@ def test_table_create_indices():
|
||||
schema=dict(
|
||||
fields=[
|
||||
dict(name="id", type={"type": "int64"}, nullable=False),
|
||||
dict(name="text", type={"type": "string"}, nullable=False),
|
||||
dict(
|
||||
name="vector",
|
||||
type={
|
||||
"type": "fixed_size_list",
|
||||
"fields": [
|
||||
dict(
|
||||
name="item",
|
||||
type={"type": "float"},
|
||||
nullable=True,
|
||||
)
|
||||
],
|
||||
"length": 2,
|
||||
},
|
||||
nullable=False,
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
@@ -420,22 +436,25 @@ def test_table_create_indices():
|
||||
# This is a smoke-test.
|
||||
table = db.create_table("test", [{"id": 1}])
|
||||
|
||||
# Test create_scalar_index with custom name
|
||||
table.create_scalar_index(
|
||||
"id", wait_timeout=timedelta(seconds=2), name="custom_scalar_idx"
|
||||
)
|
||||
# Test create_scalar_index with custom name (legacy method)
|
||||
with pytest.warns(DeprecationWarning, match="create_scalar_index"):
|
||||
table.create_scalar_index(
|
||||
"id", wait_timeout=timedelta(seconds=2), name="custom_scalar_idx"
|
||||
)
|
||||
|
||||
# Test create_fts_index with custom name
|
||||
table.create_fts_index(
|
||||
"text", wait_timeout=timedelta(seconds=2), name="custom_fts_idx"
|
||||
)
|
||||
# Test create_fts_index with custom name (legacy method)
|
||||
with pytest.warns(DeprecationWarning, match="create_fts_index"):
|
||||
table.create_fts_index(
|
||||
"text", wait_timeout=timedelta(seconds=2), name="custom_fts_idx"
|
||||
)
|
||||
|
||||
# Test create_index with custom name
|
||||
table.create_index(
|
||||
vector_column_name="vector",
|
||||
wait_timeout=timedelta(seconds=10),
|
||||
name="custom_vector_idx",
|
||||
)
|
||||
# Test create_index with custom name (legacy form: vector_column_name kwarg)
|
||||
with pytest.warns(DeprecationWarning, match="create_index"):
|
||||
table.create_index(
|
||||
vector_column_name="vector",
|
||||
wait_timeout=timedelta(seconds=10),
|
||||
name="custom_vector_idx",
|
||||
)
|
||||
|
||||
# Validate that the name parameter was passed correctly in requests
|
||||
assert len(received_requests) == 3
|
||||
@@ -464,6 +483,98 @@ def test_table_create_indices():
|
||||
table.drop_index("custom_fts_idx")
|
||||
|
||||
|
||||
def test_remote_create_index_new_api():
|
||||
received_requests = []
|
||||
|
||||
def handler(request):
|
||||
if request.path == "/v1/table/test/create_index/":
|
||||
content_len = int(request.headers.get("Content-Length", 0))
|
||||
body = request.rfile.read(content_len) if content_len > 0 else b""
|
||||
received_requests.append(json.loads(body) if body else {})
|
||||
request.send_response(200)
|
||||
request.end_headers()
|
||||
elif request.path == "/v1/table/test/create/?mode=create":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b"{}")
|
||||
elif request.path == "/v1/table/test/describe/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(
|
||||
json.dumps(
|
||||
dict(
|
||||
version=1,
|
||||
schema=dict(
|
||||
fields=[
|
||||
dict(name="id", type={"type": "int64"}, nullable=False),
|
||||
dict(
|
||||
name="category",
|
||||
type={"type": "string"},
|
||||
nullable=False,
|
||||
),
|
||||
dict(
|
||||
name="text", type={"type": "string"}, nullable=False
|
||||
),
|
||||
dict(
|
||||
name="vector",
|
||||
type={
|
||||
"type": "fixed_size_list",
|
||||
"fields": [
|
||||
dict(
|
||||
name="item",
|
||||
type={"type": "float"},
|
||||
nullable=True,
|
||||
)
|
||||
],
|
||||
"length": 2,
|
||||
},
|
||||
nullable=False,
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
).encode()
|
||||
)
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
from lancedb.index import BTree, FTS, IvfPq, IvfRq
|
||||
|
||||
with mock_lancedb_connection(handler) as db:
|
||||
table = db.create_table("test", [{"id": 1}])
|
||||
|
||||
# New API: column-first, config= kwarg. Should NOT emit DeprecationWarning.
|
||||
import warnings as _warnings
|
||||
|
||||
with _warnings.catch_warnings():
|
||||
_warnings.simplefilter("error", DeprecationWarning)
|
||||
table.create_index("vector", config=IvfPq(distance_type="l2"))
|
||||
table.create_index("category", config=BTree())
|
||||
table.create_index("text", config=FTS())
|
||||
# IvfRq via new API
|
||||
table.create_index("vector", config=IvfRq(distance_type="l2"))
|
||||
|
||||
# Legacy index_type="IVF_RQ" routes to IvfRq config under the hood.
|
||||
with pytest.warns(DeprecationWarning, match="create_index"):
|
||||
table.create_index(
|
||||
vector_column_name="vector",
|
||||
index_type="IVF_RQ",
|
||||
num_partitions=8,
|
||||
)
|
||||
|
||||
assert len(received_requests) == 5
|
||||
assert [req["column"] for req in received_requests] == [
|
||||
"vector",
|
||||
"category",
|
||||
"text",
|
||||
"vector",
|
||||
"vector",
|
||||
]
|
||||
|
||||
|
||||
def test_table_wait_for_index_timeout():
|
||||
def handler(request):
|
||||
index_stats = dict(
|
||||
|
||||
@@ -603,3 +603,89 @@ def test_cross_encoder_reranker_return_all(tmp_path):
|
||||
assert "_relevance_score" in result.column_names
|
||||
assert "_score" in result.column_names
|
||||
assert "_distance" in result.column_names
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Regression tests for LinearCombinationReranker scoring bugs (issue #3154)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_linear_combination_best_match_ranks_first():
|
||||
"""
|
||||
The document that is BOTH the closest vector match AND the only FTS match
|
||||
must rank first. Previously _combine_score subtracted from 1, inverting
|
||||
the ranking so the worst document ranked highest.
|
||||
"""
|
||||
reranker = LinearCombinationReranker(weight=0.7, return_score="all")
|
||||
|
||||
# rowid 0: perfect vector match, sole FTS match → should rank 1st
|
||||
# rowid 1: mediocre vector, no FTS match
|
||||
# rowid 2: bad vector, no FTS match
|
||||
vector_results = pa.Table.from_pydict(
|
||||
{
|
||||
"_rowid": [0, 1, 2],
|
||||
"_distance": [0.0, 0.5, 0.9],
|
||||
}
|
||||
)
|
||||
fts_results = pa.Table.from_pydict(
|
||||
{
|
||||
"_rowid": [0],
|
||||
"_score": [1.0],
|
||||
}
|
||||
)
|
||||
|
||||
combined = reranker.merge_results(vector_results, fts_results, fill=1.0)
|
||||
scores = dict(
|
||||
zip(
|
||||
combined["_rowid"].to_pylist(),
|
||||
combined["_relevance_score"].to_pylist(),
|
||||
)
|
||||
)
|
||||
|
||||
# rowid 0 must have the highest relevance score
|
||||
assert scores[0] > scores[1], (
|
||||
f"Best match (rowid 0, score={scores[0]:.4f}) should beat "
|
||||
f"mid match (rowid 1, score={scores[1]:.4f})"
|
||||
)
|
||||
assert scores[1] > scores[2], (
|
||||
f"Mid match (rowid 1, score={scores[1]:.4f}) should beat "
|
||||
f"bad match (rowid 2, score={scores[2]:.4f})"
|
||||
)
|
||||
|
||||
|
||||
def test_linear_combination_missing_fts_is_penalised():
|
||||
"""
|
||||
A document with no FTS match must score *lower* than a document that
|
||||
has a mediocre FTS match, everything else being equal. Previously
|
||||
missing-FTS entries used fill=1.0 directly, which gave them a reward
|
||||
(via the 1-(...) inversion) instead of a penalty.
|
||||
"""
|
||||
reranker = LinearCombinationReranker(weight=0.5, return_score="all")
|
||||
|
||||
vector_results = pa.Table.from_pydict(
|
||||
{
|
||||
"_rowid": [0, 1],
|
||||
"_distance": [0.2, 0.2], # identical vector scores
|
||||
}
|
||||
)
|
||||
fts_results = pa.Table.from_pydict(
|
||||
{
|
||||
"_rowid": [0], # rowid 1 has no FTS match
|
||||
"_score": [0.3], # small FTS score
|
||||
}
|
||||
)
|
||||
|
||||
combined = reranker.merge_results(vector_results, fts_results, fill=1.0)
|
||||
scores = dict(
|
||||
zip(
|
||||
combined["_rowid"].to_pylist(),
|
||||
combined["_relevance_score"].to_pylist(),
|
||||
)
|
||||
)
|
||||
|
||||
# rowid 0 has a small FTS score; rowid 1 has none.
|
||||
# Even a small FTS contribution should beat having none at all.
|
||||
assert scores[0] > scores[1], (
|
||||
f"Document with FTS score (rowid 0, {scores[0]:.4f}) should beat "
|
||||
f"document with no FTS match (rowid 1, {scores[1]:.4f})"
|
||||
)
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
import os
|
||||
import sys
|
||||
import warnings
|
||||
from datetime import date, datetime, timedelta
|
||||
from time import sleep
|
||||
from typing import List
|
||||
@@ -11,7 +12,7 @@ from unittest.mock import patch
|
||||
|
||||
import lancedb
|
||||
from lancedb.dependencies import _PANDAS_AVAILABLE
|
||||
from lancedb.index import HnswFlat, HnswPq, HnswSq, IvfPq
|
||||
from lancedb.index import BTree, FTS, HnswFlat, HnswPq, HnswSq, IvfPq
|
||||
import numpy as np
|
||||
import polars as pl
|
||||
import pyarrow as pa
|
||||
@@ -33,7 +34,7 @@ def test_basic(mem_db: DBConnection):
|
||||
table = mem_db.create_table("test", data=data)
|
||||
|
||||
assert table.name == "test"
|
||||
assert "LanceTable(name='test', version=1, _conn=LanceDBConnection(" in repr(table)
|
||||
assert "LanceTable(name='test', _conn=LanceDBConnection(" in repr(table)
|
||||
expected_schema = pa.schema(
|
||||
{
|
||||
"vector": pa.list_(pa.float32(), 2),
|
||||
@@ -928,7 +929,12 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
|
||||
num_bits=4,
|
||||
)
|
||||
mock_create_index.assert_called_with(
|
||||
"vector", replace=True, config=expected_config, name=None, train=True
|
||||
"vector",
|
||||
replace=True,
|
||||
config=expected_config,
|
||||
wait_timeout=None,
|
||||
name=None,
|
||||
train=True,
|
||||
)
|
||||
|
||||
# Test with target_partition_size
|
||||
@@ -948,7 +954,12 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
|
||||
target_partition_size=8192,
|
||||
)
|
||||
mock_create_index.assert_called_with(
|
||||
"vector", replace=True, config=expected_config, name=None, train=True
|
||||
"vector",
|
||||
replace=True,
|
||||
config=expected_config,
|
||||
wait_timeout=None,
|
||||
name=None,
|
||||
train=True,
|
||||
)
|
||||
|
||||
# target_partition_size has a default value,
|
||||
@@ -967,7 +978,12 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
|
||||
num_bits=4,
|
||||
)
|
||||
mock_create_index.assert_called_with(
|
||||
"vector", replace=True, config=expected_config, name=None, train=True
|
||||
"vector",
|
||||
replace=True,
|
||||
config=expected_config,
|
||||
wait_timeout=None,
|
||||
name=None,
|
||||
train=True,
|
||||
)
|
||||
|
||||
table.create_index(
|
||||
@@ -978,7 +994,12 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
|
||||
)
|
||||
expected_config = HnswPq(distance_type="dot")
|
||||
mock_create_index.assert_called_with(
|
||||
"my_vector", replace=False, config=expected_config, name=None, train=True
|
||||
"my_vector",
|
||||
replace=False,
|
||||
config=expected_config,
|
||||
wait_timeout=None,
|
||||
name=None,
|
||||
train=True,
|
||||
)
|
||||
|
||||
table.create_index(
|
||||
@@ -993,7 +1014,12 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
|
||||
distance_type="cosine", sample_rate=0.1, m=29, ef_construction=10
|
||||
)
|
||||
mock_create_index.assert_called_with(
|
||||
"my_vector", replace=True, config=expected_config, name=None, train=True
|
||||
"my_vector",
|
||||
replace=True,
|
||||
config=expected_config,
|
||||
wait_timeout=None,
|
||||
name=None,
|
||||
train=True,
|
||||
)
|
||||
|
||||
table.create_index(
|
||||
@@ -1008,7 +1034,12 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
|
||||
distance_type="cosine", sample_rate=0.1, m=29, ef_construction=10
|
||||
)
|
||||
mock_create_index.assert_called_with(
|
||||
"my_vector", replace=True, config=expected_config, name=None, train=True
|
||||
"my_vector",
|
||||
replace=True,
|
||||
config=expected_config,
|
||||
wait_timeout=None,
|
||||
name=None,
|
||||
train=True,
|
||||
)
|
||||
|
||||
|
||||
@@ -1032,6 +1063,7 @@ def test_create_index_name_and_train_parameters(
|
||||
"vector",
|
||||
replace=True,
|
||||
config=expected_config,
|
||||
wait_timeout=None,
|
||||
name="my_custom_index",
|
||||
train=True,
|
||||
)
|
||||
@@ -1039,13 +1071,82 @@ def test_create_index_name_and_train_parameters(
|
||||
# Test with train=False
|
||||
table.create_index(vector_column_name="vector", train=False)
|
||||
mock_create_index.assert_called_with(
|
||||
"vector", replace=True, config=expected_config, name=None, train=False
|
||||
"vector",
|
||||
replace=True,
|
||||
config=expected_config,
|
||||
wait_timeout=None,
|
||||
name=None,
|
||||
train=False,
|
||||
)
|
||||
|
||||
# Test with both name and train
|
||||
table.create_index(vector_column_name="vector", name="my_index_name", train=True)
|
||||
mock_create_index.assert_called_with(
|
||||
"vector", replace=True, config=expected_config, name="my_index_name", train=True
|
||||
"vector",
|
||||
replace=True,
|
||||
config=expected_config,
|
||||
wait_timeout=None,
|
||||
name="my_index_name",
|
||||
train=True,
|
||||
)
|
||||
|
||||
|
||||
@patch("lancedb.table.AsyncTable.create_index")
|
||||
def test_create_index_legacy_emits_deprecation_warning(
|
||||
mock_create_index, mem_db: DBConnection
|
||||
):
|
||||
table = mem_db.create_table(
|
||||
"test",
|
||||
data=[{"vector": [3.1, 4.1]}, {"vector": [5.9, 26.5]}],
|
||||
)
|
||||
|
||||
with pytest.warns(DeprecationWarning, match="create_index"):
|
||||
table.create_index(metric="l2", num_partitions=8, vector_column_name="vector")
|
||||
|
||||
|
||||
@patch("lancedb.table.AsyncTable.create_index")
|
||||
def test_create_index_new_api(mock_create_index, mem_db: DBConnection):
|
||||
table = mem_db.create_table(
|
||||
"test",
|
||||
data=[
|
||||
{"vector": [3.1, 4.1], "category": "a", "text": "hello world"},
|
||||
{"vector": [5.9, 26.5], "category": "b", "text": "goodbye"},
|
||||
],
|
||||
)
|
||||
|
||||
# Vector index via new API should not warn
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("error", DeprecationWarning)
|
||||
table.create_index("vector", config=IvfPq(distance_type="l2"))
|
||||
mock_create_index.assert_called_with(
|
||||
"vector",
|
||||
replace=True,
|
||||
config=IvfPq(distance_type="l2"),
|
||||
wait_timeout=None,
|
||||
name=None,
|
||||
train=True,
|
||||
)
|
||||
|
||||
# Scalar index via new API
|
||||
table.create_index("category", config=BTree())
|
||||
mock_create_index.assert_called_with(
|
||||
"category",
|
||||
replace=True,
|
||||
config=BTree(),
|
||||
wait_timeout=None,
|
||||
name=None,
|
||||
train=True,
|
||||
)
|
||||
|
||||
# FTS index via new API
|
||||
table.create_index("text", config=FTS(with_position=True))
|
||||
mock_create_index.assert_called_with(
|
||||
"text",
|
||||
replace=True,
|
||||
config=FTS(with_position=True),
|
||||
wait_timeout=None,
|
||||
name=None,
|
||||
train=True,
|
||||
)
|
||||
|
||||
|
||||
@@ -1861,8 +1962,9 @@ def test_create_scalar_index(mem_db: DBConnection):
|
||||
"my_table",
|
||||
data=test_data,
|
||||
)
|
||||
# Test with default name
|
||||
table.create_scalar_index("x")
|
||||
# Test with default name; confirm DeprecationWarning fires
|
||||
with pytest.warns(DeprecationWarning, match="create_scalar_index"):
|
||||
table.create_scalar_index("x")
|
||||
indices = table.list_indices()
|
||||
assert len(indices) == 1
|
||||
scalar_index = indices[0]
|
||||
@@ -1934,6 +2036,10 @@ def test_create_index_nested_field_paths(mem_db: DBConnection):
|
||||
assert len(vector_results) == 1
|
||||
assert vector_results[0]["metadata"]["user_id"] == 0
|
||||
|
||||
default_vector_results = table.search([0.0, 1.0]).limit(1).to_list()
|
||||
assert len(default_vector_results) == 1
|
||||
assert default_vector_results[0]["metadata"]["user_id"] == 0
|
||||
|
||||
filtered_results = table.search().where("metadata.user_id = 42").limit(1).to_list()
|
||||
assert len(filtered_results) == 1
|
||||
assert filtered_results[0]["metadata"]["user_id"] == 42
|
||||
@@ -2013,6 +2119,74 @@ def test_search_with_schema_inf_multiple_vector(mem_db: DBConnection):
|
||||
table.search(q).limit(1).to_arrow()
|
||||
|
||||
|
||||
def test_search_infers_single_nested_vector(mem_db: DBConnection):
|
||||
schema = pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int32()),
|
||||
pa.field(
|
||||
"image",
|
||||
pa.struct([pa.field("embedding", pa.list_(pa.float32(), 2))]),
|
||||
),
|
||||
]
|
||||
)
|
||||
data = pa.Table.from_pylist(
|
||||
[
|
||||
{"id": 0, "image": {"embedding": [0.0, 1.0]}},
|
||||
{"id": 1, "image": {"embedding": [10.0, 11.0]}},
|
||||
],
|
||||
schema=schema,
|
||||
)
|
||||
table = mem_db.create_table("nested_vector_default_search", data=data)
|
||||
|
||||
result = table.search([0.0, 1.0]).limit(1).to_list()
|
||||
assert result[0]["id"] == 0
|
||||
|
||||
|
||||
def test_search_nested_vector_multiple_candidates(mem_db: DBConnection):
|
||||
schema = pa.schema(
|
||||
[
|
||||
pa.field(
|
||||
"image",
|
||||
pa.struct([pa.field("embedding", pa.list_(pa.float32(), 2))]),
|
||||
),
|
||||
pa.field(
|
||||
"text",
|
||||
pa.struct([pa.field("embedding", pa.list_(pa.float32(), 2))]),
|
||||
),
|
||||
]
|
||||
)
|
||||
data = pa.Table.from_pylist(
|
||||
[
|
||||
{
|
||||
"image": {"embedding": [0.0, 1.0]},
|
||||
"text": {"embedding": [2.0, 3.0]},
|
||||
}
|
||||
],
|
||||
schema=schema,
|
||||
)
|
||||
table = mem_db.create_table("nested_vector_multiple_candidates", data=data)
|
||||
|
||||
with pytest.raises(ValueError, match="image.embedding.*text.embedding"):
|
||||
table.search([0.0, 1.0]).limit(1).to_arrow()
|
||||
|
||||
|
||||
def test_search_nested_vector_no_candidates(mem_db: DBConnection):
|
||||
schema = pa.schema(
|
||||
[
|
||||
pa.field("id", pa.int32()),
|
||||
pa.field("metadata", pa.struct([pa.field("label", pa.string())])),
|
||||
]
|
||||
)
|
||||
data = pa.Table.from_pylist(
|
||||
[{"id": 0, "metadata": {"label": "cat"}}],
|
||||
schema=schema,
|
||||
)
|
||||
table = mem_db.create_table("nested_vector_no_candidates", data=data)
|
||||
|
||||
with pytest.raises(ValueError, match="no vector column"):
|
||||
table.search([0.0, 1.0]).limit(1).to_arrow()
|
||||
|
||||
|
||||
def test_compact_cleanup(tmp_db: DBConnection):
|
||||
pytest.importorskip("lance")
|
||||
table = tmp_db.create_table(
|
||||
|
||||
@@ -143,18 +143,20 @@ pub struct MergeResult {
|
||||
pub num_inserted_rows: u64,
|
||||
pub num_deleted_rows: u64,
|
||||
pub num_attempts: u32,
|
||||
pub num_rows: u64,
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
impl MergeResult {
|
||||
pub fn __repr__(&self) -> String {
|
||||
format!(
|
||||
"MergeResult(version={}, num_updated_rows={}, num_inserted_rows={}, num_deleted_rows={}, num_attempts={})",
|
||||
"MergeResult(version={}, num_updated_rows={}, num_inserted_rows={}, num_deleted_rows={}, num_attempts={}, num_rows={})",
|
||||
self.version,
|
||||
self.num_updated_rows,
|
||||
self.num_inserted_rows,
|
||||
self.num_deleted_rows,
|
||||
self.num_attempts
|
||||
self.num_attempts,
|
||||
self.num_rows
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -167,6 +169,7 @@ impl From<lancedb::table::MergeResult> for MergeResult {
|
||||
num_inserted_rows: result.num_inserted_rows,
|
||||
num_deleted_rows: result.num_deleted_rows,
|
||||
num_attempts: result.num_attempts,
|
||||
num_rows: result.num_rows,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -194,6 +197,12 @@ impl LsmWriteSpec {
|
||||
}
|
||||
|
||||
/// Identity sharding — shard by the raw value of `column`.
|
||||
///
|
||||
/// `column` must be a deterministic function of the unenforced primary
|
||||
/// key: every row with a given primary key must always produce the same
|
||||
/// `column` value, or upserts of that key can land in different shards
|
||||
/// and a stale version can win. Typically `column` is the primary key
|
||||
/// itself or a stable attribute of it.
|
||||
#[staticmethod]
|
||||
pub fn identity(column: String) -> Self {
|
||||
Self {
|
||||
@@ -933,6 +942,12 @@ impl Table {
|
||||
if let Some(use_index) = parameters.use_index {
|
||||
builder.use_index(use_index);
|
||||
}
|
||||
if let Some(use_lsm_write) = parameters.use_lsm_write {
|
||||
builder.use_lsm_write(use_lsm_write);
|
||||
}
|
||||
if let Some(validate_single_shard) = parameters.validate_single_shard {
|
||||
builder.validate_single_shard(validate_single_shard);
|
||||
}
|
||||
|
||||
future_into_py(self_.py(), async move {
|
||||
let res = builder.execute(Box::new(batches)).await.infer_error()?;
|
||||
@@ -971,6 +986,13 @@ impl Table {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn close_lsm_writers(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
inner.close_lsm_writers().await.infer_error()
|
||||
})
|
||||
}
|
||||
|
||||
pub fn uses_v2_manifest_paths(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
@@ -1124,6 +1146,8 @@ pub struct MergeInsertParams {
|
||||
when_not_matched_by_source_condition: Option<String>,
|
||||
timeout: Option<std::time::Duration>,
|
||||
use_index: Option<bool>,
|
||||
use_lsm_write: Option<bool>,
|
||||
validate_single_shard: Option<bool>,
|
||||
}
|
||||
|
||||
#[pyclass]
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
[toolchain]
|
||||
channel = "1.94.0"
|
||||
channel = "1.95.0"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb"
|
||||
version = "0.29.1-beta.0"
|
||||
version = "0.30.0-beta.1"
|
||||
edition.workspace = true
|
||||
description = "LanceDB: A serverless, low-latency vector database for AI applications"
|
||||
license.workspace = true
|
||||
@@ -75,7 +75,7 @@ reqwest = { version = "0.12.0", default-features = false, features = [
|
||||
"stream",
|
||||
], optional = true }
|
||||
http = { version = "1", optional = true } # Matching what is in reqwest
|
||||
uuid = { version = "1.7.0", features = ["v4"] }
|
||||
uuid = { version = "1.7.0", features = ["v4", "v5"] }
|
||||
polars-arrow = { version = ">=0.37,<0.40.0", optional = true }
|
||||
polars = { version = ">=0.37,<0.40.0", optional = true }
|
||||
hf-hub = { version = "0.4.1", optional = true, default-features = false, features = [
|
||||
@@ -104,6 +104,7 @@ datafusion.workspace = true
|
||||
http-body = "1" # Matching reqwest
|
||||
rstest = "0.23.0"
|
||||
test-log = "0.2"
|
||||
serial_test = "3"
|
||||
|
||||
|
||||
[features]
|
||||
|
||||
@@ -812,8 +812,7 @@ impl ConnectBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// The interval at which to check for updates from other processes. This
|
||||
/// only affects LanceDB OSS.
|
||||
/// The interval at which to check for updates from other processes.
|
||||
///
|
||||
/// If left unset, consistency is not checked. For maximum read
|
||||
/// performance, this is the default. For strong consistency, set this to
|
||||
@@ -825,8 +824,11 @@ impl ConnectBuilder {
|
||||
/// This only affects read operations. Write operations are always
|
||||
/// consistent.
|
||||
///
|
||||
/// LanceDB Cloud uses eventual consistency under the hood, and is not
|
||||
/// currently configurable.
|
||||
/// # Cost
|
||||
///
|
||||
/// Stronger consistency is not free. The smaller the interval, the more
|
||||
/// often each read pays the cost of checking for updates against object
|
||||
/// storage, raising per-read latency and cost.
|
||||
pub fn read_consistency_interval(
|
||||
mut self,
|
||||
read_consistency_interval: std::time::Duration,
|
||||
@@ -886,6 +888,7 @@ impl ConnectBuilder {
|
||||
options.host_override,
|
||||
self.request.client_config,
|
||||
storage_options.into(),
|
||||
self.request.read_consistency_interval,
|
||||
)?);
|
||||
Ok(Connection {
|
||||
internal,
|
||||
|
||||
@@ -464,11 +464,9 @@ mod tests {
|
||||
let mut iter = ids.into_iter().map(|o| o.unwrap());
|
||||
while let Some(first) = iter.next() {
|
||||
let rows_left_in_clump = if first == 4470 { 19 } else { 29 };
|
||||
let mut expected_next = first + 1;
|
||||
for _ in 0..rows_left_in_clump {
|
||||
for expected_next in (first + 1)..=(first + rows_left_in_clump) {
|
||||
let next = iter.next().unwrap();
|
||||
assert_eq!(next, expected_next);
|
||||
expected_next += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,17 +23,12 @@ impl VectorIndex {
|
||||
.fields
|
||||
.iter()
|
||||
.map(|field_id| {
|
||||
manifest
|
||||
.schema
|
||||
.field_by_id(*field_id)
|
||||
.unwrap_or_else(|| {
|
||||
panic!(
|
||||
"field {field_id} of index {} must exist in schema",
|
||||
index.name
|
||||
)
|
||||
})
|
||||
.name
|
||||
.clone()
|
||||
manifest.schema.field_path(*field_id).unwrap_or_else(|_| {
|
||||
panic!(
|
||||
"field {field_id} of index {} must exist in schema",
|
||||
index.name
|
||||
)
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
Self {
|
||||
|
||||
@@ -245,6 +245,9 @@ pub struct RestfulLanceDbClient<S: HttpSend = Sender> {
|
||||
pub(crate) sender: S,
|
||||
pub(crate) id_delimiter: String,
|
||||
pub(crate) header_provider: Option<Arc<dyn HeaderProvider>>,
|
||||
/// Connection-level read consistency interval. Drives the
|
||||
/// `x-lancedb-min-timestamp` freshness header sent on read requests.
|
||||
pub(crate) read_consistency_interval: Option<Duration>,
|
||||
}
|
||||
|
||||
impl<S: HttpSend> std::fmt::Debug for RestfulLanceDbClient<S> {
|
||||
@@ -338,6 +341,7 @@ impl RestfulLanceDbClient<Sender> {
|
||||
host_override: Option<String>,
|
||||
default_headers: HeaderMap,
|
||||
client_config: ClientConfig,
|
||||
read_consistency_interval: Option<Duration>,
|
||||
) -> Result<Self> {
|
||||
// Get the timeouts
|
||||
let timeout =
|
||||
@@ -435,6 +439,7 @@ impl RestfulLanceDbClient<Sender> {
|
||||
.clone()
|
||||
.unwrap_or("$".to_string()),
|
||||
header_provider: client_config.header_provider,
|
||||
read_consistency_interval,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -840,6 +845,16 @@ pub mod test_utils {
|
||||
pub fn client_with_handler<T>(
|
||||
handler: impl Fn(reqwest::Request) -> http::response::Response<T> + Send + Sync + 'static,
|
||||
) -> RestfulLanceDbClient<MockSender>
|
||||
where
|
||||
T: Into<reqwest::Body>,
|
||||
{
|
||||
client_with_handler_and_interval(handler, None)
|
||||
}
|
||||
|
||||
pub fn client_with_handler_and_interval<T>(
|
||||
handler: impl Fn(reqwest::Request) -> http::response::Response<T> + Send + Sync + 'static,
|
||||
read_consistency_interval: Option<Duration>,
|
||||
) -> RestfulLanceDbClient<MockSender>
|
||||
where
|
||||
T: Into<reqwest::Body>,
|
||||
{
|
||||
@@ -857,6 +872,7 @@ pub mod test_utils {
|
||||
},
|
||||
id_delimiter: "$".to_string(),
|
||||
header_provider: None,
|
||||
read_consistency_interval,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -881,6 +897,7 @@ pub mod test_utils {
|
||||
},
|
||||
id_delimiter: config.id_delimiter.unwrap_or_else(|| "$".to_string()),
|
||||
header_provider: config.header_provider,
|
||||
read_consistency_interval: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -888,8 +905,18 @@ pub mod test_utils {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use serial_test::serial;
|
||||
use std::time::Duration;
|
||||
|
||||
// Serializes the env-var-mutating tests below: cargo test runs tests in
|
||||
// parallel, but several of these tests read and write the same process-
|
||||
// global env vars (`LANCEDB_USER_ID*`), so they would race without this.
|
||||
static ENV_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
|
||||
|
||||
fn lock_env() -> std::sync::MutexGuard<'static, ()> {
|
||||
ENV_MUTEX.lock().unwrap_or_else(|e| e.into_inner())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_timeout_config_default() {
|
||||
let config = TimeoutConfig::default();
|
||||
@@ -1046,6 +1073,7 @@ mod tests {
|
||||
sender: Sender,
|
||||
id_delimiter: "+".to_string(),
|
||||
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
|
||||
read_consistency_interval: None,
|
||||
};
|
||||
|
||||
// Apply dynamic headers
|
||||
@@ -1081,6 +1109,7 @@ mod tests {
|
||||
sender: Sender,
|
||||
id_delimiter: "+".to_string(),
|
||||
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
|
||||
read_consistency_interval: None,
|
||||
};
|
||||
|
||||
// Apply dynamic headers
|
||||
@@ -1118,6 +1147,7 @@ mod tests {
|
||||
sender: Sender,
|
||||
id_delimiter: "+".to_string(),
|
||||
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
|
||||
read_consistency_interval: None,
|
||||
};
|
||||
|
||||
// Header provider errors should fail the request
|
||||
@@ -1143,7 +1173,9 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial(user_id_env)]
|
||||
fn test_resolve_user_id_none() {
|
||||
let _guard = lock_env();
|
||||
let config = ClientConfig::default();
|
||||
// Clear env vars that might be set from other tests
|
||||
// SAFETY: This is only called in tests
|
||||
@@ -1155,7 +1187,9 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial(user_id_env)]
|
||||
fn test_resolve_user_id_from_env() {
|
||||
let _guard = lock_env();
|
||||
// SAFETY: This is only called in tests
|
||||
unsafe {
|
||||
std::env::set_var("LANCEDB_USER_ID", "env-user-id");
|
||||
@@ -1169,7 +1203,9 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial(user_id_env)]
|
||||
fn test_resolve_user_id_from_env_key() {
|
||||
let _guard = lock_env();
|
||||
// SAFETY: This is only called in tests
|
||||
unsafe {
|
||||
std::env::remove_var("LANCEDB_USER_ID");
|
||||
@@ -1189,7 +1225,9 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial(user_id_env)]
|
||||
fn test_resolve_user_id_direct_takes_precedence() {
|
||||
let _guard = lock_env();
|
||||
// SAFETY: This is only called in tests
|
||||
unsafe {
|
||||
std::env::set_var("LANCEDB_USER_ID", "env-user-id");
|
||||
@@ -1206,7 +1244,9 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial(user_id_env)]
|
||||
fn test_resolve_user_id_empty_env_ignored() {
|
||||
let _guard = lock_env();
|
||||
// SAFETY: This is only called in tests
|
||||
unsafe {
|
||||
std::env::set_var("LANCEDB_USER_ID", "");
|
||||
|
||||
@@ -206,6 +206,7 @@ impl RemoteDatabase {
|
||||
host_override: Option<String>,
|
||||
client_config: ClientConfig,
|
||||
options: RemoteOptions,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
) -> Result<Self> {
|
||||
let parsed = super::client::parse_db_url(uri)?;
|
||||
let header_map = RestfulLanceDbClient::<Sender>::default_headers(
|
||||
@@ -233,6 +234,7 @@ impl RemoteDatabase {
|
||||
host_override,
|
||||
header_map,
|
||||
client_config.clone(),
|
||||
read_consistency_interval,
|
||||
)?;
|
||||
|
||||
let table_cache = Cache::builder()
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -89,7 +89,6 @@ use futures::future::join_all;
|
||||
pub use lance::dataset::refs::{TagContents, Tags as LanceTags};
|
||||
pub use lance::dataset::scanner::DatasetRecordBatchStream;
|
||||
use lance::dataset::statistics::DatasetStatisticsExt;
|
||||
use lance_index::frag_reuse::FRAG_REUSE_INDEX_NAME;
|
||||
pub use lance_index::optimize::OptimizeOptions;
|
||||
pub use optimize::{CompactionOptions, OptimizeAction, OptimizeStats};
|
||||
pub use schema_evolution::{AddColumnsResult, AlterColumnsResult, DropColumnsResult};
|
||||
@@ -253,6 +252,36 @@ pub enum Filter {
|
||||
Datafusion(Expr),
|
||||
}
|
||||
|
||||
/// A predicate for filtering rows in delete operations.
|
||||
///
|
||||
/// Accepts either a SQL string or a DataFusion [`Expr`]. Use the [`From`]
|
||||
/// implementations to convert from `&str` or `&Expr` automatically.
|
||||
/// See [`Table::delete`] for usage examples.
|
||||
pub enum Predicate<'a> {
|
||||
/// A SQL predicate string
|
||||
String(&'a str),
|
||||
/// A DataFusion logical expression
|
||||
Expr(&'a Expr),
|
||||
}
|
||||
|
||||
impl<'a> From<&'a str> for Predicate<'a> {
|
||||
fn from(s: &'a str) -> Self {
|
||||
Predicate::String(s)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a String> for Predicate<'a> {
|
||||
fn from(s: &'a String) -> Self {
|
||||
Predicate::String(s.as_str())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a Expr> for Predicate<'a> {
|
||||
fn from(e: &'a Expr) -> Self {
|
||||
Predicate::Expr(e)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait Tags: Send + Sync {
|
||||
/// List the tags of the table.
|
||||
@@ -282,17 +311,15 @@ pub use self::merge::MergeResult;
|
||||
/// date) and [`LsmWriteSpec::with_writer_config_defaults`] (default
|
||||
/// `ShardWriter` configuration recorded in the MemWAL index).
|
||||
///
|
||||
/// All variants require the table to have an unenforced primary key.
|
||||
///
|
||||
/// Install a spec with [`Table::set_lsm_write_spec`] and remove it with
|
||||
/// [`Table::unset_lsm_write_spec`]. The actual `merge_insert` dispatch
|
||||
/// onto the MemWAL writer is a follow-up.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum LsmWriteSpec {
|
||||
/// Hash-bucket sharding by the unenforced primary key column.
|
||||
/// Hash-bucket sharding by a scalar column.
|
||||
///
|
||||
/// `column` must equal the table's currently-set single-column
|
||||
/// unenforced primary key. `num_buckets` must be in `[1, 1024]`.
|
||||
/// `column` must be a non-nested column with a supported scalar type.
|
||||
/// `num_buckets` must be in `[1, 1024]`.
|
||||
/// Iceberg-compatible Murmur3-x86-32 (seed 0) is used so each row's
|
||||
/// `bucket(column, num_buckets)` value is stable across processes.
|
||||
Bucket {
|
||||
@@ -339,6 +366,14 @@ impl LsmWriteSpec {
|
||||
|
||||
/// Construct an identity-sharding spec (shard by the raw value of
|
||||
/// `column`) with no maintained indexes.
|
||||
///
|
||||
/// `column` must be a deterministic function of the unenforced primary
|
||||
/// key: every row with a given primary key must always produce the same
|
||||
/// `column` value. MemWAL dedups upserts by primary key but tracks
|
||||
/// generations per shard, so if the same key is written with two
|
||||
/// different `column` values its versions land in different shards and a
|
||||
/// stale value can win. Typically `column` is the primary key itself, or
|
||||
/// a stable attribute of it (e.g. a tenant id).
|
||||
pub fn identity(column: impl Into<String>) -> Self {
|
||||
Self::Identity {
|
||||
column: column.into(),
|
||||
@@ -491,8 +526,8 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
|
||||
/// Add new records to the table.
|
||||
async fn add(&self, add: AddDataBuilder) -> Result<AddResult>;
|
||||
/// Delete rows from the table.
|
||||
async fn delete(&self, predicate: &str) -> Result<DeleteResult>;
|
||||
/// Delete rows from the table matching the given [`Predicate`].
|
||||
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult>;
|
||||
/// Update rows in the table.
|
||||
async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult>;
|
||||
/// Create an index on the provided column(s).
|
||||
@@ -553,6 +588,13 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
message: "unset_lsm_write_spec is not supported on this table type".into(),
|
||||
})
|
||||
}
|
||||
/// Drain and close any cached MemWAL shard writers for this table.
|
||||
///
|
||||
/// The default implementation is a no-op; table types that maintain
|
||||
/// MemWAL shard writers override it.
|
||||
async fn close_lsm_writers(&self) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
/// Gets the table tag manager.
|
||||
async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
|
||||
/// Optimize the dataset.
|
||||
@@ -656,6 +698,30 @@ mod test_utils {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_handler_and_interval<T>(
|
||||
name: impl Into<String>,
|
||||
handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
) -> Self
|
||||
where
|
||||
T: Into<reqwest::Body>,
|
||||
{
|
||||
let inner = Arc::new(
|
||||
crate::remote::table::RemoteTable::new_mock_with_consistency_interval(
|
||||
name.into(),
|
||||
handler.clone(),
|
||||
read_consistency_interval,
|
||||
),
|
||||
);
|
||||
let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler));
|
||||
Self {
|
||||
inner,
|
||||
database: Some(database),
|
||||
// Registry is unused.
|
||||
embedding_registry: Arc::new(MemoryRegistry::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_handler_version<T>(
|
||||
name: impl Into<String>,
|
||||
version: semver::Version,
|
||||
@@ -860,7 +926,8 @@ impl Table {
|
||||
/// Delete the rows from table that match the predicate.
|
||||
///
|
||||
/// # Arguments
|
||||
/// - `predicate` - The SQL predicate string to filter the rows to be deleted.
|
||||
/// - `predicate` - A SQL string (`&str`) or DataFusion expression (`&Expr`)
|
||||
/// that selects the rows to delete.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
@@ -869,6 +936,7 @@ impl Table {
|
||||
/// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
|
||||
/// # RecordBatchIterator, Int32Array};
|
||||
/// # use arrow_schema::{Schema, Field, DataType};
|
||||
/// use datafusion_expr::{col, lit};
|
||||
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||
/// let tmpdir = tempfile::tempdir().unwrap();
|
||||
/// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
|
||||
@@ -898,11 +966,17 @@ impl Table {
|
||||
/// .execute()
|
||||
/// .await
|
||||
/// .unwrap();
|
||||
///
|
||||
/// // Using a SQL string:
|
||||
/// tbl.delete("id > 5").await.unwrap();
|
||||
///
|
||||
/// // Using a DataFusion expression:
|
||||
/// let expr = col("id").lt(lit(4));
|
||||
/// tbl.delete(&expr).await.unwrap();
|
||||
/// # });
|
||||
/// ```
|
||||
pub async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
|
||||
self.inner.delete(predicate).await
|
||||
pub async fn delete(&self, predicate: impl Into<Predicate<'_>>) -> Result<DeleteResult> {
|
||||
self.inner.delete(predicate.into()).await
|
||||
}
|
||||
|
||||
/// Create an index on the provided column(s).
|
||||
@@ -1298,21 +1372,15 @@ impl Table {
|
||||
///
|
||||
/// [`LsmWriteSpec`] chooses one of three sharding strategies:
|
||||
///
|
||||
/// - [`LsmWriteSpec::bucket`] — hash-bucket writes by the single-column
|
||||
/// unenforced primary key.
|
||||
/// - [`LsmWriteSpec::bucket`] — hash-bucket writes by a scalar column.
|
||||
/// - [`LsmWriteSpec::identity`] — shard by the raw value of a scalar column.
|
||||
/// - [`LsmWriteSpec::unsharded`] — route every write to a single shard.
|
||||
///
|
||||
/// All variants require the table to have an unenforced primary key
|
||||
/// ([`Table::set_unenforced_primary_key`]); bucket sharding additionally
|
||||
/// requires it to be the single column being bucketed.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// # use lancedb::table::{LsmWriteSpec, Table};
|
||||
/// # async fn example(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// table.set_unenforced_primary_key(["id"]).await?;
|
||||
/// table
|
||||
/// .set_lsm_write_spec(
|
||||
/// LsmWriteSpec::bucket("id", 16).with_maintained_indexes(["id_idx"]),
|
||||
@@ -1333,6 +1401,16 @@ impl Table {
|
||||
self.inner.unset_lsm_write_spec().await
|
||||
}
|
||||
|
||||
/// Drain and close any cached MemWAL shard writers held for this table.
|
||||
///
|
||||
/// When an [`LsmWriteSpec`] is installed, `merge_insert` opens MemWAL shard
|
||||
/// writers and caches them for reuse across calls. This closes them,
|
||||
/// flushing pending data; writers reopen lazily on the next `merge_insert`.
|
||||
/// It is a no-op when no writers are cached.
|
||||
pub async fn close_lsm_writers(&self) -> Result<()> {
|
||||
self.inner.close_lsm_writers().await
|
||||
}
|
||||
|
||||
/// Retrieve the version of the table
|
||||
///
|
||||
/// LanceDb supports versioning. Every operation that modifies the table increases
|
||||
@@ -2688,16 +2766,13 @@ impl BaseTable for NativeTable {
|
||||
message: "Multi-column (composite) indices are not yet supported".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let dataset = self.dataset.get().await?;
|
||||
self.dataset.ensure_mutable()?;
|
||||
let mut dataset = (*self.dataset.get().await?).clone();
|
||||
let (column, field) = Self::resolve_index_field(dataset.schema(), &opts.columns[0])?;
|
||||
drop(dataset);
|
||||
|
||||
let lance_idx_params = self.make_index_params(&field, opts.index.clone()).await?;
|
||||
let index_type = self.get_index_type_for_field(&field, &opts.index);
|
||||
let columns = [column.as_str()];
|
||||
self.dataset.ensure_mutable()?;
|
||||
let mut dataset = (*self.dataset.get().await?).clone();
|
||||
let mut builder = dataset
|
||||
.create_index_builder(&columns, index_type, lance_idx_params.as_ref())
|
||||
.train(opts.train)
|
||||
@@ -2779,9 +2854,12 @@ impl BaseTable for NativeTable {
|
||||
merge::lsm::unset_lsm_write_spec(self).await
|
||||
}
|
||||
|
||||
async fn close_lsm_writers(&self) -> Result<()> {
|
||||
merge::lsm::close_lsm_writers(self).await
|
||||
}
|
||||
|
||||
/// Delete rows from the table
|
||||
async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
|
||||
// Delegate to the submodule implementation
|
||||
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
|
||||
delete::execute_delete(self, predicate).await
|
||||
}
|
||||
|
||||
@@ -2814,66 +2892,49 @@ impl BaseTable for NativeTable {
|
||||
|
||||
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
|
||||
let dataset = self.dataset.get().await?;
|
||||
let indices = dataset.load_indices().await?;
|
||||
let results = futures::stream::iter(indices.as_slice()).then(|idx| async {
|
||||
|
||||
// skip Lance internal indexes
|
||||
if idx.name == FRAG_REUSE_INDEX_NAME {
|
||||
return None;
|
||||
}
|
||||
|
||||
let stats = match dataset.index_statistics(idx.name.as_str()).await {
|
||||
Ok(stats) => stats,
|
||||
Err(e) => {
|
||||
log::warn!("Failed to get statistics for index {} ({}): {}", idx.name, idx.uuid, e);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let stats: serde_json::Value = match serde_json::from_str(&stats) {
|
||||
Ok(stats) => stats,
|
||||
Err(e) => {
|
||||
log::warn!("Failed to deserialize index statistics for index {} ({}): {}", idx.name, idx.uuid, e);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let Some(index_type) = stats.get("index_type").and_then(|v| v.as_str()) else {
|
||||
log::warn!("Index statistics was missing 'index_type' field for index {} ({})", idx.name, idx.uuid);
|
||||
return None;
|
||||
};
|
||||
|
||||
let index_type: crate::index::IndexType = match index_type.parse() {
|
||||
Ok(index_type) => index_type,
|
||||
Err(e) => {
|
||||
log::warn!("Failed to parse index type for index {} ({}): {}", idx.name, idx.uuid, e);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let mut columns = Vec::with_capacity(idx.fields.len());
|
||||
for field_id in &idx.fields {
|
||||
let column = match dataset.schema().field_path(*field_id) {
|
||||
Ok(column) => column,
|
||||
let indices = dataset
|
||||
.describe_indices(None)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter_map(|idx_desc| {
|
||||
let index_type: crate::index::IndexType = match idx_desc.index_type().parse() {
|
||||
Ok(index_type) => index_type,
|
||||
Err(e) => {
|
||||
log::warn!(
|
||||
"The index {} ({}) referenced a field with id {} which does not exist in the schema: {}",
|
||||
idx.name,
|
||||
idx.uuid,
|
||||
field_id,
|
||||
"Failed to parse index type for index {}: {}",
|
||||
idx_desc.name(),
|
||||
e
|
||||
);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
columns.push(column);
|
||||
}
|
||||
|
||||
let name = idx.name.clone();
|
||||
Some(IndexConfig { index_type, columns, name })
|
||||
}).collect::<Vec<_>>().await;
|
||||
let field_ids = idx_desc.field_ids();
|
||||
let mut columns = Vec::with_capacity(field_ids.len());
|
||||
for field_id in field_ids {
|
||||
let field_path = match dataset.schema().field_path(*field_id as i32) {
|
||||
Ok(field_path) => field_path,
|
||||
Err(e) => {
|
||||
log::warn!(
|
||||
"Failed to resolve field path for index {} field id {}: {}",
|
||||
idx_desc.name(),
|
||||
field_id,
|
||||
e
|
||||
);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
columns.push(field_path);
|
||||
}
|
||||
|
||||
Ok(results.into_iter().flatten().collect())
|
||||
Some(IndexConfig {
|
||||
name: idx_desc.name().to_string(),
|
||||
index_type,
|
||||
columns,
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
Ok(indices)
|
||||
}
|
||||
|
||||
async fn uri(&self) -> Result<String> {
|
||||
@@ -2983,11 +3044,12 @@ impl BaseTable for NativeTable {
|
||||
let p99 = *sorted_sizes.get(num_fragments * 99 / 100).unwrap_or(&0);
|
||||
let min = sorted_sizes.first().copied().unwrap_or(0);
|
||||
let max = sorted_sizes.last().copied().unwrap_or(0);
|
||||
let mean = if num_fragments == 0 {
|
||||
0
|
||||
} else {
|
||||
sorted_sizes.iter().copied().sum::<usize>() / num_fragments
|
||||
};
|
||||
let mean = sorted_sizes
|
||||
.iter()
|
||||
.copied()
|
||||
.sum::<usize>()
|
||||
.checked_div(num_fragments)
|
||||
.unwrap_or(0);
|
||||
|
||||
let frag_stats = FragmentStatistics {
|
||||
num_fragments,
|
||||
@@ -3074,6 +3136,7 @@ pub struct FragmentSummaryStats {
|
||||
#[cfg(test)]
|
||||
#[allow(deprecated)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::time::Duration;
|
||||
@@ -3854,6 +3917,25 @@ mod tests {
|
||||
1
|
||||
);
|
||||
|
||||
let default_vector_results = table
|
||||
.query()
|
||||
.nearest_to(&[0.0; 8])
|
||||
.unwrap()
|
||||
.limit(1)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap()
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
default_vector_results
|
||||
.iter()
|
||||
.map(|batch| batch.num_rows())
|
||||
.sum::<usize>(),
|
||||
1
|
||||
);
|
||||
|
||||
let fts_results = table
|
||||
.query()
|
||||
.full_text_search(FullTextSearchQuery::new("document".to_string()))
|
||||
@@ -3967,26 +4049,27 @@ mod tests {
|
||||
let index_configs = table.list_indices().await.unwrap();
|
||||
assert_eq!(index_configs.len(), 5);
|
||||
|
||||
// list_indices returns indices in alphabetical order by name
|
||||
let mut configs_iter = index_configs.into_iter();
|
||||
let index = configs_iter.next().unwrap();
|
||||
assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
|
||||
assert_eq!(index.columns, vec!["category".to_string()]);
|
||||
|
||||
let index = configs_iter.next().unwrap();
|
||||
assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
|
||||
assert_eq!(index.columns, vec!["is_active".to_string()]);
|
||||
|
||||
let index = configs_iter.next().unwrap();
|
||||
assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
|
||||
assert_eq!(index.columns, vec!["data".to_string()]);
|
||||
|
||||
let index = configs_iter.next().unwrap();
|
||||
assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
|
||||
assert_eq!(index.columns, vec!["large_data".to_string()]);
|
||||
assert_eq!(index.columns, vec!["is_active".to_string()]);
|
||||
|
||||
let index = configs_iter.next().unwrap();
|
||||
assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
|
||||
assert_eq!(index.columns, vec!["large_category".to_string()]);
|
||||
|
||||
let index = configs_iter.next().unwrap();
|
||||
assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
|
||||
assert_eq!(index.columns, vec!["large_data".to_string()]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -4558,21 +4641,6 @@ mod tests {
|
||||
.unwrap();
|
||||
let table = conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
// Reject when no PK is set.
|
||||
let err = table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
|
||||
.await
|
||||
.expect_err("should reject without PK");
|
||||
assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
|
||||
|
||||
// Set PK, then a mismatched column on the spec must be rejected.
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
let err = table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("name", 4))
|
||||
.await
|
||||
.expect_err("should reject column != PK");
|
||||
assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
|
||||
|
||||
// Reject num_buckets out of range.
|
||||
for bad in [0u32, 1025] {
|
||||
let err = table
|
||||
@@ -4638,9 +4706,6 @@ mod tests {
|
||||
.unwrap();
|
||||
let table = conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
// Lance's MemWAL still requires *some* unenforced primary key on
|
||||
// the dataset; Unsharded just skips the per-row hashing step.
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::unsharded())
|
||||
.await
|
||||
@@ -4687,7 +4752,6 @@ mod tests {
|
||||
.unwrap();
|
||||
let table = conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
table
|
||||
.set_lsm_write_spec(
|
||||
LsmWriteSpec::identity("region")
|
||||
@@ -4743,7 +4807,6 @@ mod tests {
|
||||
table.unset_lsm_write_spec().await.unwrap_err();
|
||||
|
||||
// Install a spec, then unset it.
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
|
||||
.await
|
||||
|
||||
@@ -982,4 +982,105 @@ mod tests {
|
||||
table2.add(struct_batch).execute().await.unwrap();
|
||||
assert_eq!(table2.count_rows(None).await.unwrap(), 2);
|
||||
}
|
||||
|
||||
/// Regression test: appending `arrow.json` (PyArrow `pa.json_()`) data into a table
|
||||
/// whose schema was created with `pa.json_()` (internally stored as `lance.json`, backed
|
||||
/// by `LargeBinary`) must succeed without a schema-mismatch error.
|
||||
///
|
||||
/// Previously `build_field_exprs` would attempt a `Utf8 → LargeBinary` DataFusion cast,
|
||||
/// which produced a field whose Arrow extension metadata still read `arrow.json` instead
|
||||
/// of `lance.json`. Lance-core then rejected the append with
|
||||
/// `"json vs large_binary" schema mismatch`.
|
||||
///
|
||||
/// PyArrow's `pa.json_()` may be backed by either `Utf8` or `LargeUtf8` depending on the
|
||||
/// constructor used, so the test is parameterized over the input backing type.
|
||||
#[rstest::rstest]
|
||||
#[case::utf8(DataType::Utf8)]
|
||||
#[case::large_utf8(DataType::LargeUtf8)]
|
||||
#[tokio::test]
|
||||
async fn test_add_arrow_json_into_lance_json_table(#[case] input_type: DataType) {
|
||||
use arrow_array::{Array, cast::AsArray};
|
||||
use lance_arrow::ARROW_EXT_NAME_KEY;
|
||||
use lance_arrow::json::{ARROW_JSON_EXT_NAME, JSON_EXT_NAME};
|
||||
|
||||
// Build a table whose "data" column is lance.json (LargeBinary +
|
||||
// ARROW:extension:name = "lance.json").
|
||||
let lance_json_field = lance_arrow::json::json_field("data", true);
|
||||
let table_schema = Arc::new(Schema::new(vec![lance_json_field]));
|
||||
|
||||
let db = connect("memory://").execute().await.unwrap();
|
||||
let table = db
|
||||
.create_empty_table("json_test", table_schema)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Sanity-check the stored schema.
|
||||
let stored_field = table.schema().await.unwrap();
|
||||
let data_field = stored_field.field_with_name("data").unwrap();
|
||||
assert_eq!(data_field.data_type(), &DataType::LargeBinary);
|
||||
assert_eq!(
|
||||
data_field
|
||||
.metadata()
|
||||
.get(ARROW_EXT_NAME_KEY)
|
||||
.map(|s| s.as_str()),
|
||||
Some(JSON_EXT_NAME),
|
||||
);
|
||||
|
||||
// Build an arrow.json input field (Utf8/LargeUtf8 + arrow.json extension).
|
||||
// This is what PyArrow produces for pa.json_() arrays.
|
||||
let arrow_json_metadata = std::collections::HashMap::from([(
|
||||
ARROW_EXT_NAME_KEY.to_string(),
|
||||
ARROW_JSON_EXT_NAME.to_string(),
|
||||
)]);
|
||||
let arrow_json_field =
|
||||
Field::new("data", input_type.clone(), true).with_metadata(arrow_json_metadata);
|
||||
let arrow_json_schema = Arc::new(Schema::new(vec![arrow_json_field]));
|
||||
|
||||
let rows: Vec<Option<&str>> = vec![None, Some(r#"{"a": 1}"#), Some(r#"{"b": 2}"#)];
|
||||
let string_array: Arc<dyn arrow_array::Array> = match input_type {
|
||||
DataType::Utf8 => Arc::new(arrow_array::StringArray::from(rows.clone())),
|
||||
DataType::LargeUtf8 => Arc::new(arrow_array::LargeStringArray::from(rows.clone())),
|
||||
other => panic!("unsupported arrow.json backing type for this test: {other:?}"),
|
||||
};
|
||||
let batch = RecordBatch::try_new(arrow_json_schema, vec![string_array]).unwrap();
|
||||
|
||||
// This must not fail with a schema-mismatch error.
|
||||
table.add(batch).execute().await.unwrap();
|
||||
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), rows.len());
|
||||
|
||||
// A lance.json column is read back as Utf8 carrying arrow.json extension metadata.
|
||||
let results: Vec<RecordBatch> = table
|
||||
.query()
|
||||
.select(Select::columns(&["data"]))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap()
|
||||
.try_collect()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(results.len(), 1);
|
||||
let batch = &results[0];
|
||||
assert_eq!(batch.num_rows(), rows.len());
|
||||
|
||||
let json_col = batch.column(0);
|
||||
assert_eq!(json_col.data_type(), &DataType::Utf8);
|
||||
let json_strs = json_col.as_string::<i32>();
|
||||
|
||||
for (i, expected) in rows.iter().enumerate() {
|
||||
match expected {
|
||||
None => assert!(json_strs.is_null(i), "row {i} expected null"),
|
||||
Some(raw) => {
|
||||
assert!(!json_strs.is_null(i), "row {i} expected non-null");
|
||||
let actual: serde_json::Value = serde_json::from_str(json_strs.value(i))
|
||||
.expect("read-back JSON should be valid");
|
||||
let expected: serde_json::Value =
|
||||
serde_json::from_str(raw).expect("expected JSON should be valid");
|
||||
assert_eq!(actual, expected, "row {i} JSON mismatch");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ use datafusion_physical_expr::expressions::{CastExpr, Literal};
|
||||
use datafusion_physical_plan::expressions::Column;
|
||||
use datafusion_physical_plan::projection::ProjectionExec;
|
||||
use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};
|
||||
use lance_arrow::json::{is_arrow_json_field, is_json_field};
|
||||
|
||||
use crate::{Error, Result};
|
||||
|
||||
@@ -64,6 +65,18 @@ fn build_field_exprs(
|
||||
let input_field = &input_fields[input_idx];
|
||||
let input_expr = get_input_expr(input_idx);
|
||||
|
||||
// Special case: input is arrow.json (PyArrow pa.json_() extension type backed by
|
||||
// Utf8/LargeUtf8) and the table field is lance.json (backed by LargeBinary).
|
||||
// Lance-core's write path already handles the arrow.json → lance.json conversion
|
||||
// (including JSONB encoding), so we pass the expression through unchanged and let
|
||||
// lance-core deal with it. Attempting to cast Utf8 → LargeBinary here would
|
||||
// produce a field whose metadata still identifies it as arrow.json, which then
|
||||
// causes a schema-mismatch error inside lance-core.
|
||||
if is_arrow_json_field(input_field) && is_json_field(table_field) {
|
||||
result.push((input_expr, Arc::clone(input_field) as FieldRef));
|
||||
continue;
|
||||
}
|
||||
|
||||
let expr = match (input_field.data_type(), table_field.data_type()) {
|
||||
// Both are structs: recurse into sub-fields to handle subschemas and casts.
|
||||
(DataType::Struct(in_children), DataType::Struct(tbl_children))
|
||||
@@ -618,4 +631,75 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(a.values(), &[1, 3]);
|
||||
}
|
||||
|
||||
/// `arrow.json` input (PyArrow `pa.json_()`, Utf8/LargeUtf8 + extension metadata) against a
|
||||
/// `lance.json` table field (LargeBinary + extension metadata) must be passed through
|
||||
/// without a cast so that lance-core can perform its own arrow.json → JSONB conversion.
|
||||
///
|
||||
/// Before the fix, `cast_to_table_schema` attempted a `Utf8 → LargeBinary` DataFusion
|
||||
/// cast that preserved the wrong extension metadata, causing lance-core to reject the
|
||||
/// batch with a "json vs large_binary" schema-mismatch error.
|
||||
#[rstest::rstest]
|
||||
#[case::utf8(DataType::Utf8)]
|
||||
#[case::large_utf8(DataType::LargeUtf8)]
|
||||
#[tokio::test]
|
||||
async fn test_arrow_json_passthrough_to_lance_json(#[case] input_type: DataType) {
|
||||
use lance_arrow::ARROW_EXT_NAME_KEY;
|
||||
use lance_arrow::json::{ARROW_JSON_EXT_NAME, json_field};
|
||||
|
||||
// Build a table schema with a lance.json field (LargeBinary + lance.json metadata).
|
||||
let lance_field = json_field("data", true);
|
||||
let table_schema = Schema::new(vec![lance_field]);
|
||||
|
||||
// Build an input batch with an arrow.json field (Utf8/LargeUtf8 + arrow.json metadata).
|
||||
let arrow_meta = std::collections::HashMap::from([(
|
||||
ARROW_EXT_NAME_KEY.to_string(),
|
||||
ARROW_JSON_EXT_NAME.to_string(),
|
||||
)]);
|
||||
let arrow_field = Field::new("data", input_type.clone(), true).with_metadata(arrow_meta);
|
||||
let input_schema = Arc::new(Schema::new(vec![arrow_field]));
|
||||
|
||||
let values = vec![Some(r#"{"x": 1}"#), None, Some(r#"{"y": 2}"#)];
|
||||
let input_array: Arc<dyn arrow_array::Array> = match input_type {
|
||||
DataType::Utf8 => Arc::new(StringArray::from(values)),
|
||||
DataType::LargeUtf8 => Arc::new(arrow_array::LargeStringArray::from(values)),
|
||||
other => panic!("unsupported arrow.json backing type for this test: {other:?}"),
|
||||
};
|
||||
let input_batch = RecordBatch::try_new(input_schema, vec![input_array]).unwrap();
|
||||
|
||||
let plan = plan_from_batch(input_batch).await;
|
||||
let projected = cast_to_table_schema(plan, &table_schema).unwrap();
|
||||
|
||||
// The projected schema's "data" field must carry arrow.json metadata
|
||||
// (the input field), not be silently dropped or miscast.
|
||||
let out_field = projected.schema().field_with_name("data").unwrap().clone();
|
||||
assert_eq!(out_field.data_type(), &input_type);
|
||||
assert_eq!(
|
||||
out_field
|
||||
.metadata()
|
||||
.get(ARROW_EXT_NAME_KEY)
|
||||
.map(|s| s.as_str()),
|
||||
Some(ARROW_JSON_EXT_NAME),
|
||||
"output field must still carry arrow.json metadata so lance-core can handle it"
|
||||
);
|
||||
|
||||
// The data must flow through correctly (3 rows, no panic).
|
||||
let result = collect(projected).await;
|
||||
assert_eq!(result.num_rows(), 3);
|
||||
let (v0, v2) = match input_type {
|
||||
DataType::Utf8 => {
|
||||
let col: &StringArray = result.column(0).as_any().downcast_ref().unwrap();
|
||||
(col.value(0).to_string(), col.value(2).to_string())
|
||||
}
|
||||
DataType::LargeUtf8 => {
|
||||
let col: &arrow_array::LargeStringArray =
|
||||
result.column(0).as_any().downcast_ref().unwrap();
|
||||
(col.value(0).to_string(), col.value(2).to_string())
|
||||
}
|
||||
_ => unreachable!(),
|
||||
};
|
||||
assert_eq!(v0, r#"{"x": 1}"#);
|
||||
assert!(result.column(0).is_null(1));
|
||||
assert_eq!(v2, r#"{"y": 2}"#);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -870,8 +870,10 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Should return empty or nearly empty result
|
||||
assert!(result[0].num_rows() <= 1);
|
||||
assert_eq!(
|
||||
result.iter().map(|batch| batch.num_rows()).sum::<usize>(),
|
||||
0
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -8,6 +8,7 @@ use std::{
|
||||
|
||||
use lance::{Dataset, dataset::refs};
|
||||
|
||||
use crate::table::merge::lsm::ShardWriterCache;
|
||||
use crate::{Error, error::Result, utils::background_cache::BackgroundCache};
|
||||
|
||||
/// A wrapper around a [Dataset] that provides consistency checks.
|
||||
@@ -18,6 +19,10 @@ use crate::{Error, error::Result, utils::background_cache::BackgroundCache};
|
||||
pub struct DatasetConsistencyWrapper {
|
||||
state: Arc<Mutex<DatasetState>>,
|
||||
consistency: ConsistencyMode,
|
||||
/// The single MemWAL `ShardWriter` for this dataset, co-located so it is
|
||||
/// cached for the session and shares the dataset's lifecycle. A dataset
|
||||
/// writes to one shard at a time. Shared by `Arc` across clones.
|
||||
shard_writer: Arc<ShardWriterCache>,
|
||||
}
|
||||
|
||||
/// The current dataset and whether it is pinned to a specific version.
|
||||
@@ -67,9 +72,15 @@ impl DatasetConsistencyWrapper {
|
||||
pinned_version: None,
|
||||
})),
|
||||
consistency,
|
||||
shard_writer: Arc::new(ShardWriterCache::default()),
|
||||
}
|
||||
}
|
||||
|
||||
/// The MemWAL `ShardWriter` cache co-located with this dataset.
|
||||
pub(crate) fn shard_writer(&self) -> &Arc<ShardWriterCache> {
|
||||
&self.shard_writer
|
||||
}
|
||||
|
||||
/// Get the current dataset.
|
||||
///
|
||||
/// Behavior depends on the consistency mode:
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::FutureExt;
|
||||
use lance::dataset::DeleteBuilder;
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::NativeTable;
|
||||
use super::{NativeTable, Predicate};
|
||||
use crate::Result;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
@@ -21,17 +24,39 @@ pub struct DeleteResult {
|
||||
/// Internal implementation of the delete logic
|
||||
///
|
||||
/// This logic was moved from NativeTable::delete to keep table.rs clean.
|
||||
pub(crate) async fn execute_delete(table: &NativeTable, predicate: &str) -> Result<DeleteResult> {
|
||||
pub(crate) async fn execute_delete(
|
||||
table: &NativeTable,
|
||||
predicate: Predicate<'_>,
|
||||
) -> Result<DeleteResult> {
|
||||
table.dataset.ensure_mutable()?;
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
let delete_result = dataset.delete(predicate).boxed().await?;
|
||||
let num_deleted_rows = delete_result.num_deleted_rows;
|
||||
let version = dataset.version().version;
|
||||
table.dataset.update(dataset);
|
||||
Ok(DeleteResult {
|
||||
num_deleted_rows,
|
||||
version,
|
||||
})
|
||||
match predicate {
|
||||
Predicate::String(s) => {
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
let delete_result = dataset.delete(s).boxed().await?;
|
||||
let num_deleted_rows = delete_result.num_deleted_rows;
|
||||
let version = dataset.version().version;
|
||||
table.dataset.update(dataset);
|
||||
Ok(DeleteResult {
|
||||
num_deleted_rows,
|
||||
version,
|
||||
})
|
||||
}
|
||||
Predicate::Expr(expr) => {
|
||||
let dataset = table.dataset.get().await?;
|
||||
let delete_result = DeleteBuilder::from_expr(Arc::clone(&dataset), expr.clone())
|
||||
.execute()
|
||||
.await?;
|
||||
let num_deleted_rows = delete_result.num_deleted_rows;
|
||||
let version = delete_result.new_dataset.version().version;
|
||||
table.dataset.update(
|
||||
Arc::try_unwrap(delete_result.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
|
||||
);
|
||||
Ok(DeleteResult {
|
||||
num_deleted_rows,
|
||||
version,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -176,4 +201,100 @@ mod tests {
|
||||
"Table version must increment after delete operation"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_expr() {
|
||||
use datafusion_expr::{col, lit};
|
||||
|
||||
let conn = connect("memory://").execute().await.unwrap();
|
||||
|
||||
// 1. Create a table with values 0 to 9
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![Arc::new(Int32Array::from_iter_values(0..10))],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let table = conn
|
||||
.create_table("test_delete_expr", batch)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// 2. Verify initial state
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), 10);
|
||||
let initial_version = table.version().await.unwrap();
|
||||
|
||||
// 3. Execute Delete with Expr (removes values > 5)
|
||||
let expr = col("i").gt(lit(5));
|
||||
table.delete(&expr).await.unwrap();
|
||||
|
||||
// 4. Verify results
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), 6); // 0, 1, 2, 3, 4, 5 remain
|
||||
let current_version = table.version().await.unwrap();
|
||||
assert!(
|
||||
current_version > initial_version,
|
||||
"Table version must increment after delete_expr operation"
|
||||
);
|
||||
|
||||
// 5. Verify specific data consistency
|
||||
let batches = table
|
||||
.query()
|
||||
.execute()
|
||||
.await
|
||||
.unwrap()
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.unwrap();
|
||||
let batch = &batches[0];
|
||||
let array = batch
|
||||
.column(0)
|
||||
.as_any()
|
||||
.downcast_ref::<Int32Array>()
|
||||
.unwrap();
|
||||
|
||||
// Ensure no value > 5 exists
|
||||
for val in array.iter() {
|
||||
assert!(val.unwrap() <= 5);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_expr_increments_version() {
|
||||
use datafusion_expr::lit;
|
||||
|
||||
let conn = connect("memory://").execute().await.unwrap();
|
||||
|
||||
// Create a table with 5 rows
|
||||
let batch = record_batch!(("id", Int32, [1, 2, 3, 4, 5])).unwrap();
|
||||
|
||||
let table = conn
|
||||
.create_table("test_delete_expr_noop", batch)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Capture the initial state (Rows = 5, Version = 1)
|
||||
let initial_rows = table.count_rows(None).await.unwrap();
|
||||
let initial_version = table.version().await.unwrap();
|
||||
|
||||
assert_eq!(initial_rows, 5);
|
||||
let expr = lit(false);
|
||||
table.delete(&expr).await.unwrap();
|
||||
|
||||
// Rows should still be 5
|
||||
let current_rows = table.count_rows(None).await.unwrap();
|
||||
assert_eq!(
|
||||
current_rows, initial_rows,
|
||||
"Data should not change when predicate is false"
|
||||
);
|
||||
|
||||
// version check
|
||||
let current_version = table.version().await.unwrap();
|
||||
assert!(
|
||||
current_version > initial_version,
|
||||
"Table version must increment after delete_expr operation"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,6 +41,16 @@ pub struct MergeResult {
|
||||
/// A value of 1 means the operation succeeded on the first try.
|
||||
#[serde(default)]
|
||||
pub num_attempts: u32,
|
||||
/// Total number of rows written.
|
||||
///
|
||||
/// On the standard `merge_insert` path this equals
|
||||
/// `num_inserted_rows + num_updated_rows`. On the MemWAL LSM write path the
|
||||
/// insert/update breakdown is not known until compaction; in that mode
|
||||
/// `num_inserted_rows`, `num_updated_rows`, `num_deleted_rows`, `version`
|
||||
/// and `num_attempts` are all `0` and this field holds the total number of
|
||||
/// rows written through the shard writer.
|
||||
#[serde(default)]
|
||||
pub num_rows: u64,
|
||||
}
|
||||
|
||||
/// A builder used to create and run a merge insert operation
|
||||
@@ -57,6 +67,8 @@ pub struct MergeInsertBuilder {
|
||||
pub(crate) when_not_matched_by_source_delete_filt: Option<String>,
|
||||
pub(crate) timeout: Option<Duration>,
|
||||
pub(crate) use_index: bool,
|
||||
pub(crate) use_lsm_write: Option<bool>,
|
||||
pub(crate) validate_single_shard: bool,
|
||||
}
|
||||
|
||||
impl MergeInsertBuilder {
|
||||
@@ -71,6 +83,8 @@ impl MergeInsertBuilder {
|
||||
when_not_matched_by_source_delete_filt: None,
|
||||
timeout: None,
|
||||
use_index: true,
|
||||
use_lsm_write: None,
|
||||
validate_single_shard: true,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -150,6 +164,34 @@ impl MergeInsertBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Controls whether `merge_insert` uses the MemWAL LSM write path.
|
||||
///
|
||||
/// By default (unset), a `merge_insert` on a table with an
|
||||
/// [`LsmWriteSpec`](super::LsmWriteSpec) installed is routed through
|
||||
/// Lance's MemWAL shard writer, and a table without one uses the standard
|
||||
/// path. Calling this with `false` forces the standard path even when a
|
||||
/// spec is set. Calling it with `true` requires a spec — `merge_insert`
|
||||
/// errors if none is installed.
|
||||
pub fn use_lsm_write(&mut self, use_lsm_write: bool) -> &mut Self {
|
||||
self.use_lsm_write = Some(use_lsm_write);
|
||||
self
|
||||
}
|
||||
|
||||
/// Controls how an LSM `merge_insert` checks that its input targets a
|
||||
/// single shard.
|
||||
///
|
||||
/// When a table has an LSM write spec, every row in a `merge_insert` call
|
||||
/// must route to the same shard. When `true` (the default), every row is
|
||||
/// inspected to verify this. When `false`, only the first row is inspected
|
||||
/// and the shard it routes to is used for the whole input — a faster path
|
||||
/// for callers that have already pre-sharded their input.
|
||||
///
|
||||
/// Has no effect on tables without an LSM write spec.
|
||||
pub fn validate_single_shard(&mut self, validate_single_shard: bool) -> &mut Self {
|
||||
self.validate_single_shard = validate_single_shard;
|
||||
self
|
||||
}
|
||||
|
||||
/// Executes the merge insert operation
|
||||
///
|
||||
/// Returns version and statistics about the merge operation including the number of rows
|
||||
@@ -167,6 +209,23 @@ pub(crate) async fn execute_merge_insert(
|
||||
params: MergeInsertBuilder,
|
||||
new_data: Box<dyn RecordBatchReader + Send>,
|
||||
) -> Result<MergeResult> {
|
||||
match lsm::lsm_dispatch_decision(table, ¶ms).await? {
|
||||
lsm::LsmDispatch::Lsm(plan) => {
|
||||
let future =
|
||||
lsm::execute_lsm_merge_insert(table, plan, params.validate_single_shard, new_data);
|
||||
return match params.timeout {
|
||||
Some(timeout) => match tokio::time::timeout(timeout, future).await {
|
||||
Ok(result) => result,
|
||||
Err(_) => Err(Error::Runtime {
|
||||
message: "merge insert timed out".to_string(),
|
||||
}),
|
||||
},
|
||||
None => future.await,
|
||||
};
|
||||
}
|
||||
lsm::LsmDispatch::Standard => {}
|
||||
}
|
||||
|
||||
let dataset = table.dataset.get().await?;
|
||||
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
|
||||
match (
|
||||
@@ -219,6 +278,7 @@ pub(crate) async fn execute_merge_insert(
|
||||
num_inserted_rows: stats.num_inserted_rows,
|
||||
num_deleted_rows: stats.num_deleted_rows,
|
||||
num_attempts: stats.num_attempts,
|
||||
num_rows: stats.num_inserted_rows + stats.num_updated_rows,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -327,3 +387,366 @@ mod tests {
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), 25);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod lsm_tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_array::{
|
||||
Int64Array, RecordBatch, RecordBatchIterator, RecordBatchReader, StringArray,
|
||||
};
|
||||
use arrow_schema::{DataType, Field, Schema};
|
||||
use tempfile::{TempDir, tempdir};
|
||||
|
||||
use crate::connect;
|
||||
use crate::error::Error;
|
||||
use crate::table::{LsmWriteSpec, Table};
|
||||
|
||||
/// A reader of `[id: Int64, value: Int64]` rows; `value` is `0..n`.
|
||||
fn id_value_reader(ids: Vec<i64>) -> Box<dyn RecordBatchReader + Send> {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int64, false),
|
||||
Field::new("value", DataType::Int64, false),
|
||||
]));
|
||||
let n = ids.len() as i64;
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(Int64Array::from(ids)),
|
||||
Arc::new(Int64Array::from_iter_values(0..n)),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema))
|
||||
}
|
||||
|
||||
/// A reader of `[id: Int64, region: Utf8]` rows.
|
||||
fn id_region_reader(rows: Vec<(i64, &str)>) -> Box<dyn RecordBatchReader + Send> {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int64, false),
|
||||
Field::new("region", DataType::Utf8, false),
|
||||
]));
|
||||
let ids: Vec<i64> = rows.iter().map(|(id, _)| *id).collect();
|
||||
let regions: Vec<&str> = rows.iter().map(|(_, region)| *region).collect();
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(Int64Array::from(ids)),
|
||||
Arc::new(StringArray::from(regions)),
|
||||
],
|
||||
)
|
||||
.unwrap();
|
||||
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema))
|
||||
}
|
||||
|
||||
/// A multi-batch reader of `[id: Int64, region: Utf8]` rows.
|
||||
fn id_region_multi_reader(batches: Vec<Vec<(i64, &str)>>) -> Box<dyn RecordBatchReader + Send> {
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new("id", DataType::Int64, false),
|
||||
Field::new("region", DataType::Utf8, false),
|
||||
]));
|
||||
let records: Vec<_> = batches
|
||||
.into_iter()
|
||||
.map(|rows| {
|
||||
let ids: Vec<i64> = rows.iter().map(|(id, _)| *id).collect();
|
||||
let regions: Vec<&str> = rows.iter().map(|(_, region)| *region).collect();
|
||||
Ok(RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![
|
||||
Arc::new(Int64Array::from(ids)),
|
||||
Arc::new(StringArray::from(regions)),
|
||||
],
|
||||
)
|
||||
.unwrap())
|
||||
})
|
||||
.collect();
|
||||
Box::new(RecordBatchIterator::new(records, schema))
|
||||
}
|
||||
|
||||
/// Create an `[id, value]` table with `id` as the unenforced primary key.
|
||||
async fn id_value_table(dir: &TempDir) -> Table {
|
||||
let conn = connect(dir.path().to_str().unwrap())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let table = conn
|
||||
.create_table("t", id_value_reader(vec![1, 2, 3]))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
table
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lsm_merge_insert_bucket() {
|
||||
let dir = tempdir().unwrap();
|
||||
let table = id_value_table(&dir).await;
|
||||
// num_buckets = 1: every row routes to the single bucket.
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 1))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Empty `on` defaults to the primary key.
|
||||
let mut builder = table.merge_insert(&[]);
|
||||
builder
|
||||
.when_matched_update_all(None)
|
||||
.when_not_matched_insert_all();
|
||||
let result = builder
|
||||
.execute(id_value_reader(vec![3, 4, 5]))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// LSM path: rows go to the MemWAL, the breakdown is unknown until
|
||||
// compaction, so only `num_rows` is populated.
|
||||
assert_eq!(result.num_rows, 3);
|
||||
assert_eq!(result.version, 0);
|
||||
assert_eq!(result.num_inserted_rows, 0);
|
||||
assert_eq!(result.num_updated_rows, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lsm_merge_insert_unsharded() {
|
||||
let dir = tempdir().unwrap();
|
||||
let table = id_value_table(&dir).await;
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::unsharded())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut builder = table.merge_insert(&["id"]);
|
||||
builder
|
||||
.when_matched_update_all(None)
|
||||
.when_not_matched_insert_all();
|
||||
let result = builder
|
||||
.execute(id_value_reader(vec![10, 11, 12, 13]))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.num_rows, 4);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lsm_merge_insert_identity() {
|
||||
let dir = tempdir().unwrap();
|
||||
let conn = connect(dir.path().to_str().unwrap())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let table = conn
|
||||
.create_table("t", id_region_reader(vec![(1, "us"), (2, "us")]))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::identity("region"))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// All rows share one identity value, so they route to one shard.
|
||||
let mut builder = table.merge_insert(&[]);
|
||||
builder
|
||||
.when_matched_update_all(None)
|
||||
.when_not_matched_insert_all();
|
||||
let result = builder
|
||||
.execute(id_region_reader(vec![(3, "us"), (4, "us")]))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.num_rows, 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lsm_merge_insert_use_lsm_write_false_falls_back() {
|
||||
let dir = tempdir().unwrap();
|
||||
let table = id_value_table(&dir).await;
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 1))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// use_lsm_write(false) opts out: the standard path runs and commits.
|
||||
let mut builder = table.merge_insert(&["id"]);
|
||||
builder.when_not_matched_insert_all().use_lsm_write(false);
|
||||
let result = builder
|
||||
.execute(id_value_reader(vec![3, 4, 5]))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(result.num_inserted_rows, 2);
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), 5);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lsm_merge_insert_rejects_on_not_primary_key() {
|
||||
let dir = tempdir().unwrap();
|
||||
let table = id_value_table(&dir).await;
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 1))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut builder = table.merge_insert(&["value"]);
|
||||
builder
|
||||
.when_matched_update_all(None)
|
||||
.when_not_matched_insert_all();
|
||||
let err = builder.execute(id_value_reader(vec![1])).await.unwrap_err();
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lsm_merge_insert_rejects_non_upsert() {
|
||||
let dir = tempdir().unwrap();
|
||||
let table = id_value_table(&dir).await;
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 1))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Insert-only (no when_matched_update_all) is not the upsert shape.
|
||||
let mut builder = table.merge_insert(&[]);
|
||||
builder.when_not_matched_insert_all();
|
||||
let err = builder.execute(id_value_reader(vec![4])).await.unwrap_err();
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lsm_close_writers_then_reopen() {
|
||||
let dir = tempdir().unwrap();
|
||||
let table = id_value_table(&dir).await;
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 1))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut builder = table.merge_insert(&[]);
|
||||
builder
|
||||
.when_matched_update_all(None)
|
||||
.when_not_matched_insert_all();
|
||||
builder.execute(id_value_reader(vec![7, 8])).await.unwrap();
|
||||
|
||||
table.close_lsm_writers().await.unwrap();
|
||||
|
||||
// The writer reopens lazily on the next merge_insert.
|
||||
let mut builder = table.merge_insert(&[]);
|
||||
builder
|
||||
.when_matched_update_all(None)
|
||||
.when_not_matched_insert_all();
|
||||
let result = builder.execute(id_value_reader(vec![9])).await.unwrap();
|
||||
assert_eq!(result.num_rows, 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lsm_merge_insert_multi_batch() {
|
||||
let dir = tempdir().unwrap();
|
||||
let conn = connect(dir.path().to_str().unwrap())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let table = conn
|
||||
.create_table("t", id_region_reader(vec![(1, "us")]))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::identity("region"))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Multiple batches that all route to one shard are written together.
|
||||
let mut builder = table.merge_insert(&[]);
|
||||
builder
|
||||
.when_matched_update_all(None)
|
||||
.when_not_matched_insert_all();
|
||||
let result = builder
|
||||
.execute(id_region_multi_reader(vec![
|
||||
vec![(2, "us"), (3, "us")],
|
||||
vec![(4, "us")],
|
||||
]))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.num_rows, 3);
|
||||
|
||||
// Batches that route to different shards are rejected; the validation
|
||||
// runs before any write, so no partial write is left behind.
|
||||
let mut builder = table.merge_insert(&[]);
|
||||
builder
|
||||
.when_matched_update_all(None)
|
||||
.when_not_matched_insert_all();
|
||||
let err = builder
|
||||
.execute(id_region_multi_reader(vec![
|
||||
vec![(5, "us")],
|
||||
vec![(6, "eu")],
|
||||
]))
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lsm_merge_insert_use_lsm_write_true_requires_spec() {
|
||||
let dir = tempdir().unwrap();
|
||||
// id_value_table sets a primary key but no LSM write spec.
|
||||
let table = id_value_table(&dir).await;
|
||||
|
||||
let mut builder = table.merge_insert(&["id"]);
|
||||
builder
|
||||
.when_matched_update_all(None)
|
||||
.when_not_matched_insert_all()
|
||||
.use_lsm_write(true);
|
||||
let err = builder.execute(id_value_reader(vec![4])).await.unwrap_err();
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lsm_merge_insert_rejects_second_shard() {
|
||||
let dir = tempdir().unwrap();
|
||||
let conn = connect(dir.path().to_str().unwrap())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let table = conn
|
||||
.create_table("t", id_region_reader(vec![(1, "us")]))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::identity("region"))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// The first merge_insert opens the single writer for shard "us".
|
||||
let mut builder = table.merge_insert(&[]);
|
||||
builder
|
||||
.when_matched_update_all(None)
|
||||
.when_not_matched_insert_all();
|
||||
builder
|
||||
.execute(id_region_reader(vec![(2, "us")]))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// A merge_insert routing to a different shard is rejected.
|
||||
let mut builder = table.merge_insert(&[]);
|
||||
builder
|
||||
.when_matched_update_all(None)
|
||||
.when_not_matched_insert_all();
|
||||
let err = builder
|
||||
.execute(id_region_reader(vec![(3, "eu")]))
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
|
||||
|
||||
// After closing the writer, a different shard can be written.
|
||||
table.close_lsm_writers().await.unwrap();
|
||||
let mut builder = table.merge_insert(&[]);
|
||||
builder
|
||||
.when_matched_update_all(None)
|
||||
.when_not_matched_insert_all();
|
||||
builder
|
||||
.execute(id_region_reader(vec![(4, "eu")]))
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -6,7 +6,7 @@ pub(crate) mod background_cache;
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_array::RecordBatch;
|
||||
use arrow_schema::{DataType, Schema, SchemaRef};
|
||||
use arrow_schema::{DataType, Field, Schema, SchemaRef};
|
||||
use datafusion_common::{DataFusionError, Result as DataFusionResult};
|
||||
use datafusion_execution::RecordBatchStream;
|
||||
use futures::{FutureExt, Stream};
|
||||
@@ -152,14 +152,10 @@ pub fn validate_namespace(namespace: &[String]) -> Result<()> {
|
||||
/// Find one default column to create index or perform vector query.
|
||||
pub(crate) fn default_vector_column(schema: &Schema, dim: Option<i32>) -> Result<String> {
|
||||
// Try to find a vector column.
|
||||
let candidates = schema
|
||||
.fields()
|
||||
.iter()
|
||||
.filter_map(|field| match infer_vector_dim(field.data_type()) {
|
||||
Ok(d) if dim.is_none() || dim == Some(d as i32) => Some(field.name()),
|
||||
_ => None,
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let mut candidates = Vec::new();
|
||||
for field in schema.fields() {
|
||||
collect_vector_columns(field, &mut Vec::new(), dim, &mut candidates);
|
||||
}
|
||||
if candidates.is_empty() {
|
||||
Err(Error::InvalidInput {
|
||||
message: format!(
|
||||
@@ -180,6 +176,57 @@ pub(crate) fn default_vector_column(schema: &Schema, dim: Option<i32>) -> Result
|
||||
}
|
||||
}
|
||||
|
||||
fn collect_vector_columns(
|
||||
field: &Field,
|
||||
path: &mut Vec<String>,
|
||||
dim: Option<i32>,
|
||||
candidates: &mut Vec<String>,
|
||||
) {
|
||||
path.push(field.name().clone());
|
||||
match infer_vector_dim(field.data_type()) {
|
||||
Ok(d) if dim.is_none() || dim == Some(d as i32) => {
|
||||
let path_segments = path.iter().map(String::as_str).collect::<Vec<_>>();
|
||||
candidates.push(lance_core::datatypes::format_field_path(&path_segments));
|
||||
}
|
||||
_ => {
|
||||
if let DataType::Struct(fields) = field.data_type() {
|
||||
for child in fields {
|
||||
collect_vector_columns(child, path, dim, candidates);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
path.pop();
|
||||
}
|
||||
|
||||
pub(crate) fn resolve_arrow_field_path(schema: &Schema, column: &str) -> Result<(String, Field)> {
|
||||
lance_core::datatypes::parse_field_path(column).map_err(|e| Error::InvalidInput {
|
||||
message: format!("Invalid field path `{}`: {}", column, e),
|
||||
})?;
|
||||
|
||||
let lance_schema =
|
||||
lance_core::datatypes::Schema::try_from(schema).map_err(|e| Error::Schema {
|
||||
message: format!("Invalid schema: {}", e),
|
||||
})?;
|
||||
let field_path = lance_schema
|
||||
.resolve_case_insensitive(column)
|
||||
.ok_or_else(|| Error::Schema {
|
||||
message: format!(
|
||||
"Field path `{}` not found in schema. Available field paths: {}",
|
||||
column,
|
||||
lance_schema.field_paths().join(", ")
|
||||
),
|
||||
})?;
|
||||
let field = field_path.last().expect("field path should be non-empty");
|
||||
let path_segments = field_path
|
||||
.iter()
|
||||
.map(|field| field.name.as_str())
|
||||
.collect::<Vec<_>>();
|
||||
let canonical_path = lance_core::datatypes::format_field_path(&path_segments);
|
||||
|
||||
Ok((canonical_path, Field::from(*field)))
|
||||
}
|
||||
|
||||
pub fn supported_btree_data_type(dtype: &DataType) -> bool {
|
||||
dtype.is_integer()
|
||||
|| dtype.is_floating()
|
||||
@@ -450,6 +497,49 @@ mod tests {
|
||||
"vec"
|
||||
);
|
||||
|
||||
let schema_with_nested_vec_col = Schema::new(vec![
|
||||
Field::new("id", DataType::Int16, true),
|
||||
Field::new(
|
||||
"image",
|
||||
DataType::Struct(
|
||||
vec![Field::new(
|
||||
"embedding",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, false)),
|
||||
10,
|
||||
),
|
||||
false,
|
||||
)]
|
||||
.into(),
|
||||
),
|
||||
false,
|
||||
),
|
||||
]);
|
||||
assert_eq!(
|
||||
default_vector_column(&schema_with_nested_vec_col, None).unwrap(),
|
||||
"image.embedding"
|
||||
);
|
||||
|
||||
let schema_with_escaped_nested_vec_col = Schema::new(vec![Field::new(
|
||||
"image-meta",
|
||||
DataType::Struct(
|
||||
vec![Field::new(
|
||||
"embedding.v1",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, false)),
|
||||
10,
|
||||
),
|
||||
false,
|
||||
)]
|
||||
.into(),
|
||||
),
|
||||
false,
|
||||
)]);
|
||||
assert_eq!(
|
||||
default_vector_column(&schema_with_escaped_nested_vec_col, None).unwrap(),
|
||||
"`image-meta`.`embedding.v1`"
|
||||
);
|
||||
|
||||
let multi_vec_col = Schema::new(vec![
|
||||
Field::new("id", DataType::Int16, true),
|
||||
Field::new(
|
||||
@@ -469,6 +559,48 @@ mod tests {
|
||||
.to_string()
|
||||
.contains("More than one")
|
||||
);
|
||||
|
||||
let multi_nested_vec_col = Schema::new(vec![
|
||||
Field::new(
|
||||
"image",
|
||||
DataType::Struct(
|
||||
vec![Field::new(
|
||||
"embedding",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, false)),
|
||||
10,
|
||||
),
|
||||
false,
|
||||
)]
|
||||
.into(),
|
||||
),
|
||||
false,
|
||||
),
|
||||
Field::new(
|
||||
"text",
|
||||
DataType::Struct(
|
||||
vec![Field::new(
|
||||
"embedding",
|
||||
DataType::FixedSizeList(
|
||||
Arc::new(Field::new("item", DataType::Float32, false)),
|
||||
50,
|
||||
),
|
||||
false,
|
||||
)]
|
||||
.into(),
|
||||
),
|
||||
false,
|
||||
),
|
||||
]);
|
||||
assert_eq!(
|
||||
default_vector_column(&multi_nested_vec_col, Some(50)).unwrap(),
|
||||
"text.embedding"
|
||||
);
|
||||
let err = default_vector_column(&multi_nested_vec_col, None)
|
||||
.unwrap_err()
|
||||
.to_string();
|
||||
assert!(err.contains("image.embedding"));
|
||||
assert!(err.contains("text.embedding"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user