Compare commits

..

1 Commits

Author SHA1 Message Date
Lance Release
71afca2559 Bump version: 0.29.1-beta.0 → 0.30.0-beta.0 2026-05-21 21:35:54 +00:00
75 changed files with 667 additions and 5185 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.30.0-beta.1"
current_version = "0.30.0-beta.0"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -29,3 +29,7 @@ runs:
args: ${{ inputs.args }}
docker-options: "-e PIP_EXTRA_INDEX_URL='https://pypi.fury.io/lance-format/ https://pypi.fury.io/lancedb/'"
working-directory: python
- uses: actions/upload-artifact@v4
with:
name: windows-wheels
path: python\target\wheels

View File

@@ -8,9 +8,6 @@ on:
# This should trigger a dry run (we skip the final publish step)
paths:
- .github/workflows/pypi-publish.yml
- .github/workflows/build_linux_wheel/action.yml
- .github/workflows/build_mac_wheel/action.yml
- .github/workflows/build_windows_wheel/action.yml
- Cargo.toml # Change in dependency frequently breaks builds
- Cargo.lock
@@ -24,21 +21,32 @@ jobs:
linux:
name: Python ${{ matrix.config.platform }} manylinux${{ matrix.config.manylinux }}
timeout-minutes: 60
permissions:
id-token: write
contents: read
strategy:
matrix:
config:
- platform: x86_64
manylinux: "2_17"
extra_args: ""
runner: ubuntu-22.04
- platform: x86_64
manylinux: "2_28"
extra_args: "--features fp16kernels"
runner: ubuntu-22.04
# For successful fat LTO builds, we need a large runner to avoid OOM errors.
- platform: aarch64
manylinux: "2_17"
extra_args: ""
# For successful fat LTO builds, we need a large runner to avoid OOM errors.
runner: ubuntu-2404-8x-arm64
- platform: aarch64
manylinux: "2_28"
extra_args: "--features fp16kernels"
runner: ubuntu-2404-8x-arm64
runs-on: ${{ matrix.config.runner }}
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
with:
fetch-depth: 0
lfs: true
@@ -52,14 +60,15 @@ jobs:
args: "--release --strip ${{ matrix.config.extra_args }}"
arm-build: ${{ matrix.config.platform == 'aarch64' }}
manylinux: ${{ matrix.config.manylinux }}
- uses: actions/upload-artifact@v7
- uses: ./.github/workflows/upload_wheel
if: startsWith(github.ref, 'refs/tags/python-v')
with:
name: wheels-linux-${{ matrix.config.platform }}-${{ matrix.config.manylinux }}
path: target/wheels/lancedb-*.whl
if-no-files-found: error
fury_token: ${{ secrets.FURY_TOKEN }}
mac:
timeout-minutes: 90
permissions:
id-token: write
contents: read
runs-on: ${{ matrix.config.runner }}
strategy:
matrix:
@@ -69,7 +78,7 @@ jobs:
env:
MACOSX_DEPLOYMENT_TARGET: 10.15
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
with:
fetch-depth: 0
lfs: true
@@ -81,21 +90,18 @@ jobs:
with:
python-minor-version: 10
args: "--release --strip --target ${{ matrix.config.target }} --features fp16kernels"
- uses: actions/upload-artifact@v7
- uses: ./.github/workflows/upload_wheel
if: startsWith(github.ref, 'refs/tags/python-v')
with:
name: wheels-mac-${{ matrix.config.target }}
path: target/wheels/lancedb-*.whl
if-no-files-found: error
fury_token: ${{ secrets.FURY_TOKEN }}
windows:
timeout-minutes: 90
timeout-minutes: 60
permissions:
id-token: write
contents: read
runs-on: windows-latest
env:
# link.exe is single-threaded and the long pole on Windows builds. Use
# rustc's bundled lld-link instead.
CARGO_TARGET_X86_64_PC_WINDOWS_MSVC_LINKER: rust-lld
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
with:
fetch-depth: 0
lfs: true
@@ -107,70 +113,18 @@ jobs:
with:
python-minor-version: 10
args: "--release --strip"
- uses: actions/upload-artifact@v7
vcpkg_token: ${{ secrets.VCPKG_GITHUB_PACKAGES }}
- uses: ./.github/workflows/upload_wheel
if: startsWith(github.ref, 'refs/tags/python-v')
with:
name: wheels-windows
path: target/wheels/lancedb-*.whl
if-no-files-found: error
publish:
name: Publish wheels
if: startsWith(github.ref, 'refs/tags/python-v')
needs: [linux, mac, windows]
runs-on: ubuntu-latest
permissions:
id-token: write
contents: read
steps:
- uses: actions/checkout@v6
- name: Download wheel artifacts
uses: actions/download-artifact@v8
with:
pattern: wheels-*
path: target/wheels
merge-multiple: true
- name: List wheels
run: ls -la target/wheels
- name: Choose repo
id: choose_repo
run: |
if [[ ${{ github.ref }} == *beta* ]]; then
echo "repo=fury" >> $GITHUB_OUTPUT
else
echo "repo=pypi" >> $GITHUB_OUTPUT
fi
- name: Publish to Fury
if: steps.choose_repo.outputs.repo == 'fury'
env:
FURY_TOKEN: ${{ secrets.FURY_TOKEN }}
run: |
shopt -s nullglob
WHEELS=(target/wheels/lancedb-*.whl)
if [[ ${#WHEELS[@]} -eq 0 ]]; then
echo "No wheels found in target/wheels/" >&2
exit 1
fi
for WHEEL in "${WHEELS[@]}"; do
echo "Uploading $WHEEL to Fury"
curl -f -F package=@"$WHEEL" "https://$FURY_TOKEN@push.fury.io/lancedb/"
done
# NOTE: pypa/gh-action-pypi-publish must be invoked directly from a
# workflow file, not from inside a composite action. When called from a
# composite, `github.action_repository` is empty (actions/runner#2473)
# and the action falls back to `github.repository`, producing a bogus
# `docker://ghcr.io/<repo>:<ref>` image reference that GHA tries to pull.
- name: Publish to PyPI
if: steps.choose_repo.outputs.repo == 'pypi'
uses: pypa/gh-action-pypi-publish@release/v1
with:
packages-dir: target/wheels/
fury_token: ${{ secrets.FURY_TOKEN }}
gh-release:
if: startsWith(github.ref, 'refs/tags/python-v')
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
with:
fetch-depth: 0
lfs: true
@@ -233,13 +187,13 @@ jobs:
report-failure:
name: Report Workflow Failure
runs-on: ubuntu-latest
needs: [linux, mac, windows, publish]
needs: [linux, mac, windows]
permissions:
contents: read
issues: write
if: always() && failure() && startsWith(github.ref, 'refs/tags/python-v')
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
- uses: ./.github/actions/create-failure-issue
with:
job-results: ${{ toJSON(needs) }}

View File

@@ -0,0 +1,34 @@
name: upload-wheel
description: "Upload wheels to Pypi"
inputs:
fury_token:
required: true
description: "release token for the fury repo"
runs:
using: "composite"
steps:
- name: Choose repo
shell: bash
id: choose_repo
run: |
if [[ ${{ github.ref }} == *beta* ]]; then
echo "repo=fury" >> $GITHUB_OUTPUT
else
echo "repo=pypi" >> $GITHUB_OUTPUT
fi
- name: Publish to Fury
if: steps.choose_repo.outputs.repo == 'fury'
shell: bash
env:
FURY_TOKEN: ${{ inputs.fury_token }}
run: |
WHEEL=$(ls target/wheels/lancedb-*.whl 2> /dev/null | head -n 1)
echo "Uploading $WHEEL to Fury"
curl -f -F package=@$WHEEL https://$FURY_TOKEN@push.fury.io/lancedb/
- name: Publish to PyPI
if: steps.choose_repo.outputs.repo == 'pypi'
uses: pypa/gh-action-pypi-publish@release/v1
with:
packages-dir: target/wheels/

View File

@@ -37,13 +37,10 @@ Before committing changes, run formatting for every language you touched. At min
and run targeted tests through `cd python && uv run ...`.
* TypeScript changes: run the relevant `npm`/`pnpm` lint, format, build, and docs commands in `nodejs`.
Before creating a PR, the exact value passed to `gh pr create --title` must follow
Conventional Commits, such as `fix: support nested field paths in native index creation`
or `feat(python): add dataset multiprocessing support`. Do not use a plain natural
language summary like `Support nested field paths in native index creation` as the PR
title. The semantic-release check uses the PR title and body as the merge commit message,
so a non-conventional PR title will fail CI. After creating a PR, read the remote PR title
back and fix it immediately if it is not conventional.
Before creating a PR, make sure the PR title follows Conventional Commits, such as
`fix: support nested field paths in native index creation` or
`feat(python): add dataset multiprocessing support`. The semantic-release check uses the
PR title and body as the merge commit message, so a non-conventional PR title will fail CI.
## Coding tips

242
Cargo.lock generated
View File

@@ -1399,12 +1399,6 @@ dependencies = [
"syn 2.0.117",
]
[[package]]
name = "bytecount"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e"
[[package]]
name = "bytemuck"
version = "1.25.0"
@@ -1528,9 +1522,9 @@ dependencies = [
[[package]]
name = "cedarwood"
version = "0.5.0"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0524a528a6a0288df1863c3c20fe92c301875b4941e7b6c4b394ab08c5a4c55"
checksum = "6d910bedd62c24733263d0bed247460853c9d22e8956bd4cd964302095e04e90"
dependencies = [
"smallvec",
]
@@ -3290,8 +3284,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow-array",
"rand 0.9.4",
@@ -4083,21 +4077,6 @@ dependencies = [
"zerovec",
]
[[package]]
name = "icu_locale"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5a396343c7208121dc86e35623d3dfe19814a7613cfd14964994cdc9c9a2e26"
dependencies = [
"icu_collections",
"icu_locale_core",
"icu_locale_data",
"icu_provider",
"potential_utf",
"tinystr",
"zerovec",
]
[[package]]
name = "icu_locale_core"
version = "2.2.0"
@@ -4106,18 +4085,11 @@ checksum = "92219b62b3e2b4d88ac5119f8904c10f8f61bf7e95b640d25ba3075e6cac2c29"
dependencies = [
"displaydoc",
"litemap",
"serde",
"tinystr",
"writeable",
"zerovec",
]
[[package]]
name = "icu_locale_data"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5fdcc9ac77c6d74ff5cf6e65ef3181d6af32003b16fce3a77fb451d2f695993"
[[package]]
name = "icu_normalizer"
version = "2.2.0"
@@ -4166,8 +4138,6 @@ checksum = "139c4cf31c8b5f33d7e199446eff9c1e02decfc2f0eec2c8d71f65befa45b421"
dependencies = [
"displaydoc",
"icu_locale_core",
"serde",
"stable_deref_trait",
"writeable",
"yoke",
"zerofrom",
@@ -4175,27 +4145,6 @@ dependencies = [
"zerovec",
]
[[package]]
name = "icu_segmenter"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c0794db0b1a86193ac9c48768d0e6c52c54448e0870ad87907d456ee0dac964"
dependencies = [
"icu_collections",
"icu_locale",
"icu_provider",
"icu_segmenter_data",
"potential_utf",
"utf8_iter",
"zerovec",
]
[[package]]
name = "icu_segmenter_data"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4a2c462a4d927d512f5f882a033ddd62f33a05bb9f230d98f736ac3dc85938f"
[[package]]
name = "id-arena"
version = "2.3.0"
@@ -4357,20 +4306,19 @@ checksum = "9028f49264629065d057f340a86acb84867925865f73bbf8d47b4d149a7e88b8"
[[package]]
name = "jieba-macros"
version = "0.10.1"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46adade69b634535a8f495cf87710ed893cff53e1dbc9dd750c2ab81c5defb82"
checksum = "a29cfc5dcd898604c6f80363411fa6b6b08e27d1d253d6225b9cb6702ea02fc0"
dependencies = [
"phf_codegen 0.13.1",
]
[[package]]
name = "jieba-rs"
version = "0.10.1"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11b53580aaa8ec8b713da271da434f8947409242c537a9ab3f7b76bdbb19e8a9"
checksum = "3245d6e9d1d5facbd6a23848d6b67e3439738ccbb4fa5a3d65da315ba1a910a2"
dependencies = [
"bytecount",
"cedarwood",
"jieba-macros",
"phf 0.13.1",
@@ -4558,8 +4506,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a"
[[package]]
name = "lance"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arc-swap",
"arrow",
@@ -4577,7 +4525,6 @@ dependencies = [
"async_cell",
"aws-credential-types",
"aws-sdk-dynamodb",
"bitpacking",
"byteorder",
"bytes",
"chrono",
@@ -4604,11 +4551,9 @@ dependencies = [
"lance-io",
"lance-linalg",
"lance-namespace",
"lance-select",
"lance-table",
"lance-tokenizer",
"log",
"moka",
"object_store",
"permutation",
"pin-project",
@@ -4632,8 +4577,8 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4651,34 +4596,10 @@ dependencies = [
"rand 0.9.4",
]
[[package]]
name = "lance-arrow-scalar"
version = "58.0.0"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
dependencies = [
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data",
"arrow-row",
"arrow-schema",
"half",
]
[[package]]
name = "lance-arrow-stats"
version = "58.0.0"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
dependencies = [
"arrow-array",
"arrow-schema",
"lance-arrow-scalar",
]
[[package]]
name = "lance-bitpacking"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrayref",
"paste",
@@ -4687,8 +4608,8 @@ dependencies = [
[[package]]
name = "lance-core"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4723,8 +4644,8 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow",
"arrow-array",
@@ -4754,8 +4675,8 @@ dependencies = [
[[package]]
name = "lance-datagen"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow",
"arrow-array",
@@ -4773,8 +4694,8 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4809,8 +4730,8 @@ dependencies = [
[[package]]
name = "lance-file"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4841,8 +4762,8 @@ dependencies = [
[[package]]
name = "lance-index"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arc-swap",
"arrow",
@@ -4872,7 +4793,6 @@ dependencies = [
"jieba-rs",
"jsonb",
"lance-arrow",
"lance-arrow-stats",
"lance-core",
"lance-datafusion",
"lance-datagen",
@@ -4880,7 +4800,6 @@ dependencies = [
"lance-file",
"lance-io",
"lance-linalg",
"lance-select",
"lance-table",
"lance-tokenizer",
"libm",
@@ -4908,8 +4827,8 @@ dependencies = [
[[package]]
name = "lance-io"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow",
"arrow-arith",
@@ -4951,8 +4870,8 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4968,8 +4887,8 @@ dependencies = [
[[package]]
name = "lance-namespace"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow",
"async-trait",
@@ -4981,8 +4900,8 @@ dependencies = [
[[package]]
name = "lance-namespace-impls"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow",
"arrow-ipc",
@@ -5017,9 +4936,9 @@ dependencies = [
[[package]]
name = "lance-namespace-reqwest-client"
version = "0.7.7"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6369eee4682fb11edf538388b43c61ce288b8302fe89bb40944d7daa7faaae99"
checksum = "f65e31bdaa13e01dab6e7cf566da31df243c34a542f0d915d3601ec0e01e61d2"
dependencies = [
"reqwest 0.12.28",
"serde",
@@ -5029,25 +4948,10 @@ dependencies = [
"url",
]
[[package]]
name = "lance-select"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
dependencies = [
"arrow-array",
"arrow-buffer",
"byteorder",
"bytes",
"deepsize",
"itertools 0.13.0",
"lance-core",
"roaring",
]
[[package]]
name = "lance-table"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow",
"arrow-array",
@@ -5066,7 +4970,6 @@ dependencies = [
"lance-core",
"lance-file",
"lance-io",
"lance-select",
"log",
"object_store",
"prost",
@@ -5087,8 +4990,8 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -5099,10 +5002,9 @@ dependencies = [
[[package]]
name = "lance-tokenizer"
version = "7.2.0-beta.1"
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.1#b9995aba6115e8e4bc43179a45cbd0f9a170f305"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"icu_segmenter",
"jieba-rs",
"lindera",
"rust-stemmers",
@@ -5112,7 +5014,7 @@ dependencies = [
[[package]]
name = "lancedb"
version = "0.30.0-beta.1"
version = "0.29.1-beta.0"
dependencies = [
"ahash",
"anyhow",
@@ -5182,7 +5084,6 @@ dependencies = [
"serde",
"serde_json",
"serde_with",
"serial_test",
"snafu 0.8.9",
"tempfile",
"test-log",
@@ -5195,7 +5096,7 @@ dependencies = [
[[package]]
name = "lancedb-nodejs"
version = "0.30.0-beta.1"
version = "0.29.1-beta.0"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -5218,7 +5119,7 @@ dependencies = [
[[package]]
name = "lancedb-python"
version = "0.33.0-beta.1"
version = "0.32.1-beta.0"
dependencies = [
"arrow",
"async-trait",
@@ -7033,8 +6934,6 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0103b1cef7ec0cf76490e969665504990193874ea05c85ff9bab8b911d0a0564"
dependencies = [
"serde_core",
"writeable",
"zerovec",
]
@@ -8229,15 +8128,6 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "scc"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46e6f046b7fef48e2660c57ed794263155d713de679057f2d0c169bfc6e756cc"
dependencies = [
"sdd",
]
[[package]]
name = "schannel"
version = "0.1.29"
@@ -8304,12 +8194,6 @@ dependencies = [
"untrusted 0.9.0",
]
[[package]]
name = "sdd"
version = "3.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca"
[[package]]
name = "sec1"
version = "0.3.0"
@@ -8500,32 +8384,6 @@ dependencies = [
"unsafe-libyaml",
]
[[package]]
name = "serial_test"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "911bd979bf1070a3f3aa7b691a3b3e9968f339ceeec89e08c280a8a22207a32f"
dependencies = [
"futures-executor",
"futures-util",
"log",
"once_cell",
"parking_lot",
"scc",
"serial_test_derive",
]
[[package]]
name = "serial_test_derive"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a7d91949b85b0d2fb687445e448b40d322b6b3e4af6b44a29b21d9a5f33e6d9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]]
name = "sha1"
version = "0.10.6"
@@ -8548,12 +8406,6 @@ dependencies = [
"digest 0.11.3",
]
[[package]]
name = "sha1_smol"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
[[package]]
name = "sha2"
version = "0.10.9"
@@ -9273,7 +9125,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8323304221c2a851516f22236c5722a72eaa19749016521d6dff0824447d96d"
dependencies = [
"displaydoc",
"serde_core",
"zerovec",
]
@@ -9778,7 +9629,6 @@ dependencies = [
"getrandom 0.4.2",
"js-sys",
"serde_core",
"sha1_smol",
"wasm-bindgen",
]
@@ -10742,7 +10592,6 @@ dependencies = [
"displaydoc",
"yoke",
"zerofrom",
"zerovec",
]
[[package]]
@@ -10751,7 +10600,6 @@ version = "0.11.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90f911cbc359ab6af17377d242225f4d75119aec87ea711a880987b18cd7b239"
dependencies = [
"serde",
"yoke",
"zerofrom",
"zerovec-derive",

View File

@@ -13,20 +13,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=7.2.0-beta.1", default-features = false, "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=7.2.0-beta.1", default-features = false, "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=7.2.0-beta.1", default-features = false, "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "58.0.0", optional = false }

View File

@@ -112,25 +112,25 @@ def fetch_remote_tags() -> List[TagInfo]:
"api",
"-X",
"GET",
f"repos/{LANCE_REPO}/releases",
f"repos/{LANCE_REPO}/git/refs/tags",
"--paginate",
"--jq",
".[].tag_name",
"-F",
"per_page=20",
".[].ref",
]
)
tags: List[TagInfo] = []
for line in output.splitlines():
tag = line.strip()
if not tag.startswith("v"):
ref = line.strip()
if not ref.startswith("refs/tags/v"):
continue
tag = ref.split("refs/tags/")[-1]
version = tag.lstrip("v")
try:
tags.append(TagInfo(tag=tag, version=version, semver=parse_semver(version)))
except ValueError:
continue
if not tags:
raise RuntimeError("No Lance releases could be parsed from GitHub API output")
raise RuntimeError("No Lance tags could be parsed from GitHub API output")
return tags

View File

@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
<dependency>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-core</artifactId>
<version>0.30.0-beta.1</version>
<version>0.30.0-beta.0</version>
</dependency>
```

View File

@@ -76,57 +76,6 @@ the query optimizer chooses a suboptimal path.
***
### useLsmWrite()
```ts
useLsmWrite(useLsmWrite): MergeInsertBuilder
```
Controls whether the merge uses the MemWAL LSM write path.
By default (unset), a `mergeInsert` on a table with an LSM write spec is
routed through Lance's MemWAL shard writer, and a table without one uses
the standard path. Pass `false` to force the standard path even when a
spec is set. Pass `true` to require a spec — `mergeInsert` rejects if none
is installed.
#### Parameters
* **useLsmWrite**: `boolean`
Whether to use the LSM write path.
#### Returns
[`MergeInsertBuilder`](MergeInsertBuilder.md)
***
### validateSingleShard()
```ts
validateSingleShard(validateSingleShard): MergeInsertBuilder
```
Controls how an LSM merge checks that its input targets a single shard.
When a table has an LSM write spec, every row in a `mergeInsert` call must
route to the same shard. When `true` (the default), every row is inspected
to verify this. When `false`, only the first row is inspected and the
shard it routes to is used for the whole input — a faster path for callers
that have already pre-sharded their input. Has no effect on tables without
an LSM write spec.
#### Parameters
* **validateSingleShard**: `boolean`
Whether to check every row routes to one shard. Defaults to `true`.
#### Returns
[`MergeInsertBuilder`](MergeInsertBuilder.md)
***
### whenMatchedUpdateAll()
```ts

View File

@@ -187,25 +187,6 @@ Any attempt to use the table after it is closed will result in an error.
***
### closeLsmWriters()
```ts
abstract closeLsmWriters(): Promise<void>
```
Drain and close any cached MemWAL shard writers held for this table.
When an [LsmWriteSpec](../interfaces/LsmWriteSpec.md) is installed, `mergeInsert` opens MemWAL
shard writers and caches them for reuse across calls. This closes them,
flushing pending data; writers reopen lazily on the next `mergeInsert`.
It is a no-op when no writers are cached.
#### Returns
`Promise`&lt;`void`&gt;
***
### countRows()
```ts
@@ -994,29 +975,6 @@ based on the row being updated (e.g. "my_col + 1")
***
### updateFieldMetadata()
```ts
abstract updateFieldMetadata(updates): Promise<UpdateFieldMetadataResult>
```
Update per-field (column) metadata.
#### Parameters
* **updates**: [`FieldMetadataUpdate`](../interfaces/FieldMetadataUpdate.md)[]
One or more per-field updates. Each
update's metadata is merged into the field's existing metadata by default;
a value of `null` deletes that key, and `replace: true` swaps the whole map.
#### Returns
`Promise`&lt;[`UpdateFieldMetadataResult`](../interfaces/UpdateFieldMetadataResult.md)&gt;
resolves to the new table version.
***
### vectorSearch()
```ts

View File

@@ -65,7 +65,6 @@
- [DropNamespaceOptions](interfaces/DropNamespaceOptions.md)
- [DropNamespaceResponse](interfaces/DropNamespaceResponse.md)
- [ExecutableQuery](interfaces/ExecutableQuery.md)
- [FieldMetadataUpdate](interfaces/FieldMetadataUpdate.md)
- [FragmentStatistics](interfaces/FragmentStatistics.md)
- [FragmentSummaryStats](interfaces/FragmentSummaryStats.md)
- [FtsOptions](interfaces/FtsOptions.md)
@@ -102,7 +101,6 @@
- [TimeoutConfig](interfaces/TimeoutConfig.md)
- [TlsConfig](interfaces/TlsConfig.md)
- [TokenResponse](interfaces/TokenResponse.md)
- [UpdateFieldMetadataResult](interfaces/UpdateFieldMetadataResult.md)
- [UpdateOptions](interfaces/UpdateOptions.md)
- [UpdateResult](interfaces/UpdateResult.md)
- [Version](interfaces/Version.md)

View File

@@ -70,20 +70,16 @@ client used by manifest-enabled native connections.
optional readConsistencyInterval: number;
```
The interval, in seconds, at which to check for updates to the table
from other processes. If None, then consistency is not checked. For
performance reasons, this is the default. For strong consistency, set
this to zero seconds. Then every read will check for updates from other
processes. As a compromise, you can set this to a non-zero value for
eventual consistency. If more than that interval has passed since the
last check, then the table will be checked for updates. Note: this
consistency only applies to read operations. Write operations are
(For LanceDB OSS only): The interval, in seconds, at which to check for
updates to the table from other processes. If None, then consistency is not
checked. For performance reasons, this is the default. For strong
consistency, set this to zero seconds. Then every read will check for
updates from other processes. As a compromise, you can set this to a
non-zero value for eventual consistency. If more than that interval
has passed since the last check, then the table will be checked for updates.
Note: this consistency only applies to read operations. Write operations are
always consistent.
Stronger consistency is not free. The smaller the interval, the more
often each read pays the cost of checking for updates against object
storage, raising per-read latency and cost.
***
### region?

View File

@@ -1,41 +0,0 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / FieldMetadataUpdate
# Interface: FieldMetadataUpdate
A per-field metadata update, addressed by dot-path.
## Properties
### metadata
```ts
metadata: Record<string, null | string>;
```
Metadata key/value pairs. Merged into the field's existing metadata by
default; a value of `null` deletes that key.
***
### path
```ts
path: string;
```
Dot-separated path to the field. For a top-level column this is just its
name; for a nested field it's the path, e.g. "a.b.c".
***
### replace?
```ts
optional replace: boolean;
```
If true, replace the field's entire metadata map instead of merging.

View File

@@ -11,10 +11,7 @@ Specification selecting Lance's MemWAL LSM-style write path for
`specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`,
`column` and `numBuckets` are required; for `"identity"`, `column` is
required and must be a deterministic function of the unenforced primary
key (every row with a given primary key must always produce the same
`column` value, or upserts of that key can land in different shards and a
stale version can win).
required.
## Properties

View File

@@ -32,14 +32,6 @@ numInsertedRows: number;
***
### numRows
```ts
numRows: number;
```
***
### numUpdatedRows
```ts

View File

@@ -1,15 +0,0 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / UpdateFieldMetadataResult
# Interface: UpdateFieldMetadataResult
## Properties
### version
```ts
version: number;
```

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.30.0-beta.1</version>
<version>0.30.0-beta.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.30.0-beta.1</version>
<version>0.30.0-beta.0</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>LanceDB Java SDK Parent POM</description>
@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>7.2.0-beta.1</lance-core.version>
<lance-core.version>7.0.0-beta.13</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.30.0-beta.1"
version = "0.30.0-beta.0"
publish = false
license.workspace = true
description.workspace = true

View File

@@ -171,22 +171,18 @@ describe("given a connection", () => {
let manifestDir =
tmpDir.name + "/test_manifest_paths_v2_empty.lance/_versions";
readdirSync(manifestDir)
.filter((f) => f.endsWith(".manifest"))
.forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
readdirSync(manifestDir).forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
table = (await db.createTable("test_manifest_paths_v2", [{ id: 1 }], {
enableV2ManifestPaths: true,
})) as LocalTable;
expect(await table.usesV2ManifestPaths()).toBe(true);
manifestDir = tmpDir.name + "/test_manifest_paths_v2.lance/_versions";
readdirSync(manifestDir)
.filter((f) => f.endsWith(".manifest"))
.forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
readdirSync(manifestDir).forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
});
it("should be able to migrate tables to the V2 manifest paths", async () => {
@@ -203,20 +199,16 @@ describe("given a connection", () => {
const manifestDir =
tmpDir.name + "/test_manifest_path_migration.lance/_versions";
readdirSync(manifestDir)
.filter((f) => f.endsWith(".manifest"))
.forEach((file) => {
expect(file).toMatch(/^\d\.manifest$/);
});
readdirSync(manifestDir).forEach((file) => {
expect(file).toMatch(/^\d\.manifest$/);
});
await table.migrateManifestPathsV2();
expect(await table.usesV2ManifestPaths()).toBe(true);
readdirSync(manifestDir)
.filter((f) => f.endsWith(".manifest"))
.forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
readdirSync(manifestDir).forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
});
});

View File

@@ -1571,33 +1571,6 @@ describe("schema evolution", function () {
expect(await table.schema()).toEqual(expectedSchema3);
});
it("can update field metadata", async function () {
const con = await connect(tmpDir.name);
const table = await con.createTable("fm", [
{ id: 1, category: "a" },
{ id: 2, category: "b" },
]);
const res = await table.updateFieldMetadata([
{ path: "category", metadata: { unit: "label", pii: "false" } },
]);
expect(res).toHaveProperty("version");
expect(res.version).toBe(2);
let cat = (await table.schema()).fields.find((f) => f.name === "category");
expect(cat?.metadata.get("unit")).toBe("label");
expect(cat?.metadata.get("pii")).toBe("false");
// merge: add a key, delete one via null, keep the rest
await table.updateFieldMetadata([
{ path: "category", metadata: { source: "import", pii: null } },
]);
cat = (await table.schema()).fields.find((f) => f.name === "category");
expect(cat?.metadata.get("unit")).toBe("label"); // preserved
expect(cat?.metadata.get("source")).toBe("import"); // added
expect(cat?.metadata.has("pii")).toBe(false); // deleted
});
it("can cast to various types", async function () {
const con = await connect(tmpDir.name);
@@ -2652,97 +2625,3 @@ describe("setLsmWriteSpec / unsetLsmWriteSpec", () => {
).rejects.toThrow();
});
});
describe("LSM merge insert", () => {
let tmpDir: tmp.DirResult;
beforeEach(() => {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
});
afterEach(() => tmpDir.removeCallback());
async function bucketTable(conn: Connection): Promise<Table> {
// The primary key column must be non-nullable.
const table = await conn.createEmptyTable(
"t",
new arrow.Schema([
new arrow.Field("id", new arrow.Utf8(), false),
new arrow.Field("value", new arrow.Float64(), true),
]),
);
await table.add([
{ id: "a", value: 1 },
{ id: "b", value: 2 },
]);
await table.setUnenforcedPrimaryKey("id");
// numBuckets = 1: every row routes to the single bucket.
await table.setLsmWriteSpec({
specType: "bucket",
column: "id",
numBuckets: 1,
});
return table;
}
it("routes merge_insert through the shard writer", async () => {
const conn = await connect(tmpDir.name);
const table = await bucketTable(conn);
const res = await table
.mergeInsert("id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute([
{ id: "c", value: 3 },
{ id: "d", value: 4 },
]);
// LSM path: rows go to the MemWAL, so only numRows is populated.
expect(res.numRows).toBe(2);
expect(res.version).toBe(0);
expect(res.numInsertedRows).toBe(0);
await table.closeLsmWriters();
});
it("falls back to the standard path with useLsmWrite(false)", async () => {
const conn = await connect(tmpDir.name);
const table = await bucketTable(conn);
const res = await table
.mergeInsert("id")
.whenNotMatchedInsertAll()
.useLsmWrite(false)
.execute([
{ id: "b", value: 9 },
{ id: "e", value: 5 },
]);
// Standard path commits: id="e" inserted ("b" already exists).
expect(res.numInsertedRows).toBe(1);
expect(await table.countRows()).toBe(3);
});
it("supports validateSingleShard(false)", async () => {
const conn = await connect(tmpDir.name);
const table = await bucketTable(conn);
const res = await table
.mergeInsert("id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.validateSingleShard(false)
.execute([{ id: "f", value: 6 }]);
expect(res.numRows).toBe(1);
});
it("rejects a non-upsert merge under an LSM spec", async () => {
const conn = await connect(tmpDir.name);
const table = await bucketTable(conn);
await expect(
table
.mergeInsert("id")
.whenNotMatchedInsertAll()
.execute([{ id: "g", value: 7 }]),
).rejects.toThrow();
});
});

View File

@@ -42,7 +42,6 @@ export {
AddResult,
AddColumnsResult,
AlterColumnsResult,
UpdateFieldMetadataResult,
DeleteResult,
DropColumnsResult,
UpdateResult,
@@ -118,7 +117,6 @@ export {
WriteProgress,
LsmWriteSpec,
ColumnAlteration,
FieldMetadataUpdate,
} from "./table";
export {

View File

@@ -87,41 +87,6 @@ export class MergeInsertBuilder {
this.#schema,
);
}
/**
* Controls whether the merge uses the MemWAL LSM write path.
*
* By default (unset), a `mergeInsert` on a table with an LSM write spec is
* routed through Lance's MemWAL shard writer, and a table without one uses
* the standard path. Pass `false` to force the standard path even when a
* spec is set. Pass `true` to require a spec — `mergeInsert` rejects if none
* is installed.
*
* @param useLsmWrite - Whether to use the LSM write path.
*/
useLsmWrite(useLsmWrite: boolean): MergeInsertBuilder {
return new MergeInsertBuilder(
this.#native.useLsmWrite(useLsmWrite),
this.#schema,
);
}
/**
* Controls how an LSM merge checks that its input targets a single shard.
*
* When a table has an LSM write spec, every row in a `mergeInsert` call must
* route to the same shard. When `true` (the default), every row is inspected
* to verify this. When `false`, only the first row is inspected and the
* shard it routes to is used for the whole input — a faster path for callers
* that have already pre-sharded their input. Has no effect on tables without
* an LSM write spec.
*
* @param validateSingleShard - Whether to check every row routes to one shard. Defaults to `true`.
*/
validateSingleShard(validateSingleShard: boolean): MergeInsertBuilder {
return new MergeInsertBuilder(
this.#native.validateSingleShard(validateSingleShard),
this.#schema,
);
}
/**
* Executes the merge insert operation
*

View File

@@ -32,7 +32,6 @@ import {
OptimizeStats,
TableStatistics,
Tags,
UpdateFieldMetadataResult,
UpdateResult,
Table as _NativeTable,
} from "./native";
@@ -162,10 +161,7 @@ export interface Version {
*
* `specType` is `"bucket"`, `"identity"`, or `"unsharded"`. For `"bucket"`,
* `column` and `numBuckets` are required; for `"identity"`, `column` is
* required and must be a deterministic function of the unenforced primary
* key (every row with a given primary key must always produce the same
* `column` value, or upserts of that key can land in different shards and a
* stale version can win).
* required.
*/
export interface LsmWriteSpec {
/** One of `"bucket"`, `"identity"`, or `"unsharded"`. */
@@ -509,18 +505,6 @@ export abstract class Table {
abstract alterColumns(
columnAlterations: ColumnAlteration[],
): Promise<AlterColumnsResult>;
/**
* Update per-field (column) metadata.
* @param {FieldMetadataUpdate[]} updates One or more per-field updates. Each
* update's metadata is merged into the field's existing metadata by default;
* a value of `null` deletes that key, and `replace: true` swaps the whole map.
* @returns {Promise<UpdateFieldMetadataResult>} resolves to the new table version.
*/
abstract updateFieldMetadata(
updates: FieldMetadataUpdate[],
): Promise<UpdateFieldMetadataResult>;
/**
* Drop one or more columns from the dataset
*
@@ -583,16 +567,6 @@ export abstract class Table {
* @returns {Promise<void>}
*/
abstract unsetLsmWriteSpec(): Promise<void>;
/**
* Drain and close any cached MemWAL shard writers held for this table.
*
* When an {@link LsmWriteSpec} is installed, `mergeInsert` opens MemWAL
* shard writers and caches them for reuse across calls. This closes them,
* flushing pending data; writers reopen lazily on the next `mergeInsert`.
* It is a no-op when no writers are cached.
* @returns {Promise<void>}
*/
abstract closeLsmWriters(): Promise<void>;
/** Retrieve the version of the table */
abstract version(): Promise<number>;
@@ -1050,12 +1024,6 @@ export class LocalTable extends Table {
return await this.inner.alterColumns(processedAlterations);
}
async updateFieldMetadata(
updates: FieldMetadataUpdate[],
): Promise<UpdateFieldMetadataResult> {
return await this.inner.updateFieldMetadata(updates);
}
async dropColumns(columnNames: string[]): Promise<DropColumnsResult> {
return await this.inner.dropColumns(columnNames);
}
@@ -1073,10 +1041,6 @@ export class LocalTable extends Table {
return await this.inner.unsetLsmWriteSpec();
}
async closeLsmWriters(): Promise<void> {
return await this.inner.closeLsmWriters();
}
async version(): Promise<number> {
return await this.inner.version();
}
@@ -1222,19 +1186,3 @@ export interface ColumnAlteration {
/** Set the new nullability. Note that a nullable column cannot be made non-nullable. */
nullable?: boolean;
}
/** A per-field metadata update, addressed by dot-path. */
export interface FieldMetadataUpdate {
/**
* Dot-separated path to the field. For a top-level column this is just its
* name; for a nested field it's the path, e.g. "a.b.c".
*/
path: string;
/**
* Metadata key/value pairs. Merged into the field's existing metadata by
* default; a value of `null` deletes that key.
*/
metadata: Record<string, string | null>;
/** If true, replace the field's entire metadata map instead of merging. */
replace?: boolean;
}

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.30.0-beta.1",
"version": "0.30.0-beta.0",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.30.0-beta.1",
"version": "0.30.0-beta.0",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.30.0-beta.1",
"version": "0.30.0-beta.0",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.30.0-beta.1",
"version": "0.30.0-beta.0",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.30.0-beta.1",
"version": "0.30.0-beta.0",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.30.0-beta.1",
"version": "0.30.0-beta.0",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.30.0-beta.1",
"version": "0.30.0-beta.0",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.30.0-beta.1",
"version": "0.29.1-beta.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.30.0-beta.1",
"version": "0.29.1-beta.0",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.30.0-beta.1",
"version": "0.30.0-beta.0",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -24,19 +24,15 @@ mod util;
#[napi(object)]
#[derive(Debug)]
pub struct ConnectionOptions {
/// The interval, in seconds, at which to check for updates to the table
/// from other processes. If None, then consistency is not checked. For
/// performance reasons, this is the default. For strong consistency, set
/// this to zero seconds. Then every read will check for updates from other
/// processes. As a compromise, you can set this to a non-zero value for
/// eventual consistency. If more than that interval has passed since the
/// last check, then the table will be checked for updates. Note: this
/// consistency only applies to read operations. Write operations are
/// (For LanceDB OSS only): The interval, in seconds, at which to check for
/// updates to the table from other processes. If None, then consistency is not
/// checked. For performance reasons, this is the default. For strong
/// consistency, set this to zero seconds. Then every read will check for
/// updates from other processes. As a compromise, you can set this to a
/// non-zero value for eventual consistency. If more than that interval
/// has passed since the last check, then the table will be checked for updates.
/// Note: this consistency only applies to read operations. Write operations are
/// always consistent.
///
/// Stronger consistency is not free. The smaller the interval, the more
/// often each read pays the cost of checking for updates against object
/// storage, raising per-read latency and cost.
pub read_consistency_interval: Option<f64>,
/// (For LanceDB OSS only): configuration for object storage.
///

View File

@@ -50,20 +50,6 @@ impl NativeMergeInsertBuilder {
this
}
#[napi]
pub fn use_lsm_write(&self, use_lsm_write: bool) -> Self {
let mut this = self.clone();
this.inner.use_lsm_write(use_lsm_write);
this
}
#[napi]
pub fn validate_single_shard(&self, validate_single_shard: bool) -> Self {
let mut this = self.clone();
this.inner.validate_single_shard(validate_single_shard);
this
}
#[napi(catch_unwind)]
pub async fn execute(&self, buf: Buffer) -> napi::Result<MergeResult> {
let data = ipc_file_to_batches(buf.to_vec())

View File

@@ -5,9 +5,8 @@ use std::collections::HashMap;
use lancedb::ipc::{ipc_file_to_batches, ipc_file_to_schema};
use lancedb::table::{
AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration,
FieldMetadataUpdate as LanceFieldMetadataUpdate, NewColumnTransform, OptimizeAction,
OptimizeOptions, Table as LanceDbTable,
AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration, NewColumnTransform,
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
};
use napi::bindgen_prelude::*;
use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
@@ -356,23 +355,6 @@ impl Table {
Ok(res.into())
}
#[napi(catch_unwind)]
pub async fn update_field_metadata(
&self,
updates: Vec<FieldMetadataUpdate>,
) -> napi::Result<UpdateFieldMetadataResult> {
let updates = updates
.into_iter()
.map(LanceFieldMetadataUpdate::from)
.collect::<Vec<_>>();
let res = self
.inner_ref()?
.update_field_metadata(&updates)
.await
.default_error()?;
Ok(res.into())
}
#[napi(catch_unwind)]
pub async fn drop_columns(&self, columns: Vec<String>) -> napi::Result<DropColumnsResult> {
let col_refs = columns.iter().map(String::as_str).collect::<Vec<_>>();
@@ -409,11 +391,6 @@ impl Table {
.default_error()
}
#[napi(catch_unwind)]
pub async fn close_lsm_writers(&self) -> napi::Result<()> {
self.inner_ref()?.close_lsm_writers().await.default_error()
}
#[napi(catch_unwind)]
pub async fn version(&self) -> napi::Result<i64> {
self.inner_ref()?
@@ -765,29 +742,6 @@ pub struct ColumnAlteration {
pub nullable: Option<bool>,
}
/// A per-field metadata update, addressed by dot-path. Merges into the field's
/// existing metadata by default; a `null` value deletes a key, and `replace`
/// swaps the field's entire metadata map.
#[napi(object)]
pub struct FieldMetadataUpdate {
/// Dot-separated path to the field (e.g. "embedding" or "a.b.c").
pub path: String,
/// Metadata keys to set; a `null` value deletes that key.
pub metadata: HashMap<String, Option<String>>,
/// If true, replace the field's entire metadata map instead of merging.
pub replace: Option<bool>,
}
impl From<FieldMetadataUpdate> for LanceFieldMetadataUpdate {
fn from(js: FieldMetadataUpdate) -> Self {
Self {
path: js.path,
metadata: js.metadata,
replace: js.replace.unwrap_or(false),
}
}
}
impl TryFrom<ColumnAlteration> for LanceColumnAlteration {
type Error = String;
fn try_from(js: ColumnAlteration) -> std::result::Result<Self, Self::Error> {
@@ -986,7 +940,6 @@ pub struct MergeResult {
pub num_updated_rows: i64,
pub num_deleted_rows: i64,
pub num_attempts: i64,
pub num_rows: i64,
}
impl From<lancedb::table::MergeResult> for MergeResult {
@@ -997,7 +950,6 @@ impl From<lancedb::table::MergeResult> for MergeResult {
num_updated_rows: value.num_updated_rows as i64,
num_deleted_rows: value.num_deleted_rows as i64,
num_attempts: value.num_attempts as i64,
num_rows: value.num_rows as i64,
}
}
}
@@ -1028,19 +980,6 @@ impl From<lancedb::table::AlterColumnsResult> for AlterColumnsResult {
}
}
#[napi(object)]
pub struct UpdateFieldMetadataResult {
pub version: i64,
}
impl From<lancedb::table::UpdateFieldMetadataResult> for UpdateFieldMetadataResult {
fn from(value: lancedb::table::UpdateFieldMetadataResult) -> Self {
Self {
version: value.version as i64,
}
}
}
#[napi(object)]
pub struct DropColumnsResult {
pub version: i64,

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.33.0-beta.1"
current_version = "0.33.0-beta.0"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.33.0-beta.1"
version = "0.33.0-beta.0"
publish = false
edition.workspace = true
description = "Python bindings for LanceDB"

View File

@@ -94,6 +94,7 @@ def connect(
host_override: str, optional
The override url for LanceDB Cloud.
read_consistency_interval: timedelta, default None
(For LanceDB OSS only)
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked. For performance
reasons, this is the default. For strong consistency, set this to
@@ -103,10 +104,6 @@ def connect(
the last check, then the table will be checked for updates. Note: this
consistency only applies to read operations. Write operations are
always consistent.
Stronger consistency is not free. The smaller the interval, the more
often each read pays the cost of checking for updates against object
storage, raising per-read latency and cost.
client_config: ClientConfig or dict, optional
Configuration options for the LanceDB Cloud HTTP client. If a dict, then
the keys are the attributes of the ClientConfig class. If None, then the
@@ -150,13 +147,6 @@ def connect(
>>> db = lancedb.connect("s3://my-bucket/lancedb",
... storage_options={"aws_access_key_id": "***"})
For tests and temporary data, use an in-memory database:
>>> db = lancedb.connect("memory://")
In-memory databases are not persisted. Tables are dropped when the last
connection or table handle referencing them is closed.
Connect to LanceDB cloud:
>>> db = lancedb.connect("db://my_database", api_key="ldb_...",
@@ -220,7 +210,6 @@ def connect(
request_thread_pool=request_thread_pool,
client_config=client_config,
storage_options=storage_options,
read_consistency_interval=read_consistency_interval,
**kwargs,
)
_check_s3_bucket_with_dots(str(uri), storage_options)
@@ -347,6 +336,7 @@ async def connect_async(
host_override: str, optional
The override url for LanceDB Cloud.
read_consistency_interval: timedelta, default None
(For LanceDB OSS only)
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked. For performance
reasons, this is the default. For strong consistency, set this to
@@ -356,10 +346,6 @@ async def connect_async(
the last check, then the table will be checked for updates. Note: this
consistency only applies to read operations. Write operations are
always consistent.
Stronger consistency is not free. The smaller the interval, the more
often each read pays the cost of checking for updates against object
storage, raising per-read latency and cost.
client_config: ClientConfig or dict, optional
Configuration options for the LanceDB Cloud HTTP client. If a dict, then
the keys are the attributes of the ClientConfig class. If None, then the
@@ -392,8 +378,6 @@ async def connect_async(
... db = await lancedb.connect_async("s3://my-bucket/lancedb",
... storage_options={
... "aws_access_key_id": "***"})
... # For tests and temporary data, use an in-memory database
... db = await lancedb.connect_async("memory://")
... # Connect to LanceDB cloud
... db = await lancedb.connect_async("db://my_database", api_key="ldb_...",
... client_config={

View File

@@ -208,9 +208,6 @@ class Table:
async def alter_columns(
self, columns: list[dict[str, Any]]
) -> AlterColumnsResult: ...
async def update_field_metadata(
self, updates: list[dict[str, Any]]
) -> UpdateFieldMetadataResult: ...
async def optimize(
self,
*,
@@ -223,7 +220,6 @@ class Table:
async def set_unenforced_primary_key(self, columns: List[str]) -> None: ...
async def set_lsm_write_spec(self, spec: LsmWriteSpec) -> None: ...
async def unset_lsm_write_spec(self) -> None: ...
async def close_lsm_writers(self) -> None: ...
@property
def tags(self) -> Tags: ...
def query(self) -> Query: ...
@@ -424,7 +420,6 @@ class MergeResult:
num_inserted_rows: int
num_deleted_rows: int
num_attempts: int
num_rows: int
class LsmWriteSpec:
"""Specification selecting Lance's MemWAL LSM-style write path for
@@ -463,9 +458,6 @@ class AddColumnsResult:
class AlterColumnsResult:
version: int
class UpdateFieldMetadataResult:
version: int
class DropColumnsResult:
version: int

View File

@@ -281,9 +281,6 @@ class HnswPq:
m: int = 20
ef_construction: int = 300
target_partition_size: Optional[int] = None
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
# create_index() dispatches to pylance to build the index on the accelerator.
accelerator: Optional[str] = None
@dataclass
@@ -389,9 +386,6 @@ class HnswSq:
m: int = 20
ef_construction: int = 300
target_partition_size: Optional[int] = None
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
# create_index() dispatches to pylance to build the index on the accelerator.
accelerator: Optional[str] = None
@dataclass
@@ -585,9 +579,6 @@ class IvfFlat:
max_iterations: int = 50
sample_rate: int = 256
target_partition_size: Optional[int] = None
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
# create_index() dispatches to pylance to build the index on the accelerator.
accelerator: Optional[str] = None
@dataclass
@@ -618,9 +609,6 @@ class IvfSq:
max_iterations: int = 50
sample_rate: int = 256
target_partition_size: Optional[int] = None
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
# create_index() dispatches to pylance to build the index on the accelerator.
accelerator: Optional[str] = None
@dataclass
@@ -751,9 +739,6 @@ class IvfPq:
max_iterations: int = 50
sample_rate: int = 256
target_partition_size: Optional[int] = None
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
# create_index() dispatches to pylance to build the index on the accelerator.
accelerator: Optional[str] = None
@dataclass
@@ -807,9 +792,6 @@ class IvfRq:
max_iterations: int = 50
sample_rate: int = 256
target_partition_size: Optional[int] = None
# Name of the accelerator (e.g. "cuda") to use for IVF training. When set,
# create_index() dispatches to pylance to build the index on the accelerator.
accelerator: Optional[str] = None
__all__ = [

View File

@@ -34,8 +34,6 @@ class LanceMergeInsertBuilder(object):
self._when_not_matched_by_source_condition = None
self._timeout = None
self._use_index = True
self._use_lsm_write = None
self._validate_single_shard = None
def when_matched_update_all(
self, *, where: Optional[str] = None
@@ -98,46 +96,6 @@ class LanceMergeInsertBuilder(object):
self._use_index = use_index
return self
def use_lsm_write(self, use_lsm_write: bool) -> LanceMergeInsertBuilder:
"""
Controls whether the merge uses the MemWAL LSM write path.
By default (unset), a `merge_insert` on a table with an LSM write spec
is routed through Lance's MemWAL shard writer, and a table without one
uses the standard path. Pass `False` to force the standard path even
when a spec is set. Pass `True` to require a spec — `merge_insert`
raises an error if none is installed.
Parameters
----------
use_lsm_write: bool
Whether to use the LSM write path.
"""
self._use_lsm_write = use_lsm_write
return self
def validate_single_shard(
self, validate_single_shard: bool
) -> LanceMergeInsertBuilder:
"""
Controls how an LSM merge checks that its input targets a single shard.
When a table has an LSM write spec, every row in a `merge_insert` call
must route to the same shard. When `True` (the default), every row is
inspected to verify this. When `False`, only the first row is inspected
and the shard it routes to is used for the whole input — a faster path
for callers that have already pre-sharded their input.
Has no effect on tables without an LSM write spec.
Parameters
----------
validate_single_shard: bool
Whether to check every row routes to one shard. Defaults to `True`.
"""
self._validate_single_shard = validate_single_shard
return self
def execute(
self,
new_data: DATA,

View File

@@ -50,7 +50,6 @@ class RemoteDBConnection(DBConnection):
connection_timeout: Optional[float] = None,
read_timeout: Optional[float] = None,
storage_options: Optional[Dict[str, str]] = None,
read_consistency_interval: Optional[timedelta] = None,
):
"""Connect to a remote LanceDB database."""
if isinstance(client_config, dict):
@@ -104,7 +103,6 @@ class RemoteDBConnection(DBConnection):
host_override=host_override,
client_config=client_config,
storage_options=storage_options,
read_consistency_interval=read_consistency_interval,
)
)

View File

@@ -2,29 +2,15 @@
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
from datetime import timedelta
import deprecation
import logging
from functools import cached_property
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Union,
Literal,
overload,
)
from typing import Any, Callable, Dict, Iterable, List, Optional, Union, Literal
import warnings
from lancedb import __version__
from lancedb._lancedb import (
AddColumnsResult,
AddResult,
AlterColumnsResult,
UpdateFieldMetadataResult,
DeleteResult,
DropColumnsResult,
IndexConfig,
@@ -46,7 +32,6 @@ from lancedb.index import (
LabelList,
)
from lancedb.remote.db import LOOP
from lancedb.table import IndexConfigType, KNOWN_METRICS
import pyarrow as pa
from lancedb.common import DATA, VEC, VECTOR_COLUMN_NAME
@@ -137,11 +122,6 @@ class RemoteTable(Table):
"""List all the stats of a specified index"""
return LOOP.run(self._table.index_stats(index_uuid))
@deprecation.deprecated(
deprecated_in="0.25.0",
current_version=__version__,
details="Use create_index() with config=BTree()/Bitmap()/LabelList() instead.",
)
def create_scalar_index(
self,
column: str,
@@ -151,12 +131,7 @@ class RemoteTable(Table):
wait_timeout: Optional[timedelta] = None,
name: Optional[str] = None,
):
"""Creates a scalar index.
.. deprecated:: 0.25.0
Use :meth:`create_index` with a BTree, Bitmap, or LabelList config instead.
Example: ``table.create_index("column", config=BTree())``
"""Creates a scalar index
Parameters
----------
column : str
@@ -187,11 +162,6 @@ class RemoteTable(Table):
)
)
@deprecation.deprecated(
deprecated_in="0.25.0",
current_version=__version__,
details="Use create_index() with config=FTS() instead.",
)
def create_fts_index(
self,
column: str,
@@ -212,12 +182,6 @@ class RemoteTable(Table):
prefix_only: bool = False,
name: Optional[str] = None,
):
"""Create a full-text search index on a column.
.. deprecated:: 0.25.0
Use :meth:`create_index` with an FTS config instead.
Example: ``table.create_index("text_column", config=FTS())``
"""
config = FTS(
with_position=with_position,
base_tokenizer=base_tokenizer,
@@ -241,43 +205,9 @@ class RemoteTable(Table):
)
)
# New unified API overload
@overload
def create_index(
self,
column: str,
/,
*,
config: IndexConfigType,
wait_timeout: Optional[timedelta] = ...,
name: Optional[str] = ...,
train: bool = ...,
) -> None: ...
# Legacy API overload (deprecated)
@overload
def create_index(
self,
metric: Literal["l2", "cosine", "dot", "hamming"] = ...,
vector_column_name: str = ...,
index_cache_size: Optional[int] = ...,
num_partitions: Optional[int] = ...,
num_sub_vectors: Optional[int] = ...,
replace: Optional[bool] = ...,
accelerator: Optional[str] = ...,
index_type: Literal[
"VECTOR", "IVF_FLAT", "IVF_SQ", "IVF_PQ", "IVF_HNSW_SQ", "IVF_HNSW_PQ"
] = ...,
wait_timeout: Optional[timedelta] = ...,
*,
num_bits: int = ...,
name: Optional[str] = ...,
train: bool = ...,
) -> None: ...
def create_index(
self,
metric: str = "l2",
metric="l2",
vector_column_name: str = VECTOR_COLUMN_NAME,
index_cache_size: Optional[int] = None,
num_partitions: Optional[int] = None,
@@ -288,113 +218,89 @@ class RemoteTable(Table):
wait_timeout: Optional[timedelta] = None,
*,
num_bits: int = 8,
config: Optional[IndexConfigType] = None,
name: Optional[str] = None,
train: bool = True,
):
"""Create an index on a column.
"""Create an index on the table.
This method supports both the new unified API and the legacy API
for backwards compatibility. The new API takes the column name as the
first positional argument and an index configuration object via
``config``; the legacy API takes the distance metric as the first
argument plus separate ``vector_column_name`` / ``num_partitions`` /
etc. parameters, and emits a ``DeprecationWarning``.
Parameters
----------
metric : str
The metric to use for the index. Default is "l2".
vector_column_name : str
The name of the vector column. Default is "vector".
Examples
--------
New API (recommended):
>>> table.create_index( # doctest: +SKIP
... "vector", config=IvfPq(distance_type="l2")
>>> import lancedb
>>> import uuid
>>> from lancedb.schema import vector
>>> db = lancedb.connect("db://...", api_key="...", # doctest: +SKIP
... region="...") # doctest: +SKIP
>>> table_name = uuid.uuid4().hex
>>> schema = pa.schema(
... [
... pa.field("id", pa.uint32(), False),
... pa.field("vector", vector(128), False),
... pa.field("s", pa.string(), False),
... ]
... )
>>> table.create_index("category", config=BTree()) # doctest: +SKIP
>>> table.create_index("content", config=FTS()) # doctest: +SKIP
Legacy API (deprecated):
>>> table.create_index( # doctest: +SKIP
... "l2", vector_column_name="vector"
>>> table = db.create_table( # doctest: +SKIP
... table_name, # doctest: +SKIP
... schema=schema, # doctest: +SKIP
... )
>>> table.create_index("l2", "vector") # doctest: +SKIP
"""
# Detect whether this is a legacy API call
is_legacy = self._is_legacy_create_index_call(
metric,
config,
num_partitions,
num_sub_vectors,
vector_column_name,
accelerator,
index_cache_size,
replace,
)
if is_legacy:
warnings.warn(
"The create_index() API with metric/num_partitions parameters is "
"deprecated and will be removed in a future version. "
"Please migrate to the new unified API:\n"
" # Old (deprecated):\n"
" table.create_index('l2', vector_column_name='my_vector')\n"
" # New (recommended):\n"
" table.create_index('my_vector', config=IvfPq(distance_type='l2'))",
DeprecationWarning,
stacklevel=2,
if accelerator is not None:
logging.warning(
"GPU accelerator is not yet supported on LanceDB cloud."
"If you have 100M+ vectors to index,"
"please contact us at contact@lancedb.com"
)
if replace is not None:
logging.warning(
"replace is not supported on LanceDB cloud."
"Existing indexes will always be replaced."
)
column = vector_column_name
if accelerator is not None:
logging.warning(
"GPU accelerator is not yet supported on LanceDB cloud."
"If you have 100M+ vectors to index,"
"please contact us at contact@lancedb.com"
)
if replace is not None:
logging.warning(
"replace is not supported on LanceDB cloud."
"Existing indexes will always be replaced."
)
idx_type = index_type.upper()
if idx_type == "VECTOR" or idx_type == "IVF_PQ":
config = IvfPq(
distance_type=metric,
num_partitions=num_partitions,
num_sub_vectors=num_sub_vectors,
num_bits=num_bits,
)
elif idx_type == "IVF_RQ":
config = IvfRq(
distance_type=metric,
num_partitions=num_partitions,
num_bits=num_bits,
)
elif idx_type == "IVF_SQ":
config = IvfSq(distance_type=metric, num_partitions=num_partitions)
elif idx_type == "IVF_HNSW_PQ":
raise ValueError(
"IVF_HNSW_PQ is not supported on LanceDB cloud."
"Please use IVF_HNSW_SQ instead."
)
elif idx_type == "IVF_HNSW_SQ":
config = HnswSq(distance_type=metric, num_partitions=num_partitions)
elif idx_type == "IVF_HNSW_FLAT":
config = HnswFlat(distance_type=metric, num_partitions=num_partitions)
elif idx_type == "IVF_FLAT":
config = IvfFlat(distance_type=metric, num_partitions=num_partitions)
else:
raise ValueError(
f"Unknown vector index type: {idx_type}. Valid options are"
" 'IVF_FLAT', 'IVF_PQ', 'IVF_RQ', 'IVF_SQ',"
" 'IVF_HNSW_PQ', 'IVF_HNSW_SQ', 'IVF_HNSW_FLAT'"
)
index_type = index_type.upper()
if index_type == "VECTOR" or index_type == "IVF_PQ":
config = IvfPq(
distance_type=metric,
num_partitions=num_partitions,
num_sub_vectors=num_sub_vectors,
num_bits=num_bits,
)
elif index_type == "IVF_RQ":
config = IvfRq(
distance_type=metric,
num_partitions=num_partitions,
num_bits=num_bits,
)
elif index_type == "IVF_SQ":
config = IvfSq(distance_type=metric, num_partitions=num_partitions)
elif index_type == "IVF_HNSW_PQ":
raise ValueError(
"IVF_HNSW_PQ is not supported on LanceDB cloud."
"Please use IVF_HNSW_SQ instead."
)
elif index_type == "IVF_HNSW_SQ":
config = HnswSq(distance_type=metric, num_partitions=num_partitions)
elif index_type == "IVF_HNSW_FLAT":
config = HnswFlat(distance_type=metric, num_partitions=num_partitions)
elif index_type == "IVF_FLAT":
config = IvfFlat(distance_type=metric, num_partitions=num_partitions)
else:
column = metric
raise ValueError(
f"Unknown vector index type: {index_type}. Valid options are"
" 'IVF_FLAT', 'IVF_PQ', 'IVF_RQ', 'IVF_SQ',"
" 'IVF_HNSW_PQ', 'IVF_HNSW_SQ', 'IVF_HNSW_FLAT'"
)
LOOP.run(
self._table.create_index(
column,
vector_column_name,
config=config,
wait_timeout=wait_timeout,
name=name,
@@ -402,37 +308,6 @@ class RemoteTable(Table):
)
)
def _is_legacy_create_index_call(
self,
first_arg: str,
config: Optional[IndexConfigType],
num_partitions: Optional[int],
num_sub_vectors: Optional[int],
vector_column_name: str,
accelerator: Optional[str],
index_cache_size: Optional[int],
replace: Optional[bool],
) -> bool:
"""Detect if this is a legacy create_index call."""
if config is not None:
return False
if any(
x is not None
for x in (
num_partitions,
num_sub_vectors,
accelerator,
index_cache_size,
replace,
)
):
return True
if vector_column_name != VECTOR_COLUMN_NAME:
return True
if first_arg.lower() in KNOWN_METRICS:
return True
return False
def add(
self,
data: DATA,
@@ -778,11 +653,6 @@ class RemoteTable(Table):
) -> AlterColumnsResult:
return LOOP.run(self._table.alter_columns(*alterations))
def update_field_metadata(
self, *updates: dict[str, Any]
) -> UpdateFieldMetadataResult:
return LOOP.run(self._table.update_field_metadata(*updates))
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
return LOOP.run(self._table.drop_columns(columns))
@@ -798,10 +668,6 @@ class RemoteTable(Table):
"""Not supported on LanceDB Cloud."""
return LOOP.run(self._table.unset_lsm_write_spec())
def close_lsm_writers(self) -> None:
"""No-op on LanceDB Cloud (no local shard writers)."""
return LOOP.run(self._table.close_lsm_writers())
def drop_index(self, index_name: str):
return LOOP.run(self._table.drop_index(index_name))

View File

@@ -102,15 +102,8 @@ class LinearCombinationReranker(Reranker):
combined_list = []
for row_id, result in results.items():
# Convert vector distance to a relevance score in [0, 1] where
# higher is better. Missing vector entries are penalised with
# `_invert_score(fill)` = 1 - fill (= 0.0 for the default fill=1).
vector_score = self._invert_score(result.get("_distance", fill))
# FTS scores (BM25) are already in a "higher = more relevant" space.
# Missing FTS entries are penalised symmetrically: we use
# `1 - fill` so that the same `fill` value drives both missing-vector
# and missing-FTS penalties in the same direction.
fts_score = result.get("_score", 1 - fill)
fts_score = result.get("_score", fill)
result["_relevance_score"] = self._combine_score(vector_score, fts_score)
combined_list.append(result)
@@ -130,12 +123,8 @@ class LinearCombinationReranker(Reranker):
return tbl
def _combine_score(self, vector_score, fts_score):
# Both vector_score (inverted distance) and fts_score are in a
# "higher = more relevant" space. A straight weighted average gives
# higher _relevance_score to better matches, as expected.
# Previously this returned `1 - (...)` which inverted the final
# ranking so that the *least* relevant document ranked first.
return self.weight * vector_score + (1 - self.weight) * fts_score
# these scores represent distance
return 1 - (self.weight * vector_score + (1 - self.weight) * fts_score)
def _invert_score(self, dist: float):
# Invert the score between relevance and distance

View File

@@ -154,7 +154,6 @@ if TYPE_CHECKING:
AddColumnsResult,
AddResult,
AlterColumnsResult,
UpdateFieldMetadataResult,
DeleteResult,
DropColumnsResult,
LsmWriteSpec,
@@ -175,24 +174,6 @@ if TYPE_CHECKING:
DistanceType,
)
# Type alias for index configuration objects
IndexConfigType = Union[
IvfFlat,
IvfPq,
IvfSq,
IvfRq,
HnswFlat,
HnswPq,
HnswSq,
BTree,
Bitmap,
LabelList,
FTS,
]
# Known distance metrics for legacy API detection
KNOWN_METRICS = {"l2", "cosine", "dot", "hamming"}
def _into_pyarrow_reader(
data, schema: Optional[pa.Schema] = None
@@ -826,49 +807,11 @@ class Table(ABC):
"""
raise NotImplementedError
# New unified API overload
@overload
def create_index(
self,
column: str,
/,
*,
config: IndexConfigType,
replace: bool = ...,
wait_timeout: Optional[timedelta] = ...,
name: Optional[str] = ...,
train: bool = ...,
) -> None: ...
# Legacy API overload (deprecated)
@overload
def create_index(
self,
metric: Literal["l2", "cosine", "dot", "hamming"] = ...,
num_partitions: Optional[int] = ...,
num_sub_vectors: Optional[int] = ...,
vector_column_name: str = ...,
replace: bool = ...,
accelerator: Optional[str] = ...,
index_cache_size: Optional[int] = ...,
*,
index_type: VectorIndexType = ...,
wait_timeout: Optional[timedelta] = ...,
num_bits: int = ...,
max_iterations: int = ...,
sample_rate: int = ...,
m: int = ...,
ef_construction: int = ...,
name: Optional[str] = ...,
train: bool = ...,
target_partition_size: Optional[int] = ...,
) -> None: ...
def create_index(
self,
metric: DistanceType = "l2",
num_partitions: Optional[int] = None,
num_sub_vectors: Optional[int] = None,
metric="l2",
num_partitions=256,
num_sub_vectors=96,
vector_column_name: str = VECTOR_COLUMN_NAME,
replace: bool = True,
accelerator: Optional[str] = None,
@@ -881,53 +824,46 @@ class Table(ABC):
sample_rate: int = 256,
m: int = 20,
ef_construction: int = 300,
config: Optional[IndexConfigType] = None,
name: Optional[str] = None,
train: bool = True,
target_partition_size: Optional[int] = None,
):
"""Create an index on a column.
This method supports both the new unified API and the legacy API
for backwards compatibility. The new API takes the column name as the
first positional argument and an index configuration object via
``config``; the legacy API takes the distance metric as the first
argument plus separate ``vector_column_name`` / ``num_partitions`` /
etc. parameters, and emits a ``DeprecationWarning``.
"""Create an index on the table.
Parameters
----------
metric : str
For new API: the column name to index.
For legacy API: the distance metric ("l2", "cosine", "dot", "hamming").
config : IndexConfigType, optional
The index configuration object. If provided, uses the new unified API.
Can be one of: IvfFlat, IvfPq, IvfSq, IvfRq, HnswPq, HnswSq,
BTree, Bitmap, LabelList, FTS.
replace : bool, default True
Whether to replace an existing index on this column.
wait_timeout : timedelta, optional
Timeout to wait for async indexing to complete.
name : str, optional
Custom name for the index.
train : bool, default True
Whether to train the index with existing data.
metric: str, default "l2"
The distance metric to use when creating the index.
Valid values are "l2", "cosine", "dot", or "hamming".
l2 is euclidean distance.
Hamming is available only for binary vectors.
num_partitions: int, default 256
The number of IVF partitions to use when creating the index.
Default is 256.
num_sub_vectors: int, default 96
The number of PQ sub-vectors to use when creating the index.
Default is 96.
vector_column_name: str, default "vector"
The vector column name to create the index.
replace: bool, default True
- If True, replace the existing index if it exists.
Examples
--------
New API (recommended):
>>> table.create_index( # doctest: +SKIP
... "vector", config=IvfPq(distance_type="l2")
... )
>>> table.create_index("category", config=BTree()) # doctest: +SKIP
>>> table.create_index("content", config=FTS()) # doctest: +SKIP
Legacy API (deprecated):
>>> table.create_index( # doctest: +SKIP
... "l2", vector_column_name="vector"
... )
- If False, raise an error if duplicate index exists.
accelerator: str, default None
If set, use the given accelerator to create the index.
Only support "cuda" for now.
index_cache_size : int, optional
The size of the index cache in number of entries. Default value is 256.
num_bits: int
The number of bits to encode sub-vectors. Only used with the IVF_PQ index.
Only 4 and 8 are supported.
wait_timeout: timedelta, optional
The timeout to wait if indexing is asynchronous.
name: str, optional
The name of the index. If not provided, a default name will be generated.
train: bool, default True
Whether to train the index with existing data. Vector indices always train
with existing data.
"""
raise NotImplementedError
@@ -1252,7 +1188,7 @@ class Table(ABC):
... .when_not_matched_insert_all() \\
... .execute(new_data)
>>> res
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1, num_rows=3)
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1)
>>> # The order of new rows is non-deterministic since we use
>>> # a hash-join as part of this operation and so we sort here
>>> table.to_arrow().sort_by("a").to_pandas()
@@ -1800,29 +1736,6 @@ class Table(ABC):
version: the new version number of the table after the alteration.
"""
@abstractmethod
def update_field_metadata(
self, *updates: dict[str, Any]
) -> UpdateFieldMetadataResult:
"""
Update per-field (column) metadata.
Parameters
----------
updates : dict
One or more dicts, each with:
- "path": str — dot-path to the field (e.g. "embedding" or "a.b.c").
- "metadata": dict[str, str | None] — keys to set; a value of ``None``
deletes that key.
- "replace": bool, optional — replace the field's whole metadata map
instead of merging (default False).
Returns
-------
UpdateFieldMetadataResult
version: the new table version after the update.
"""
@abstractmethod
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
"""
@@ -2337,51 +2250,11 @@ class LanceTable(Table):
dataset, allow_pyarrow_filter=False, batch_size=batch_size
)
# New unified API overload
@overload
def create_index(
self,
column: str,
/,
*,
config: IndexConfigType,
replace: bool = ...,
wait_timeout: Optional[timedelta] = ...,
name: Optional[str] = ...,
train: bool = ...,
) -> None: ...
# Legacy API overload (deprecated)
@overload
def create_index(
self,
metric: Literal["l2", "cosine", "dot", "hamming"] = ...,
num_partitions: Optional[int] = ...,
num_sub_vectors: Optional[int] = ...,
vector_column_name: str = ...,
replace: bool = ...,
accelerator: Optional[str] = ...,
index_cache_size: Optional[int] = ...,
num_bits: int = ...,
index_type: Literal[
"IVF_FLAT", "IVF_SQ", "IVF_PQ", "IVF_RQ", "IVF_HNSW_SQ", "IVF_HNSW_PQ"
] = ...,
max_iterations: int = ...,
sample_rate: int = ...,
m: int = ...,
ef_construction: int = ...,
*,
wait_timeout: Optional[timedelta] = ...,
name: Optional[str] = ...,
train: bool = ...,
target_partition_size: Optional[int] = ...,
) -> None: ...
def create_index(
self,
metric: str = "l2",
num_partitions: Optional[int] = None,
num_sub_vectors: Optional[int] = None,
metric: DistanceType = "l2",
num_partitions=None,
num_sub_vectors=None,
vector_column_name: str = VECTOR_COLUMN_NAME,
replace: bool = True,
accelerator: Optional[str] = None,
@@ -2401,232 +2274,47 @@ class LanceTable(Table):
m: int = 20,
ef_construction: int = 300,
*,
config: Optional[IndexConfigType] = None,
wait_timeout: Optional[timedelta] = None,
name: Optional[str] = None,
train: bool = True,
target_partition_size: Optional[int] = None,
):
"""Create an index on a column.
This method supports both the new unified API and the legacy API
for backwards compatibility. The new API takes the column name as the
first positional argument and an index configuration object via
``config``; the legacy API takes the distance metric as the first
argument plus separate ``vector_column_name`` / ``num_partitions`` /
etc. parameters, and emits a ``DeprecationWarning``.
Parameters
----------
metric : str
For new API: the column name to index.
For legacy API: the distance metric ("l2", "cosine", "dot", "hamming").
config : IndexConfigType, optional
The index configuration object. If provided, uses the new unified API.
Can be one of: IvfFlat, IvfPq, IvfSq, IvfRq, HnswPq, HnswSq,
BTree, Bitmap, LabelList, FTS.
replace : bool, default True
Whether to replace an existing index on this column.
wait_timeout : timedelta, optional
Timeout to wait for async indexing to complete.
name : str, optional
Custom name for the index.
train : bool, default True
Whether to train the index with existing data.
Examples
--------
New API (recommended):
>>> table.create_index( # doctest: +SKIP
... "vector", config=IvfPq(distance_type="l2")
... )
>>> table.create_index("category", config=BTree()) # doctest: +SKIP
>>> table.create_index("content", config=FTS()) # doctest: +SKIP
Legacy API (deprecated):
>>> table.create_index( # doctest: +SKIP
... "l2", vector_column_name="vector"
... )
"""
# Detect whether this is a legacy API call
is_legacy = self._is_legacy_create_index_call(
metric,
config,
num_partitions,
num_sub_vectors,
vector_column_name,
accelerator,
index_cache_size,
)
if is_legacy:
warnings.warn(
"The create_index() API with metric/num_partitions parameters is "
"deprecated and will be removed in a future version. "
"Please migrate to the new unified API:\n"
" # Old (deprecated):\n"
" table.create_index('l2', vector_column_name='my_vector')\n"
" # New (recommended):\n"
" table.create_index('my_vector', config=IvfPq(distance_type='l2'))",
DeprecationWarning,
stacklevel=2,
)
# Legacy API: first arg is the distance metric
column = vector_column_name
# Build config from legacy parameters
config = self._build_vector_config_from_legacy_params(
metric=metric,
"""Create an index on the table."""
if accelerator is not None:
# accelerator is only supported through pylance.
self.to_lance().create_index(
column=vector_column_name,
index_type=index_type,
metric=metric,
num_partitions=num_partitions,
num_sub_vectors=num_sub_vectors,
replace=replace,
accelerator=accelerator,
index_cache_size=index_cache_size,
num_bits=num_bits,
max_iterations=max_iterations,
sample_rate=sample_rate,
m=m,
ef_construction=ef_construction,
target_partition_size=target_partition_size,
accelerator=accelerator,
)
# Handle accelerator through pylance
if accelerator is not None:
self.to_lance().create_index(
column=column,
index_type=index_type,
metric=metric,
num_partitions=num_partitions,
num_sub_vectors=num_sub_vectors,
replace=replace,
accelerator=accelerator,
index_cache_size=index_cache_size,
num_bits=num_bits,
m=m,
ef_construction=ef_construction,
target_partition_size=target_partition_size,
)
self.checkout_latest()
return
else:
# New API: metric is the column name
column = metric
# Check if config has accelerator set and dispatch to pylance
if config is not None and hasattr(config, "accelerator"):
acc = getattr(config, "accelerator", None)
if acc is not None:
# Dispatch to pylance for GPU acceleration
index_type_map = {
"IvfFlat": "IVF_FLAT",
"IvfSq": "IVF_SQ",
"IvfPq": "IVF_PQ",
"IvfRq": "IVF_RQ",
"HnswPq": "IVF_HNSW_PQ",
"HnswSq": "IVF_HNSW_SQ",
}
cfg_type = type(config).__name__
lance_index_type = index_type_map.get(cfg_type, "IVF_PQ")
self.to_lance().create_index(
column=column,
index_type=lance_index_type,
metric=getattr(config, "distance_type", "l2"),
num_partitions=getattr(config, "num_partitions", None),
num_sub_vectors=getattr(config, "num_sub_vectors", None),
replace=replace,
accelerator=acc,
num_bits=getattr(config, "num_bits", 8),
m=getattr(config, "m", 20),
ef_construction=getattr(config, "ef_construction", 300),
target_partition_size=getattr(
config, "target_partition_size", None
),
)
self.checkout_latest()
return
return LOOP.run(
self._table.create_index(
column,
replace=replace,
config=config,
wait_timeout=wait_timeout,
name=name,
train=train,
)
)
def _is_legacy_create_index_call(
self,
first_arg: str,
config: Optional[IndexConfigType],
num_partitions: Optional[int],
num_sub_vectors: Optional[int],
vector_column_name: str,
accelerator: Optional[str],
index_cache_size: Optional[int],
) -> bool:
"""Detect if this is a legacy create_index call."""
# If config is provided, it's definitely the new API
if config is not None:
return False
# If old-style parameters were explicitly set, it's legacy
if any(
x is not None
for x in (num_partitions, num_sub_vectors, accelerator, index_cache_size)
):
return True
# If vector_column_name differs from default, it's legacy
if vector_column_name != VECTOR_COLUMN_NAME:
return True
# If first arg is a known metric, assume legacy
if first_arg.lower() in KNOWN_METRICS:
return True
# Otherwise assume new API
return False
def _build_vector_config_from_legacy_params(
self,
metric: str,
index_type: str,
num_partitions: Optional[int],
num_sub_vectors: Optional[int],
num_bits: int,
max_iterations: int,
sample_rate: int,
m: int,
ef_construction: int,
target_partition_size: Optional[int],
accelerator: Optional[str],
) -> IndexConfigType:
"""Build an index config object from legacy parameters."""
if index_type == "IVF_FLAT":
return IvfFlat(
self.checkout_latest()
return
elif index_type == "IVF_FLAT":
config = IvfFlat(
distance_type=metric,
num_partitions=num_partitions,
max_iterations=max_iterations,
sample_rate=sample_rate,
target_partition_size=target_partition_size,
accelerator=accelerator,
)
elif index_type == "IVF_SQ":
return IvfSq(
config = IvfSq(
distance_type=metric,
num_partitions=num_partitions,
max_iterations=max_iterations,
sample_rate=sample_rate,
target_partition_size=target_partition_size,
accelerator=accelerator,
)
elif index_type == "IVF_PQ":
return IvfPq(
config = IvfPq(
distance_type=metric,
num_partitions=num_partitions,
num_sub_vectors=num_sub_vectors,
@@ -2634,20 +2322,18 @@ class LanceTable(Table):
max_iterations=max_iterations,
sample_rate=sample_rate,
target_partition_size=target_partition_size,
accelerator=accelerator,
)
elif index_type == "IVF_RQ":
return IvfRq(
config = IvfRq(
distance_type=metric,
num_partitions=num_partitions,
num_bits=num_bits,
max_iterations=max_iterations,
sample_rate=sample_rate,
target_partition_size=target_partition_size,
accelerator=accelerator,
)
elif index_type == "IVF_HNSW_PQ":
return HnswPq(
config = HnswPq(
distance_type=metric,
num_partitions=num_partitions,
num_sub_vectors=num_sub_vectors,
@@ -2657,10 +2343,9 @@ class LanceTable(Table):
m=m,
ef_construction=ef_construction,
target_partition_size=target_partition_size,
accelerator=accelerator,
)
elif index_type == "IVF_HNSW_SQ":
return HnswSq(
config = HnswSq(
distance_type=metric,
num_partitions=num_partitions,
max_iterations=max_iterations,
@@ -2668,10 +2353,9 @@ class LanceTable(Table):
m=m,
ef_construction=ef_construction,
target_partition_size=target_partition_size,
accelerator=accelerator,
)
elif index_type == "IVF_HNSW_FLAT":
return HnswFlat(
config = HnswFlat(
distance_type=metric,
num_partitions=num_partitions,
max_iterations=max_iterations,
@@ -2683,6 +2367,16 @@ class LanceTable(Table):
else:
raise ValueError(f"Unknown index type {index_type}")
return LOOP.run(
self._table.create_index(
vector_column_name,
replace=replace,
config=config,
name=name,
train=train,
)
)
def drop_index(self, name: str) -> None:
"""
Drops an index from the table
@@ -2782,11 +2476,6 @@ class LanceTable(Table):
"""
return LOOP.run(self._table.latest_storage_options())
@deprecation.deprecated(
deprecated_in="0.25.0",
current_version=__version__,
details="Use create_index() with config=BTree()/Bitmap()/LabelList() instead.",
)
def create_scalar_index(
self,
column: str,
@@ -2795,12 +2484,6 @@ class LanceTable(Table):
index_type: ScalarIndexType = "BTREE",
name: Optional[str] = None,
):
"""Create a scalar index on a column.
.. deprecated:: 0.25.0
Use :meth:`create_index` with a BTree, Bitmap, or LabelList config instead.
Example: ``table.create_index("column", config=BTree())``
"""
if index_type == "BTREE":
config = BTree()
elif index_type == "BITMAP":
@@ -2813,11 +2496,6 @@ class LanceTable(Table):
self._table.create_index(column, replace=replace, config=config, name=name)
)
@deprecation.deprecated(
deprecated_in="0.25.0",
current_version=__version__,
details="Use create_index() with config=FTS() instead.",
)
def create_fts_index(
self,
field_names: Union[str, List[str]],
@@ -2841,12 +2519,6 @@ class LanceTable(Table):
prefix_only: bool = False,
name: Optional[str] = None,
):
"""Create a full-text search index on a column.
.. deprecated:: 0.25.0
Use :meth:`create_index` with an FTS config instead.
Example: ``table.create_index("text_column", config=FTS())``
"""
self._ensure_no_legacy_fts_index()
if use_tantivy:
@@ -3607,11 +3279,6 @@ class LanceTable(Table):
) -> AlterColumnsResult:
return LOOP.run(self._table.alter_columns(*alterations))
def update_field_metadata(
self, *updates: dict[str, Any]
) -> UpdateFieldMetadataResult:
return LOOP.run(self._table.update_field_metadata(*updates))
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
return LOOP.run(self._table.drop_columns(columns))
@@ -3630,11 +3297,6 @@ class LanceTable(Table):
[`AsyncTable.unset_lsm_write_spec`][lancedb.AsyncTable.unset_lsm_write_spec]."""
return LOOP.run(self._table.unset_lsm_write_spec())
def close_lsm_writers(self) -> None:
"""Close cached MemWAL shard writers. See
[`AsyncTable.close_lsm_writers`][lancedb.AsyncTable.close_lsm_writers]."""
return LOOP.run(self._table.close_lsm_writers())
def uses_v2_manifest_paths(self) -> bool:
"""
Check if the table is using the new v2 manifest paths.
@@ -4243,16 +3905,6 @@ class AsyncTable:
"""
await self._inner.unset_lsm_write_spec()
async def close_lsm_writers(self) -> None:
"""Drain and close any cached MemWAL shard writers for this table.
When an LSM write spec is installed, `merge_insert` opens MemWAL shard
writers and caches them for reuse across calls. This closes them,
flushing pending data; writers reopen lazily on the next
`merge_insert`. It is a no-op when no writers are cached.
"""
await self._inner.close_lsm_writers()
@property
def name(self) -> str:
"""The name of the table."""
@@ -4703,7 +4355,7 @@ class AsyncTable:
... .when_not_matched_insert_all() \\
... .execute(new_data)
>>> res
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1, num_rows=3)
MergeResult(version=2, num_updated_rows=2, num_inserted_rows=1, num_deleted_rows=0, num_attempts=1)
>>> # The order of new rows is non-deterministic since we use
>>> # a hash-join as part of this operation and so we sort here
>>> table.to_arrow().sort_by("a").to_pandas()
@@ -5083,8 +4735,6 @@ class AsyncTable:
when_not_matched_by_source_condition=merge._when_not_matched_by_source_condition,
timeout=merge._timeout,
use_index=merge._use_index,
use_lsm_write=merge._use_lsm_write,
validate_single_shard=merge._validate_single_shard,
),
)
@@ -5263,13 +4913,6 @@ class AsyncTable:
"""
return await self._inner.alter_columns(alterations)
async def update_field_metadata(
self, *updates: dict[str, Any]
) -> UpdateFieldMetadataResult:
"""Update per-field metadata. See
[`Table.update_field_metadata`][lancedb.table.Table.update_field_metadata]."""
return await self._inner.update_field_metadata(updates)
async def drop_columns(self, columns: Iterable[str]):
"""
Drop columns from the table.

View File

@@ -57,7 +57,7 @@ async def test_upsert_async(mem_db_async):
await table.count_rows() # 3
res
# MergeResult(version=2, num_updated_rows=1,
# num_inserted_rows=1, num_deleted_rows=0, num_rows=2)
# num_inserted_rows=1, num_deleted_rows=0)
# --8<-- [end:upsert_basic_async]
assert await table.count_rows() == 3
assert res.version == 2
@@ -86,7 +86,7 @@ def test_insert_if_not_exists(mem_db):
table.count_rows() # 3
res
# MergeResult(version=2, num_updated_rows=0,
# num_inserted_rows=1, num_deleted_rows=0, num_rows=1)
# num_inserted_rows=1, num_deleted_rows=0)
# --8<-- [end:insert_if_not_exists]
assert table.count_rows() == 3
assert res.version == 2
@@ -116,7 +116,7 @@ async def test_insert_if_not_exists_async(mem_db_async):
await table.count_rows() # 3
res
# MergeResult(version=2, num_updated_rows=0,
# num_inserted_rows=1, num_deleted_rows=0, num_rows=1)
# num_inserted_rows=1, num_deleted_rows=0)
# --8<-- [end:insert_if_not_exists]
assert await table.count_rows() == 3
assert res.version == 2
@@ -150,7 +150,7 @@ def test_replace_range(mem_db):
table.count_rows("doc_id = 1") # 1
res
# MergeResult(version=2, num_updated_rows=1,
# num_inserted_rows=0, num_deleted_rows=1, num_rows=1)
# num_inserted_rows=0, num_deleted_rows=1)
# --8<-- [end:insert_if_not_exists]
assert table.count_rows("doc_id = 1") == 1
assert res.version == 2
@@ -185,7 +185,7 @@ async def test_replace_range_async(mem_db_async):
await table.count_rows("doc_id = 1") # 1
res
# MergeResult(version=2, num_updated_rows=1,
# num_inserted_rows=0, num_deleted_rows=1, num_rows=1)
# num_inserted_rows=0, num_deleted_rows=1)
# --8<-- [end:insert_if_not_exists]
assert await table.count_rows("doc_id = 1") == 1
assert res.version == 2

View File

@@ -466,8 +466,7 @@ async def test_create_table_v2_manifest_paths_async(tmp_path):
assert await tbl.uses_v2_manifest_paths()
manifests_dir = tmp_path / "test_v2_manifest_paths.lance" / "_versions"
for manifest in os.listdir(manifests_dir):
if manifest.endswith(".manifest"):
assert re.match(r"\d{20}\.manifest", manifest)
assert re.match(r"\d{20}\.manifest", manifest)
# Start a table in V1 mode then migrate
tbl = await db_no_v2_paths.create_table(
@@ -477,15 +476,13 @@ async def test_create_table_v2_manifest_paths_async(tmp_path):
assert not await tbl.uses_v2_manifest_paths()
manifests_dir = tmp_path / "test_v2_migration.lance" / "_versions"
for manifest in os.listdir(manifests_dir):
if manifest.endswith(".manifest"):
assert re.match(r"\d\.manifest", manifest)
assert re.match(r"\d\.manifest", manifest)
await tbl.migrate_manifest_paths_v2()
assert await tbl.uses_v2_manifest_paths()
for manifest in os.listdir(manifests_dir):
if manifest.endswith(".manifest"):
assert re.match(r"\d{20}\.manifest", manifest)
assert re.match(r"\d{20}\.manifest", manifest)
@pytest.mark.asyncio

View File

@@ -215,12 +215,11 @@ def test_reject_legacy_tantivy_index(table):
@pytest.mark.parametrize("with_position", [True, False])
def test_create_inverted_index(table, with_position):
with pytest.warns(DeprecationWarning, match="create_fts_index"):
table.create_fts_index(
"text",
with_position=with_position,
name="custom_fts_index",
)
table.create_fts_index(
"text",
with_position=with_position,
name="custom_fts_index",
)
indices = table.list_indices()
fts_indices = [i for i in indices if i.index_type == "FTS"]
assert any(i.name == "custom_fts_index" for i in fts_indices)

View File

@@ -162,13 +162,12 @@ async def test_create_bitmap_index(some_table: AsyncTable):
await some_table.create_index("data", config=Bitmap())
indices = await some_table.list_indices()
assert len(indices) == 3
# list_indices returns indices in alphabetical order by name
assert indices[0].index_type == "Bitmap"
assert indices[0].columns == ["data"]
assert indices[0].columns == ["id"]
assert indices[1].index_type == "Bitmap"
assert indices[1].columns == ["id"]
assert indices[1].columns == ["is_active"]
assert indices[2].index_type == "Bitmap"
assert indices[2].columns == ["is_active"]
assert indices[2].columns == ["data"]
index_name = indices[0].name
stats = await some_table.index_stats(index_name)

View File

@@ -40,6 +40,16 @@ def _make_table(tmp_path):
def test_set_lsm_write_spec_validates(tmp_path):
_db, table = _make_table(tmp_path)
# No PK set yet.
with pytest.raises(Exception, match="primary key"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
table.set_unenforced_primary_key("id")
# Column mismatch.
with pytest.raises(Exception, match="match"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("v", 4))
# Out-of-range num_buckets.
with pytest.raises(Exception, match="num_buckets"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 0))
@@ -60,6 +70,7 @@ def test_unset_lsm_write_spec(tmp_path):
table.unset_lsm_write_spec()
# Install a spec, then remove it; afterwards a fresh spec can be set.
table.set_unenforced_primary_key("id")
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
table.unset_lsm_write_spec()
# A second unset errors — there is no spec left to remove.

View File

@@ -1,196 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
"""Tests for the MemWAL LSM ``merge_insert`` dispatch."""
from datetime import timedelta
import lancedb
import pyarrow as pa
import pytest
from lancedb._lancedb import LsmWriteSpec
SCHEMA = pa.schema(
[
pa.field("id", pa.int64(), nullable=False),
pa.field("value", pa.int64(), nullable=False),
]
)
REGION_SCHEMA = pa.schema(
[
pa.field("id", pa.int64(), nullable=False),
pa.field("region", pa.utf8(), nullable=False),
]
)
def _reader(ids):
batch = pa.RecordBatch.from_arrays(
[
pa.array(ids, type=pa.int64()),
pa.array(list(range(len(ids))), type=pa.int64()),
],
schema=SCHEMA,
)
return pa.RecordBatchReader.from_batches(SCHEMA, [batch])
def _region_reader(rows):
batch = pa.RecordBatch.from_arrays(
[
pa.array([row[0] for row in rows], type=pa.int64()),
pa.array([row[1] for row in rows], type=pa.utf8()),
],
schema=REGION_SCHEMA,
)
return pa.RecordBatchReader.from_batches(REGION_SCHEMA, [batch])
def _bucket_table(tmp_path):
"""A table with ``id`` as the primary key and a single-bucket LSM spec."""
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0))
table = db.create_table("t", _reader([1, 2, 3]))
table.set_unenforced_primary_key("id")
# num_buckets = 1: every row routes to the single bucket.
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 1))
return table
def test_lsm_merge_insert_bucket(tmp_path):
table = _bucket_table(tmp_path)
# Empty `on` defaults to the primary key.
result = (
table.merge_insert([])
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(_reader([3, 4, 5]))
)
# LSM path: rows go to the MemWAL, so only num_rows is populated.
assert result.num_rows == 3
assert result.version == 0
assert result.num_inserted_rows == 0
assert result.num_updated_rows == 0
def test_lsm_merge_insert_unsharded(tmp_path):
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0))
table = db.create_table("t", _reader([1, 2, 3]))
table.set_unenforced_primary_key("id")
table.set_lsm_write_spec(LsmWriteSpec.unsharded())
result = (
table.merge_insert("id")
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(_reader([10, 11, 12, 13]))
)
assert result.num_rows == 4
def test_lsm_merge_insert_identity(tmp_path):
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0))
table = db.create_table("t", _region_reader([(1, "us"), (2, "us")]))
table.set_unenforced_primary_key("id")
table.set_lsm_write_spec(LsmWriteSpec.identity("region"))
# All rows share one identity value, so they route to one shard.
result = (
table.merge_insert([])
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(_region_reader([(3, "us"), (4, "us")]))
)
assert result.num_rows == 2
def test_lsm_merge_insert_use_lsm_write_false(tmp_path):
table = _bucket_table(tmp_path) # rows id = 1, 2, 3
# use_lsm_write(False) opts out: the standard path runs and commits.
result = (
table.merge_insert("id")
.when_not_matched_insert_all()
.use_lsm_write(False)
.execute(_reader([3, 4, 5]))
)
assert result.num_inserted_rows == 2
assert table.count_rows() == 5
def test_lsm_merge_insert_validate_single_shard_off(tmp_path):
table = _bucket_table(tmp_path)
result = (
table.merge_insert([])
.when_matched_update_all()
.when_not_matched_insert_all()
.validate_single_shard(False)
.execute(_reader([6, 7, 8]))
)
assert result.num_rows == 3
def test_lsm_merge_insert_use_lsm_write_true_requires_spec(tmp_path):
# A table with a primary key but no LSM write spec installed.
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(seconds=0))
table = db.create_table("t", _reader([1, 2, 3]))
table.set_unenforced_primary_key("id")
with pytest.raises(Exception, match="use_lsm_write"):
(
table.merge_insert("id")
.when_matched_update_all()
.when_not_matched_insert_all()
.use_lsm_write(True)
.execute(_reader([4]))
)
def test_lsm_merge_insert_rejects_on_not_primary_key(tmp_path):
table = _bucket_table(tmp_path)
with pytest.raises(Exception, match="primary key"):
(
table.merge_insert("value")
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(_reader([1]))
)
def test_lsm_merge_insert_rejects_non_upsert(tmp_path):
table = _bucket_table(tmp_path)
# Insert-only (no when_matched_update_all) is not the upsert shape.
with pytest.raises(Exception, match="upsert"):
table.merge_insert([]).when_not_matched_insert_all().execute(_reader([4]))
def test_lsm_close_writers(tmp_path):
table = _bucket_table(tmp_path)
(
table.merge_insert([])
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(_reader([7, 8]))
)
table.close_lsm_writers()
# The writer reopens lazily on the next merge_insert.
result = (
table.merge_insert([])
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(_reader([9]))
)
assert result.num_rows == 1
@pytest.mark.asyncio
async def test_async_lsm_merge_insert(tmp_path):
db = await lancedb.connect_async(
tmp_path, read_consistency_interval=timedelta(seconds=0)
)
table = await db.create_table("t", _reader([1, 2, 3]))
await table.set_unenforced_primary_key("id")
await table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 1))
builder = (
table.merge_insert([]).when_matched_update_all().when_not_matched_insert_all()
)
result = await builder.execute(_reader([3, 4, 5]))
assert result.num_rows == 3
await table.close_lsm_writers()

View File

@@ -362,22 +362,6 @@ def test_table_create_indices():
schema=dict(
fields=[
dict(name="id", type={"type": "int64"}, nullable=False),
dict(name="text", type={"type": "string"}, nullable=False),
dict(
name="vector",
type={
"type": "fixed_size_list",
"fields": [
dict(
name="item",
type={"type": "float"},
nullable=True,
)
],
"length": 2,
},
nullable=False,
),
]
),
)
@@ -436,25 +420,22 @@ def test_table_create_indices():
# This is a smoke-test.
table = db.create_table("test", [{"id": 1}])
# Test create_scalar_index with custom name (legacy method)
with pytest.warns(DeprecationWarning, match="create_scalar_index"):
table.create_scalar_index(
"id", wait_timeout=timedelta(seconds=2), name="custom_scalar_idx"
)
# Test create_scalar_index with custom name
table.create_scalar_index(
"id", wait_timeout=timedelta(seconds=2), name="custom_scalar_idx"
)
# Test create_fts_index with custom name (legacy method)
with pytest.warns(DeprecationWarning, match="create_fts_index"):
table.create_fts_index(
"text", wait_timeout=timedelta(seconds=2), name="custom_fts_idx"
)
# Test create_fts_index with custom name
table.create_fts_index(
"text", wait_timeout=timedelta(seconds=2), name="custom_fts_idx"
)
# Test create_index with custom name (legacy form: vector_column_name kwarg)
with pytest.warns(DeprecationWarning, match="create_index"):
table.create_index(
vector_column_name="vector",
wait_timeout=timedelta(seconds=10),
name="custom_vector_idx",
)
# Test create_index with custom name
table.create_index(
vector_column_name="vector",
wait_timeout=timedelta(seconds=10),
name="custom_vector_idx",
)
# Validate that the name parameter was passed correctly in requests
assert len(received_requests) == 3
@@ -483,98 +464,6 @@ def test_table_create_indices():
table.drop_index("custom_fts_idx")
def test_remote_create_index_new_api():
received_requests = []
def handler(request):
if request.path == "/v1/table/test/create_index/":
content_len = int(request.headers.get("Content-Length", 0))
body = request.rfile.read(content_len) if content_len > 0 else b""
received_requests.append(json.loads(body) if body else {})
request.send_response(200)
request.end_headers()
elif request.path == "/v1/table/test/create/?mode=create":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b"{}")
elif request.path == "/v1/table/test/describe/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(
json.dumps(
dict(
version=1,
schema=dict(
fields=[
dict(name="id", type={"type": "int64"}, nullable=False),
dict(
name="category",
type={"type": "string"},
nullable=False,
),
dict(
name="text", type={"type": "string"}, nullable=False
),
dict(
name="vector",
type={
"type": "fixed_size_list",
"fields": [
dict(
name="item",
type={"type": "float"},
nullable=True,
)
],
"length": 2,
},
nullable=False,
),
]
),
)
).encode()
)
else:
request.send_response(404)
request.end_headers()
from lancedb.index import BTree, FTS, IvfPq, IvfRq
with mock_lancedb_connection(handler) as db:
table = db.create_table("test", [{"id": 1}])
# New API: column-first, config= kwarg. Should NOT emit DeprecationWarning.
import warnings as _warnings
with _warnings.catch_warnings():
_warnings.simplefilter("error", DeprecationWarning)
table.create_index("vector", config=IvfPq(distance_type="l2"))
table.create_index("category", config=BTree())
table.create_index("text", config=FTS())
# IvfRq via new API
table.create_index("vector", config=IvfRq(distance_type="l2"))
# Legacy index_type="IVF_RQ" routes to IvfRq config under the hood.
with pytest.warns(DeprecationWarning, match="create_index"):
table.create_index(
vector_column_name="vector",
index_type="IVF_RQ",
num_partitions=8,
)
assert len(received_requests) == 5
assert [req["column"] for req in received_requests] == [
"vector",
"category",
"text",
"vector",
"vector",
]
def test_table_wait_for_index_timeout():
def handler(request):
index_stats = dict(

View File

@@ -603,89 +603,3 @@ def test_cross_encoder_reranker_return_all(tmp_path):
assert "_relevance_score" in result.column_names
assert "_score" in result.column_names
assert "_distance" in result.column_names
# ---------------------------------------------------------------------------
# Regression tests for LinearCombinationReranker scoring bugs (issue #3154)
# ---------------------------------------------------------------------------
def test_linear_combination_best_match_ranks_first():
"""
The document that is BOTH the closest vector match AND the only FTS match
must rank first. Previously _combine_score subtracted from 1, inverting
the ranking so the worst document ranked highest.
"""
reranker = LinearCombinationReranker(weight=0.7, return_score="all")
# rowid 0: perfect vector match, sole FTS match → should rank 1st
# rowid 1: mediocre vector, no FTS match
# rowid 2: bad vector, no FTS match
vector_results = pa.Table.from_pydict(
{
"_rowid": [0, 1, 2],
"_distance": [0.0, 0.5, 0.9],
}
)
fts_results = pa.Table.from_pydict(
{
"_rowid": [0],
"_score": [1.0],
}
)
combined = reranker.merge_results(vector_results, fts_results, fill=1.0)
scores = dict(
zip(
combined["_rowid"].to_pylist(),
combined["_relevance_score"].to_pylist(),
)
)
# rowid 0 must have the highest relevance score
assert scores[0] > scores[1], (
f"Best match (rowid 0, score={scores[0]:.4f}) should beat "
f"mid match (rowid 1, score={scores[1]:.4f})"
)
assert scores[1] > scores[2], (
f"Mid match (rowid 1, score={scores[1]:.4f}) should beat "
f"bad match (rowid 2, score={scores[2]:.4f})"
)
def test_linear_combination_missing_fts_is_penalised():
"""
A document with no FTS match must score *lower* than a document that
has a mediocre FTS match, everything else being equal. Previously
missing-FTS entries used fill=1.0 directly, which gave them a reward
(via the 1-(...) inversion) instead of a penalty.
"""
reranker = LinearCombinationReranker(weight=0.5, return_score="all")
vector_results = pa.Table.from_pydict(
{
"_rowid": [0, 1],
"_distance": [0.2, 0.2], # identical vector scores
}
)
fts_results = pa.Table.from_pydict(
{
"_rowid": [0], # rowid 1 has no FTS match
"_score": [0.3], # small FTS score
}
)
combined = reranker.merge_results(vector_results, fts_results, fill=1.0)
scores = dict(
zip(
combined["_rowid"].to_pylist(),
combined["_relevance_score"].to_pylist(),
)
)
# rowid 0 has a small FTS score; rowid 1 has none.
# Even a small FTS contribution should beat having none at all.
assert scores[0] > scores[1], (
f"Document with FTS score (rowid 0, {scores[0]:.4f}) should beat "
f"document with no FTS match (rowid 1, {scores[1]:.4f})"
)

View File

@@ -4,7 +4,6 @@
import os
import sys
import warnings
from datetime import date, datetime, timedelta
from time import sleep
from typing import List
@@ -12,7 +11,7 @@ from unittest.mock import patch
import lancedb
from lancedb.dependencies import _PANDAS_AVAILABLE
from lancedb.index import BTree, FTS, HnswFlat, HnswPq, HnswSq, IvfPq
from lancedb.index import HnswFlat, HnswPq, HnswSq, IvfPq
import numpy as np
import polars as pl
import pyarrow as pa
@@ -929,12 +928,7 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
num_bits=4,
)
mock_create_index.assert_called_with(
"vector",
replace=True,
config=expected_config,
wait_timeout=None,
name=None,
train=True,
"vector", replace=True, config=expected_config, name=None, train=True
)
# Test with target_partition_size
@@ -954,12 +948,7 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
target_partition_size=8192,
)
mock_create_index.assert_called_with(
"vector",
replace=True,
config=expected_config,
wait_timeout=None,
name=None,
train=True,
"vector", replace=True, config=expected_config, name=None, train=True
)
# target_partition_size has a default value,
@@ -978,12 +967,7 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
num_bits=4,
)
mock_create_index.assert_called_with(
"vector",
replace=True,
config=expected_config,
wait_timeout=None,
name=None,
train=True,
"vector", replace=True, config=expected_config, name=None, train=True
)
table.create_index(
@@ -994,12 +978,7 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
)
expected_config = HnswPq(distance_type="dot")
mock_create_index.assert_called_with(
"my_vector",
replace=False,
config=expected_config,
wait_timeout=None,
name=None,
train=True,
"my_vector", replace=False, config=expected_config, name=None, train=True
)
table.create_index(
@@ -1014,12 +993,7 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
distance_type="cosine", sample_rate=0.1, m=29, ef_construction=10
)
mock_create_index.assert_called_with(
"my_vector",
replace=True,
config=expected_config,
wait_timeout=None,
name=None,
train=True,
"my_vector", replace=True, config=expected_config, name=None, train=True
)
table.create_index(
@@ -1034,12 +1008,7 @@ def test_create_index_method(mock_create_index, mem_db: DBConnection):
distance_type="cosine", sample_rate=0.1, m=29, ef_construction=10
)
mock_create_index.assert_called_with(
"my_vector",
replace=True,
config=expected_config,
wait_timeout=None,
name=None,
train=True,
"my_vector", replace=True, config=expected_config, name=None, train=True
)
@@ -1063,7 +1032,6 @@ def test_create_index_name_and_train_parameters(
"vector",
replace=True,
config=expected_config,
wait_timeout=None,
name="my_custom_index",
train=True,
)
@@ -1071,82 +1039,13 @@ def test_create_index_name_and_train_parameters(
# Test with train=False
table.create_index(vector_column_name="vector", train=False)
mock_create_index.assert_called_with(
"vector",
replace=True,
config=expected_config,
wait_timeout=None,
name=None,
train=False,
"vector", replace=True, config=expected_config, name=None, train=False
)
# Test with both name and train
table.create_index(vector_column_name="vector", name="my_index_name", train=True)
mock_create_index.assert_called_with(
"vector",
replace=True,
config=expected_config,
wait_timeout=None,
name="my_index_name",
train=True,
)
@patch("lancedb.table.AsyncTable.create_index")
def test_create_index_legacy_emits_deprecation_warning(
mock_create_index, mem_db: DBConnection
):
table = mem_db.create_table(
"test",
data=[{"vector": [3.1, 4.1]}, {"vector": [5.9, 26.5]}],
)
with pytest.warns(DeprecationWarning, match="create_index"):
table.create_index(metric="l2", num_partitions=8, vector_column_name="vector")
@patch("lancedb.table.AsyncTable.create_index")
def test_create_index_new_api(mock_create_index, mem_db: DBConnection):
table = mem_db.create_table(
"test",
data=[
{"vector": [3.1, 4.1], "category": "a", "text": "hello world"},
{"vector": [5.9, 26.5], "category": "b", "text": "goodbye"},
],
)
# Vector index via new API should not warn
with warnings.catch_warnings():
warnings.simplefilter("error", DeprecationWarning)
table.create_index("vector", config=IvfPq(distance_type="l2"))
mock_create_index.assert_called_with(
"vector",
replace=True,
config=IvfPq(distance_type="l2"),
wait_timeout=None,
name=None,
train=True,
)
# Scalar index via new API
table.create_index("category", config=BTree())
mock_create_index.assert_called_with(
"category",
replace=True,
config=BTree(),
wait_timeout=None,
name=None,
train=True,
)
# FTS index via new API
table.create_index("text", config=FTS(with_position=True))
mock_create_index.assert_called_with(
"text",
replace=True,
config=FTS(with_position=True),
wait_timeout=None,
name=None,
train=True,
"vector", replace=True, config=expected_config, name="my_index_name", train=True
)
@@ -1962,9 +1861,8 @@ def test_create_scalar_index(mem_db: DBConnection):
"my_table",
data=test_data,
)
# Test with default name; confirm DeprecationWarning fires
with pytest.warns(DeprecationWarning, match="create_scalar_index"):
table.create_scalar_index("x")
# Test with default name
table.create_scalar_index("x")
indices = table.list_indices()
assert len(indices) == 1
scalar_index = indices[0]
@@ -2472,30 +2370,6 @@ def test_alter_columns(mem_db: DBConnection):
assert table.to_arrow().column_names == ["new_id"]
def test_update_field_metadata(mem_db: DBConnection):
data = pa.table({"id": [0, 1], "category": ["a", "b"]})
table = mem_db.create_table("my_table", data=data)
res = table.update_field_metadata(
{"path": "category", "metadata": {"unit": "label", "pii": "false"}}
)
assert res.version == 2
# Arrow field metadata is bytes-keyed
assert table.schema.field("category").metadata == {
b"unit": b"label",
b"pii": b"false",
}
# merge: add a key, delete one via None, keep the rest
table.update_field_metadata(
{"path": "category", "metadata": {"source": "import", "pii": None}}
)
assert table.schema.field("category").metadata == {
b"unit": b"label",
b"source": b"import",
}
@pytest.mark.asyncio
async def test_alter_columns_async(mem_db_async: AsyncConnection):
data = pa.table({"id": [0, 1]})

View File

@@ -16,7 +16,7 @@ use query::{FTSQuery, HybridQuery, Query, VectorQuery};
use session::Session;
use table::{
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, LsmWriteSpec,
MergeResult, Table, UpdateFieldMetadataResult, UpdateResult,
MergeResult, Table, UpdateResult,
};
pub mod arrow;
@@ -50,7 +50,6 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<RecordBatchStream>()?;
m.add_class::<AddColumnsResult>()?;
m.add_class::<AlterColumnsResult>()?;
m.add_class::<UpdateFieldMetadataResult>()?;
m.add_class::<AddResult>()?;
m.add_class::<MergeResult>()?;
m.add_class::<LsmWriteSpec>()?;

View File

@@ -16,8 +16,8 @@ use arrow::{
pyarrow::{FromPyArrow, PyArrowType, ToPyArrow},
};
use lancedb::table::{
AddDataMode, ColumnAlteration, Duration, FieldMetadataUpdate, NewColumnTransform,
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
AddDataMode, ColumnAlteration, Duration, NewColumnTransform, OptimizeAction, OptimizeOptions,
Table as LanceDbTable,
};
use pyo3::{
Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python,
@@ -143,20 +143,18 @@ pub struct MergeResult {
pub num_inserted_rows: u64,
pub num_deleted_rows: u64,
pub num_attempts: u32,
pub num_rows: u64,
}
#[pymethods]
impl MergeResult {
pub fn __repr__(&self) -> String {
format!(
"MergeResult(version={}, num_updated_rows={}, num_inserted_rows={}, num_deleted_rows={}, num_attempts={}, num_rows={})",
"MergeResult(version={}, num_updated_rows={}, num_inserted_rows={}, num_deleted_rows={}, num_attempts={})",
self.version,
self.num_updated_rows,
self.num_inserted_rows,
self.num_deleted_rows,
self.num_attempts,
self.num_rows
self.num_attempts
)
}
}
@@ -169,7 +167,6 @@ impl From<lancedb::table::MergeResult> for MergeResult {
num_inserted_rows: result.num_inserted_rows,
num_deleted_rows: result.num_deleted_rows,
num_attempts: result.num_attempts,
num_rows: result.num_rows,
}
}
}
@@ -197,12 +194,6 @@ impl LsmWriteSpec {
}
/// Identity sharding — shard by the raw value of `column`.
///
/// `column` must be a deterministic function of the unenforced primary
/// key: every row with a given primary key must always produce the same
/// `column` value, or upserts of that key can land in different shards
/// and a stale version can win. Typically `column` is the primary key
/// itself or a stable attribute of it.
#[staticmethod]
pub fn identity(column: String) -> Self {
Self {
@@ -357,27 +348,6 @@ impl From<lancedb::table::AlterColumnsResult> for AlterColumnsResult {
}
}
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct UpdateFieldMetadataResult {
pub version: u64,
}
#[pymethods]
impl UpdateFieldMetadataResult {
pub fn __repr__(&self) -> String {
format!("UpdateFieldMetadataResult(version={})", self.version)
}
}
impl From<lancedb::table::UpdateFieldMetadataResult> for UpdateFieldMetadataResult {
fn from(result: lancedb::table::UpdateFieldMetadataResult) -> Self {
Self {
version: result.version,
}
}
}
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct DropColumnsResult {
@@ -963,12 +933,6 @@ impl Table {
if let Some(use_index) = parameters.use_index {
builder.use_index(use_index);
}
if let Some(use_lsm_write) = parameters.use_lsm_write {
builder.use_lsm_write(use_lsm_write);
}
if let Some(validate_single_shard) = parameters.validate_single_shard {
builder.validate_single_shard(validate_single_shard);
}
future_into_py(self_.py(), async move {
let res = builder.execute(Box::new(batches)).await.infer_error()?;
@@ -1007,13 +971,6 @@ impl Table {
})
}
pub fn close_lsm_writers(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.close_lsm_writers().await.infer_error()
})
}
pub fn uses_v2_manifest_paths(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
@@ -1148,45 +1105,6 @@ impl Table {
Ok(())
})
}
pub fn update_field_metadata<'a>(
self_: PyRef<'a, Self>,
updates: Vec<Bound<PyDict>>,
) -> PyResult<Bound<'a, PyAny>> {
let updates = updates
.iter()
.map(|update| {
let path: String = update
.get_item("path")?
.ok_or_else(|| PyValueError::new_err("Missing path"))?
.extract()?;
let mut field_update = FieldMetadataUpdate::new(path);
if let Some(metadata) = update.get_item("metadata")? {
let metadata_dict = metadata.cast::<PyDict>()?;
for (key, value) in metadata_dict.iter() {
let key: String = key.extract()?;
if value.is_none() {
field_update = field_update.remove(key);
} else {
field_update = field_update.set(key, value.extract::<String>()?);
}
}
}
if let Some(replace) = update.get_item("replace")?
&& replace.extract::<bool>()?
{
field_update = field_update.replace();
}
Ok(field_update)
})
.collect::<PyResult<Vec<_>>>()?;
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let result = inner.update_field_metadata(&updates).await.infer_error()?;
Ok(UpdateFieldMetadataResult::from(result))
})
}
}
#[derive(FromPyObject)]
@@ -1206,8 +1124,6 @@ pub struct MergeInsertParams {
when_not_matched_by_source_condition: Option<String>,
timeout: Option<std::time::Duration>,
use_index: Option<bool>,
use_lsm_write: Option<bool>,
validate_single_shard: Option<bool>,
}
#[pyclass]

View File

@@ -1,2 +1,2 @@
[toolchain]
channel = "1.95.0"
channel = "1.94.0"

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.30.0-beta.1"
version = "0.30.0-beta.0"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true
@@ -75,7 +75,7 @@ reqwest = { version = "0.12.0", default-features = false, features = [
"stream",
], optional = true }
http = { version = "1", optional = true } # Matching what is in reqwest
uuid = { version = "1.7.0", features = ["v4", "v5"] }
uuid = { version = "1.7.0", features = ["v4"] }
polars-arrow = { version = ">=0.37,<0.40.0", optional = true }
polars = { version = ">=0.37,<0.40.0", optional = true }
hf-hub = { version = "0.4.1", optional = true, default-features = false, features = [
@@ -104,7 +104,6 @@ datafusion.workspace = true
http-body = "1" # Matching reqwest
rstest = "0.23.0"
test-log = "0.2"
serial_test = "3"
[features]

View File

@@ -812,7 +812,8 @@ impl ConnectBuilder {
self
}
/// The interval at which to check for updates from other processes.
/// The interval at which to check for updates from other processes. This
/// only affects LanceDB OSS.
///
/// If left unset, consistency is not checked. For maximum read
/// performance, this is the default. For strong consistency, set this to
@@ -824,11 +825,8 @@ impl ConnectBuilder {
/// This only affects read operations. Write operations are always
/// consistent.
///
/// # Cost
///
/// Stronger consistency is not free. The smaller the interval, the more
/// often each read pays the cost of checking for updates against object
/// storage, raising per-read latency and cost.
/// LanceDB Cloud uses eventual consistency under the hood, and is not
/// currently configurable.
pub fn read_consistency_interval(
mut self,
read_consistency_interval: std::time::Duration,
@@ -888,7 +886,6 @@ impl ConnectBuilder {
options.host_override,
self.request.client_config,
storage_options.into(),
self.request.read_consistency_interval,
)?);
Ok(Connection {
internal,

View File

@@ -464,9 +464,11 @@ mod tests {
let mut iter = ids.into_iter().map(|o| o.unwrap());
while let Some(first) = iter.next() {
let rows_left_in_clump = if first == 4470 { 19 } else { 29 };
for expected_next in (first + 1)..=(first + rows_left_in_clump) {
let mut expected_next = first + 1;
for _ in 0..rows_left_in_clump {
let next = iter.next().unwrap();
assert_eq!(next, expected_next);
expected_next += 1;
}
}
}

View File

@@ -245,9 +245,6 @@ pub struct RestfulLanceDbClient<S: HttpSend = Sender> {
pub(crate) sender: S,
pub(crate) id_delimiter: String,
pub(crate) header_provider: Option<Arc<dyn HeaderProvider>>,
/// Connection-level read consistency interval. Drives the
/// `x-lancedb-min-timestamp` freshness header sent on read requests.
pub(crate) read_consistency_interval: Option<Duration>,
}
impl<S: HttpSend> std::fmt::Debug for RestfulLanceDbClient<S> {
@@ -341,7 +338,6 @@ impl RestfulLanceDbClient<Sender> {
host_override: Option<String>,
default_headers: HeaderMap,
client_config: ClientConfig,
read_consistency_interval: Option<Duration>,
) -> Result<Self> {
// Get the timeouts
let timeout =
@@ -439,7 +435,6 @@ impl RestfulLanceDbClient<Sender> {
.clone()
.unwrap_or("$".to_string()),
header_provider: client_config.header_provider,
read_consistency_interval,
})
}
}
@@ -845,16 +840,6 @@ pub mod test_utils {
pub fn client_with_handler<T>(
handler: impl Fn(reqwest::Request) -> http::response::Response<T> + Send + Sync + 'static,
) -> RestfulLanceDbClient<MockSender>
where
T: Into<reqwest::Body>,
{
client_with_handler_and_interval(handler, None)
}
pub fn client_with_handler_and_interval<T>(
handler: impl Fn(reqwest::Request) -> http::response::Response<T> + Send + Sync + 'static,
read_consistency_interval: Option<Duration>,
) -> RestfulLanceDbClient<MockSender>
where
T: Into<reqwest::Body>,
{
@@ -872,7 +857,6 @@ pub mod test_utils {
},
id_delimiter: "$".to_string(),
header_provider: None,
read_consistency_interval,
}
}
@@ -897,7 +881,6 @@ pub mod test_utils {
},
id_delimiter: config.id_delimiter.unwrap_or_else(|| "$".to_string()),
header_provider: config.header_provider,
read_consistency_interval: None,
}
}
}
@@ -905,18 +888,8 @@ pub mod test_utils {
#[cfg(test)]
mod tests {
use super::*;
use serial_test::serial;
use std::time::Duration;
// Serializes the env-var-mutating tests below: cargo test runs tests in
// parallel, but several of these tests read and write the same process-
// global env vars (`LANCEDB_USER_ID*`), so they would race without this.
static ENV_MUTEX: std::sync::Mutex<()> = std::sync::Mutex::new(());
fn lock_env() -> std::sync::MutexGuard<'static, ()> {
ENV_MUTEX.lock().unwrap_or_else(|e| e.into_inner())
}
#[test]
fn test_timeout_config_default() {
let config = TimeoutConfig::default();
@@ -1073,7 +1046,6 @@ mod tests {
sender: Sender,
id_delimiter: "+".to_string(),
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
read_consistency_interval: None,
};
// Apply dynamic headers
@@ -1109,7 +1081,6 @@ mod tests {
sender: Sender,
id_delimiter: "+".to_string(),
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
read_consistency_interval: None,
};
// Apply dynamic headers
@@ -1147,7 +1118,6 @@ mod tests {
sender: Sender,
id_delimiter: "+".to_string(),
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
read_consistency_interval: None,
};
// Header provider errors should fail the request
@@ -1173,9 +1143,7 @@ mod tests {
}
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_none() {
let _guard = lock_env();
let config = ClientConfig::default();
// Clear env vars that might be set from other tests
// SAFETY: This is only called in tests
@@ -1187,9 +1155,7 @@ mod tests {
}
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_from_env() {
let _guard = lock_env();
// SAFETY: This is only called in tests
unsafe {
std::env::set_var("LANCEDB_USER_ID", "env-user-id");
@@ -1203,9 +1169,7 @@ mod tests {
}
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_from_env_key() {
let _guard = lock_env();
// SAFETY: This is only called in tests
unsafe {
std::env::remove_var("LANCEDB_USER_ID");
@@ -1225,9 +1189,7 @@ mod tests {
}
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_direct_takes_precedence() {
let _guard = lock_env();
// SAFETY: This is only called in tests
unsafe {
std::env::set_var("LANCEDB_USER_ID", "env-user-id");
@@ -1244,9 +1206,7 @@ mod tests {
}
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_empty_env_ignored() {
let _guard = lock_env();
// SAFETY: This is only called in tests
unsafe {
std::env::set_var("LANCEDB_USER_ID", "");

View File

@@ -206,7 +206,6 @@ impl RemoteDatabase {
host_override: Option<String>,
client_config: ClientConfig,
options: RemoteOptions,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> {
let parsed = super::client::parse_db_url(uri)?;
let header_map = RestfulLanceDbClient::<Sender>::default_headers(
@@ -234,7 +233,6 @@ impl RemoteDatabase {
host_override,
header_map,
client_config.clone(),
read_consistency_interval,
)?;
let table_cache = Cache::builder()

File diff suppressed because it is too large Load Diff

View File

@@ -89,12 +89,10 @@ use futures::future::join_all;
pub use lance::dataset::refs::{TagContents, Tags as LanceTags};
pub use lance::dataset::scanner::DatasetRecordBatchStream;
use lance::dataset::statistics::DatasetStatisticsExt;
use lance_index::frag_reuse::FRAG_REUSE_INDEX_NAME;
pub use lance_index::optimize::OptimizeOptions;
pub use optimize::{CompactionOptions, OptimizeAction, OptimizeStats};
pub use schema_evolution::{
AddColumnsResult, AlterColumnsResult, DropColumnsResult, FieldMetadataUpdate,
UpdateFieldMetadataResult,
};
pub use schema_evolution::{AddColumnsResult, AlterColumnsResult, DropColumnsResult};
use serde_with::skip_serializing_none;
pub use update::{UpdateBuilder, UpdateResult};
@@ -255,36 +253,6 @@ pub enum Filter {
Datafusion(Expr),
}
/// A predicate for filtering rows in delete operations.
///
/// Accepts either a SQL string or a DataFusion [`Expr`]. Use the [`From`]
/// implementations to convert from `&str` or `&Expr` automatically.
/// See [`Table::delete`] for usage examples.
pub enum Predicate<'a> {
/// A SQL predicate string
String(&'a str),
/// A DataFusion logical expression
Expr(&'a Expr),
}
impl<'a> From<&'a str> for Predicate<'a> {
fn from(s: &'a str) -> Self {
Predicate::String(s)
}
}
impl<'a> From<&'a String> for Predicate<'a> {
fn from(s: &'a String) -> Self {
Predicate::String(s.as_str())
}
}
impl<'a> From<&'a Expr> for Predicate<'a> {
fn from(e: &'a Expr) -> Self {
Predicate::Expr(e)
}
}
#[async_trait]
pub trait Tags: Send + Sync {
/// List the tags of the table.
@@ -314,15 +282,17 @@ pub use self::merge::MergeResult;
/// date) and [`LsmWriteSpec::with_writer_config_defaults`] (default
/// `ShardWriter` configuration recorded in the MemWAL index).
///
/// All variants require the table to have an unenforced primary key.
///
/// Install a spec with [`Table::set_lsm_write_spec`] and remove it with
/// [`Table::unset_lsm_write_spec`]. The actual `merge_insert` dispatch
/// onto the MemWAL writer is a follow-up.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum LsmWriteSpec {
/// Hash-bucket sharding by a scalar column.
/// Hash-bucket sharding by the unenforced primary key column.
///
/// `column` must be a non-nested column with a supported scalar type.
/// `num_buckets` must be in `[1, 1024]`.
/// `column` must equal the table's currently-set single-column
/// unenforced primary key. `num_buckets` must be in `[1, 1024]`.
/// Iceberg-compatible Murmur3-x86-32 (seed 0) is used so each row's
/// `bucket(column, num_buckets)` value is stable across processes.
Bucket {
@@ -369,14 +339,6 @@ impl LsmWriteSpec {
/// Construct an identity-sharding spec (shard by the raw value of
/// `column`) with no maintained indexes.
///
/// `column` must be a deterministic function of the unenforced primary
/// key: every row with a given primary key must always produce the same
/// `column` value. MemWAL dedups upserts by primary key but tracks
/// generations per shard, so if the same key is written with two
/// different `column` values its versions land in different shards and a
/// stale value can win. Typically `column` is the primary key itself, or
/// a stable attribute of it (e.g. a tenant id).
pub fn identity(column: impl Into<String>) -> Self {
Self::Identity {
column: column.into(),
@@ -529,8 +491,8 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
/// Add new records to the table.
async fn add(&self, add: AddDataBuilder) -> Result<AddResult>;
/// Delete rows from the table matching the given [`Predicate`].
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult>;
/// Delete rows from the table.
async fn delete(&self, predicate: &str) -> Result<DeleteResult>;
/// Update rows in the table.
async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult>;
/// Create an index on the provided column(s).
@@ -591,13 +553,6 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
message: "unset_lsm_write_spec is not supported on this table type".into(),
})
}
/// Drain and close any cached MemWAL shard writers for this table.
///
/// The default implementation is a no-op; table types that maintain
/// MemWAL shard writers override it.
async fn close_lsm_writers(&self) -> Result<()> {
Ok(())
}
/// Gets the table tag manager.
async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
/// Optimize the dataset.
@@ -663,19 +618,6 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
message: "create_insert_exec not implemented".to_string(),
})
}
/// Update per-field metadata. Merges into existing metadata by default;
/// [`FieldMetadataUpdate::remove`] deletes a key and
/// [`FieldMetadataUpdate::replace`] swaps the field's whole map.
///
/// The default returns `NotSupported`; Lance-backed and remote tables override it.
async fn update_field_metadata(
&self,
_updates: &[FieldMetadataUpdate],
) -> Result<UpdateFieldMetadataResult> {
Err(Error::NotSupported {
message: "update_field_metadata is not supported on this table type".into(),
})
}
}
/// A Table is a collection of strong typed Rows.
@@ -714,30 +656,6 @@ mod test_utils {
}
}
pub fn new_with_handler_and_interval<T>(
name: impl Into<String>,
handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
read_consistency_interval: Option<std::time::Duration>,
) -> Self
where
T: Into<reqwest::Body>,
{
let inner = Arc::new(
crate::remote::table::RemoteTable::new_mock_with_consistency_interval(
name.into(),
handler.clone(),
read_consistency_interval,
),
);
let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler));
Self {
inner,
database: Some(database),
// Registry is unused.
embedding_registry: Arc::new(MemoryRegistry::new()),
}
}
pub fn new_with_handler_version<T>(
name: impl Into<String>,
version: semver::Version,
@@ -942,8 +860,7 @@ impl Table {
/// Delete the rows from table that match the predicate.
///
/// # Arguments
/// - `predicate` - A SQL string (`&str`) or DataFusion expression (`&Expr`)
/// that selects the rows to delete.
/// - `predicate` - The SQL predicate string to filter the rows to be deleted.
///
/// # Example
///
@@ -952,7 +869,6 @@ impl Table {
/// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
/// # RecordBatchIterator, Int32Array};
/// # use arrow_schema::{Schema, Field, DataType};
/// use datafusion_expr::{col, lit};
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let tmpdir = tempfile::tempdir().unwrap();
/// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
@@ -982,17 +898,11 @@ impl Table {
/// .execute()
/// .await
/// .unwrap();
///
/// // Using a SQL string:
/// tbl.delete("id > 5").await.unwrap();
///
/// // Using a DataFusion expression:
/// let expr = col("id").lt(lit(4));
/// tbl.delete(&expr).await.unwrap();
/// # });
/// ```
pub async fn delete(&self, predicate: impl Into<Predicate<'_>>) -> Result<DeleteResult> {
self.inner.delete(predicate.into()).await
pub async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
self.inner.delete(predicate).await
}
/// Create an index on the provided column(s).
@@ -1356,14 +1266,6 @@ impl Table {
self.inner.alter_columns(alterations).await
}
/// Update per-field metadata (merges by default).
pub async fn update_field_metadata(
&self,
updates: &[FieldMetadataUpdate],
) -> Result<UpdateFieldMetadataResult> {
self.inner.update_field_metadata(updates).await
}
/// Remove columns from the table.
pub async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
self.inner.drop_columns(columns).await
@@ -1396,15 +1298,21 @@ impl Table {
///
/// [`LsmWriteSpec`] chooses one of three sharding strategies:
///
/// - [`LsmWriteSpec::bucket`] — hash-bucket writes by a scalar column.
/// - [`LsmWriteSpec::bucket`] — hash-bucket writes by the single-column
/// unenforced primary key.
/// - [`LsmWriteSpec::identity`] — shard by the raw value of a scalar column.
/// - [`LsmWriteSpec::unsharded`] — route every write to a single shard.
///
/// All variants require the table to have an unenforced primary key
/// ([`Table::set_unenforced_primary_key`]); bucket sharding additionally
/// requires it to be the single column being bucketed.
///
/// # Example
///
/// ```
/// # use lancedb::table::{LsmWriteSpec, Table};
/// # async fn example(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
/// table.set_unenforced_primary_key(["id"]).await?;
/// table
/// .set_lsm_write_spec(
/// LsmWriteSpec::bucket("id", 16).with_maintained_indexes(["id_idx"]),
@@ -1425,16 +1333,6 @@ impl Table {
self.inner.unset_lsm_write_spec().await
}
/// Drain and close any cached MemWAL shard writers held for this table.
///
/// When an [`LsmWriteSpec`] is installed, `merge_insert` opens MemWAL shard
/// writers and caches them for reuse across calls. This closes them,
/// flushing pending data; writers reopen lazily on the next `merge_insert`.
/// It is a no-op when no writers are cached.
pub async fn close_lsm_writers(&self) -> Result<()> {
self.inner.close_lsm_writers().await
}
/// Retrieve the version of the table
///
/// LanceDb supports versioning. Every operation that modifies the table increases
@@ -2878,12 +2776,9 @@ impl BaseTable for NativeTable {
merge::lsm::unset_lsm_write_spec(self).await
}
async fn close_lsm_writers(&self) -> Result<()> {
merge::lsm::close_lsm_writers(self).await
}
/// Delete rows from the table
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
// Delegate to the submodule implementation
delete::execute_delete(self, predicate).await
}
@@ -2910,45 +2805,77 @@ impl BaseTable for NativeTable {
schema_evolution::execute_alter_columns(self, alterations).await
}
async fn update_field_metadata(
&self,
updates: &[FieldMetadataUpdate],
) -> Result<UpdateFieldMetadataResult> {
schema_evolution::execute_update_field_metadata(self, updates).await
}
async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
schema_evolution::execute_drop_columns(self, columns).await
}
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
let dataset = self.dataset.get().await?;
let indices = dataset
.describe_indices(None)
.await?
.into_iter()
.filter_map(|idx_desc| {
let index_type: crate::index::IndexType = match idx_desc.index_type().parse() {
Ok(index_type) => index_type,
let indices = dataset.load_indices().await?;
let results = futures::stream::iter(indices.as_slice())
.then(|idx| async {
// skip Lance internal indexes
if idx.name == FRAG_REUSE_INDEX_NAME {
return None;
}
let stats = match dataset.index_statistics(idx.name.as_str()).await {
Ok(stats) => stats,
Err(e) => {
log::warn!(
"Failed to parse index type for index {}: {}",
idx_desc.name(),
"Failed to get statistics for index {} ({}): {}",
idx.name,
idx.uuid,
e
);
return None;
}
};
let field_ids = idx_desc.field_ids();
let mut columns = Vec::with_capacity(field_ids.len());
for field_id in field_ids {
let field_path = match dataset.schema().field_path(*field_id as i32) {
let stats: serde_json::Value = match serde_json::from_str(&stats) {
Ok(stats) => stats,
Err(e) => {
log::warn!(
"Failed to deserialize index statistics for index {} ({}): {}",
idx.name,
idx.uuid,
e
);
return None;
}
};
let Some(index_type) = stats.get("index_type").and_then(|v| v.as_str()) else {
log::warn!(
"Index statistics was missing 'index_type' field for index {} ({})",
idx.name,
idx.uuid
);
return None;
};
let index_type: crate::index::IndexType = match index_type.parse() {
Ok(index_type) => index_type,
Err(e) => {
log::warn!(
"Failed to parse index type for index {} ({}): {}",
idx.name,
idx.uuid,
e
);
return None;
}
};
let mut columns = Vec::with_capacity(idx.fields.len());
for field_id in &idx.fields {
let field_path = match dataset.schema().field_path(*field_id) {
Ok(field_path) => field_path,
Err(e) => {
log::warn!(
"Failed to resolve field path for index {} field id {}: {}",
idx_desc.name(),
"Failed to resolve field path for index {} ({}) field id {}: {}",
idx.name,
idx.uuid,
field_id,
e
);
@@ -2958,14 +2885,17 @@ impl BaseTable for NativeTable {
columns.push(field_path);
}
let name = idx.name.clone();
Some(IndexConfig {
name: idx_desc.name().to_string(),
index_type,
columns,
name,
})
})
.collect();
Ok(indices)
.collect::<Vec<_>>()
.await;
Ok(results.into_iter().flatten().collect())
}
async fn uri(&self) -> Result<String> {
@@ -3075,12 +3005,11 @@ impl BaseTable for NativeTable {
let p99 = *sorted_sizes.get(num_fragments * 99 / 100).unwrap_or(&0);
let min = sorted_sizes.first().copied().unwrap_or(0);
let max = sorted_sizes.last().copied().unwrap_or(0);
let mean = sorted_sizes
.iter()
.copied()
.sum::<usize>()
.checked_div(num_fragments)
.unwrap_or(0);
let mean = if num_fragments == 0 {
0
} else {
sorted_sizes.iter().copied().sum::<usize>() / num_fragments
};
let frag_stats = FragmentStatistics {
num_fragments,
@@ -4080,27 +4009,26 @@ mod tests {
let index_configs = table.list_indices().await.unwrap();
assert_eq!(index_configs.len(), 5);
// list_indices returns indices in alphabetical order by name
let mut configs_iter = index_configs.into_iter();
let index = configs_iter.next().unwrap();
assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
assert_eq!(index.columns, vec!["category".to_string()]);
let index = configs_iter.next().unwrap();
assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
assert_eq!(index.columns, vec!["data".to_string()]);
let index = configs_iter.next().unwrap();
assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
assert_eq!(index.columns, vec!["is_active".to_string()]);
let index = configs_iter.next().unwrap();
assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
assert_eq!(index.columns, vec!["large_category".to_string()]);
assert_eq!(index.columns, vec!["data".to_string()]);
let index = configs_iter.next().unwrap();
assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
assert_eq!(index.columns, vec!["large_data".to_string()]);
let index = configs_iter.next().unwrap();
assert_eq!(index.index_type, crate::index::IndexType::Bitmap);
assert_eq!(index.columns, vec!["large_category".to_string()]);
}
#[tokio::test]
@@ -4672,6 +4600,21 @@ mod tests {
.unwrap();
let table = conn.create_table("t", reader).execute().await.unwrap();
// Reject when no PK is set.
let err = table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
.await
.expect_err("should reject without PK");
assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
// Set PK, then a mismatched column on the spec must be rejected.
table.set_unenforced_primary_key(["id"]).await.unwrap();
let err = table
.set_lsm_write_spec(LsmWriteSpec::bucket("name", 4))
.await
.expect_err("should reject column != PK");
assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
// Reject num_buckets out of range.
for bad in [0u32, 1025] {
let err = table
@@ -4737,6 +4680,9 @@ mod tests {
.unwrap();
let table = conn.create_table("t", reader).execute().await.unwrap();
// Lance's MemWAL still requires *some* unenforced primary key on
// the dataset; Unsharded just skips the per-row hashing step.
table.set_unenforced_primary_key(["id"]).await.unwrap();
table
.set_lsm_write_spec(LsmWriteSpec::unsharded())
.await
@@ -4783,6 +4729,7 @@ mod tests {
.unwrap();
let table = conn.create_table("t", reader).execute().await.unwrap();
table.set_unenforced_primary_key(["id"]).await.unwrap();
table
.set_lsm_write_spec(
LsmWriteSpec::identity("region")
@@ -4838,6 +4785,7 @@ mod tests {
table.unset_lsm_write_spec().await.unwrap_err();
// Install a spec, then unset it.
table.set_unenforced_primary_key(["id"]).await.unwrap();
table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
.await

View File

@@ -982,105 +982,4 @@ mod tests {
table2.add(struct_batch).execute().await.unwrap();
assert_eq!(table2.count_rows(None).await.unwrap(), 2);
}
/// Regression test: appending `arrow.json` (PyArrow `pa.json_()`) data into a table
/// whose schema was created with `pa.json_()` (internally stored as `lance.json`, backed
/// by `LargeBinary`) must succeed without a schema-mismatch error.
///
/// Previously `build_field_exprs` would attempt a `Utf8 → LargeBinary` DataFusion cast,
/// which produced a field whose Arrow extension metadata still read `arrow.json` instead
/// of `lance.json`. Lance-core then rejected the append with
/// `"json vs large_binary" schema mismatch`.
///
/// PyArrow's `pa.json_()` may be backed by either `Utf8` or `LargeUtf8` depending on the
/// constructor used, so the test is parameterized over the input backing type.
#[rstest::rstest]
#[case::utf8(DataType::Utf8)]
#[case::large_utf8(DataType::LargeUtf8)]
#[tokio::test]
async fn test_add_arrow_json_into_lance_json_table(#[case] input_type: DataType) {
use arrow_array::{Array, cast::AsArray};
use lance_arrow::ARROW_EXT_NAME_KEY;
use lance_arrow::json::{ARROW_JSON_EXT_NAME, JSON_EXT_NAME};
// Build a table whose "data" column is lance.json (LargeBinary +
// ARROW:extension:name = "lance.json").
let lance_json_field = lance_arrow::json::json_field("data", true);
let table_schema = Arc::new(Schema::new(vec![lance_json_field]));
let db = connect("memory://").execute().await.unwrap();
let table = db
.create_empty_table("json_test", table_schema)
.execute()
.await
.unwrap();
// Sanity-check the stored schema.
let stored_field = table.schema().await.unwrap();
let data_field = stored_field.field_with_name("data").unwrap();
assert_eq!(data_field.data_type(), &DataType::LargeBinary);
assert_eq!(
data_field
.metadata()
.get(ARROW_EXT_NAME_KEY)
.map(|s| s.as_str()),
Some(JSON_EXT_NAME),
);
// Build an arrow.json input field (Utf8/LargeUtf8 + arrow.json extension).
// This is what PyArrow produces for pa.json_() arrays.
let arrow_json_metadata = std::collections::HashMap::from([(
ARROW_EXT_NAME_KEY.to_string(),
ARROW_JSON_EXT_NAME.to_string(),
)]);
let arrow_json_field =
Field::new("data", input_type.clone(), true).with_metadata(arrow_json_metadata);
let arrow_json_schema = Arc::new(Schema::new(vec![arrow_json_field]));
let rows: Vec<Option<&str>> = vec![None, Some(r#"{"a": 1}"#), Some(r#"{"b": 2}"#)];
let string_array: Arc<dyn arrow_array::Array> = match input_type {
DataType::Utf8 => Arc::new(arrow_array::StringArray::from(rows.clone())),
DataType::LargeUtf8 => Arc::new(arrow_array::LargeStringArray::from(rows.clone())),
other => panic!("unsupported arrow.json backing type for this test: {other:?}"),
};
let batch = RecordBatch::try_new(arrow_json_schema, vec![string_array]).unwrap();
// This must not fail with a schema-mismatch error.
table.add(batch).execute().await.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), rows.len());
// A lance.json column is read back as Utf8 carrying arrow.json extension metadata.
let results: Vec<RecordBatch> = table
.query()
.select(Select::columns(&["data"]))
.execute()
.await
.unwrap()
.try_collect()
.await
.unwrap();
assert_eq!(results.len(), 1);
let batch = &results[0];
assert_eq!(batch.num_rows(), rows.len());
let json_col = batch.column(0);
assert_eq!(json_col.data_type(), &DataType::Utf8);
let json_strs = json_col.as_string::<i32>();
for (i, expected) in rows.iter().enumerate() {
match expected {
None => assert!(json_strs.is_null(i), "row {i} expected null"),
Some(raw) => {
assert!(!json_strs.is_null(i), "row {i} expected non-null");
let actual: serde_json::Value = serde_json::from_str(json_strs.value(i))
.expect("read-back JSON should be valid");
let expected: serde_json::Value =
serde_json::from_str(raw).expect("expected JSON should be valid");
assert_eq!(actual, expected, "row {i} JSON mismatch");
}
}
}
}
}

View File

@@ -13,7 +13,6 @@ use datafusion_physical_expr::expressions::{CastExpr, Literal};
use datafusion_physical_plan::expressions::Column;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};
use lance_arrow::json::{is_arrow_json_field, is_json_field};
use crate::{Error, Result};
@@ -65,18 +64,6 @@ fn build_field_exprs(
let input_field = &input_fields[input_idx];
let input_expr = get_input_expr(input_idx);
// Special case: input is arrow.json (PyArrow pa.json_() extension type backed by
// Utf8/LargeUtf8) and the table field is lance.json (backed by LargeBinary).
// Lance-core's write path already handles the arrow.json → lance.json conversion
// (including JSONB encoding), so we pass the expression through unchanged and let
// lance-core deal with it. Attempting to cast Utf8 → LargeBinary here would
// produce a field whose metadata still identifies it as arrow.json, which then
// causes a schema-mismatch error inside lance-core.
if is_arrow_json_field(input_field) && is_json_field(table_field) {
result.push((input_expr, Arc::clone(input_field) as FieldRef));
continue;
}
let expr = match (input_field.data_type(), table_field.data_type()) {
// Both are structs: recurse into sub-fields to handle subschemas and casts.
(DataType::Struct(in_children), DataType::Struct(tbl_children))
@@ -631,75 +618,4 @@ mod tests {
.unwrap();
assert_eq!(a.values(), &[1, 3]);
}
/// `arrow.json` input (PyArrow `pa.json_()`, Utf8/LargeUtf8 + extension metadata) against a
/// `lance.json` table field (LargeBinary + extension metadata) must be passed through
/// without a cast so that lance-core can perform its own arrow.json → JSONB conversion.
///
/// Before the fix, `cast_to_table_schema` attempted a `Utf8 → LargeBinary` DataFusion
/// cast that preserved the wrong extension metadata, causing lance-core to reject the
/// batch with a "json vs large_binary" schema-mismatch error.
#[rstest::rstest]
#[case::utf8(DataType::Utf8)]
#[case::large_utf8(DataType::LargeUtf8)]
#[tokio::test]
async fn test_arrow_json_passthrough_to_lance_json(#[case] input_type: DataType) {
use lance_arrow::ARROW_EXT_NAME_KEY;
use lance_arrow::json::{ARROW_JSON_EXT_NAME, json_field};
// Build a table schema with a lance.json field (LargeBinary + lance.json metadata).
let lance_field = json_field("data", true);
let table_schema = Schema::new(vec![lance_field]);
// Build an input batch with an arrow.json field (Utf8/LargeUtf8 + arrow.json metadata).
let arrow_meta = std::collections::HashMap::from([(
ARROW_EXT_NAME_KEY.to_string(),
ARROW_JSON_EXT_NAME.to_string(),
)]);
let arrow_field = Field::new("data", input_type.clone(), true).with_metadata(arrow_meta);
let input_schema = Arc::new(Schema::new(vec![arrow_field]));
let values = vec![Some(r#"{"x": 1}"#), None, Some(r#"{"y": 2}"#)];
let input_array: Arc<dyn arrow_array::Array> = match input_type {
DataType::Utf8 => Arc::new(StringArray::from(values)),
DataType::LargeUtf8 => Arc::new(arrow_array::LargeStringArray::from(values)),
other => panic!("unsupported arrow.json backing type for this test: {other:?}"),
};
let input_batch = RecordBatch::try_new(input_schema, vec![input_array]).unwrap();
let plan = plan_from_batch(input_batch).await;
let projected = cast_to_table_schema(plan, &table_schema).unwrap();
// The projected schema's "data" field must carry arrow.json metadata
// (the input field), not be silently dropped or miscast.
let out_field = projected.schema().field_with_name("data").unwrap().clone();
assert_eq!(out_field.data_type(), &input_type);
assert_eq!(
out_field
.metadata()
.get(ARROW_EXT_NAME_KEY)
.map(|s| s.as_str()),
Some(ARROW_JSON_EXT_NAME),
"output field must still carry arrow.json metadata so lance-core can handle it"
);
// The data must flow through correctly (3 rows, no panic).
let result = collect(projected).await;
assert_eq!(result.num_rows(), 3);
let (v0, v2) = match input_type {
DataType::Utf8 => {
let col: &StringArray = result.column(0).as_any().downcast_ref().unwrap();
(col.value(0).to_string(), col.value(2).to_string())
}
DataType::LargeUtf8 => {
let col: &arrow_array::LargeStringArray =
result.column(0).as_any().downcast_ref().unwrap();
(col.value(0).to_string(), col.value(2).to_string())
}
_ => unreachable!(),
};
assert_eq!(v0, r#"{"x": 1}"#);
assert!(result.column(0).is_null(1));
assert_eq!(v2, r#"{"y": 2}"#);
}
}

View File

@@ -870,10 +870,8 @@ mod tests {
.await
.unwrap();
assert_eq!(
result.iter().map(|batch| batch.num_rows()).sum::<usize>(),
0
);
// Should return empty or nearly empty result
assert!(result[0].num_rows() <= 1);
}
#[tokio::test]

View File

@@ -8,7 +8,6 @@ use std::{
use lance::{Dataset, dataset::refs};
use crate::table::merge::lsm::ShardWriterCache;
use crate::{Error, error::Result, utils::background_cache::BackgroundCache};
/// A wrapper around a [Dataset] that provides consistency checks.
@@ -19,10 +18,6 @@ use crate::{Error, error::Result, utils::background_cache::BackgroundCache};
pub struct DatasetConsistencyWrapper {
state: Arc<Mutex<DatasetState>>,
consistency: ConsistencyMode,
/// The single MemWAL `ShardWriter` for this dataset, co-located so it is
/// cached for the session and shares the dataset's lifecycle. A dataset
/// writes to one shard at a time. Shared by `Arc` across clones.
shard_writer: Arc<ShardWriterCache>,
}
/// The current dataset and whether it is pinned to a specific version.
@@ -72,15 +67,9 @@ impl DatasetConsistencyWrapper {
pinned_version: None,
})),
consistency,
shard_writer: Arc::new(ShardWriterCache::default()),
}
}
/// The MemWAL `ShardWriter` cache co-located with this dataset.
pub(crate) fn shard_writer(&self) -> &Arc<ShardWriterCache> {
&self.shard_writer
}
/// Get the current dataset.
///
/// Behavior depends on the consistency mode:

View File

@@ -1,12 +1,9 @@
use std::sync::Arc;
use futures::FutureExt;
use lance::dataset::DeleteBuilder;
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use serde::{Deserialize, Serialize};
use super::{NativeTable, Predicate};
use super::NativeTable;
use crate::Result;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
@@ -24,39 +21,17 @@ pub struct DeleteResult {
/// Internal implementation of the delete logic
///
/// This logic was moved from NativeTable::delete to keep table.rs clean.
pub(crate) async fn execute_delete(
table: &NativeTable,
predicate: Predicate<'_>,
) -> Result<DeleteResult> {
pub(crate) async fn execute_delete(table: &NativeTable, predicate: &str) -> Result<DeleteResult> {
table.dataset.ensure_mutable()?;
match predicate {
Predicate::String(s) => {
let mut dataset = (*table.dataset.get().await?).clone();
let delete_result = dataset.delete(s).boxed().await?;
let num_deleted_rows = delete_result.num_deleted_rows;
let version = dataset.version().version;
table.dataset.update(dataset);
Ok(DeleteResult {
num_deleted_rows,
version,
})
}
Predicate::Expr(expr) => {
let dataset = table.dataset.get().await?;
let delete_result = DeleteBuilder::from_expr(Arc::clone(&dataset), expr.clone())
.execute()
.await?;
let num_deleted_rows = delete_result.num_deleted_rows;
let version = delete_result.new_dataset.version().version;
table.dataset.update(
Arc::try_unwrap(delete_result.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
);
Ok(DeleteResult {
num_deleted_rows,
version,
})
}
}
let mut dataset = (*table.dataset.get().await?).clone();
let delete_result = dataset.delete(predicate).boxed().await?;
let num_deleted_rows = delete_result.num_deleted_rows;
let version = dataset.version().version;
table.dataset.update(dataset);
Ok(DeleteResult {
num_deleted_rows,
version,
})
}
#[cfg(test)]
@@ -201,100 +176,4 @@ mod tests {
"Table version must increment after delete operation"
);
}
#[tokio::test]
async fn test_delete_expr() {
use datafusion_expr::{col, lit};
let conn = connect("memory://").execute().await.unwrap();
// 1. Create a table with values 0 to 9
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..10))],
)
.unwrap();
let table = conn
.create_table("test_delete_expr", batch)
.execute()
.await
.unwrap();
// 2. Verify initial state
assert_eq!(table.count_rows(None).await.unwrap(), 10);
let initial_version = table.version().await.unwrap();
// 3. Execute Delete with Expr (removes values > 5)
let expr = col("i").gt(lit(5));
table.delete(&expr).await.unwrap();
// 4. Verify results
assert_eq!(table.count_rows(None).await.unwrap(), 6); // 0, 1, 2, 3, 4, 5 remain
let current_version = table.version().await.unwrap();
assert!(
current_version > initial_version,
"Table version must increment after delete_expr operation"
);
// 5. Verify specific data consistency
let batches = table
.query()
.execute()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let batch = &batches[0];
let array = batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
// Ensure no value > 5 exists
for val in array.iter() {
assert!(val.unwrap() <= 5);
}
}
#[tokio::test]
async fn test_delete_expr_increments_version() {
use datafusion_expr::lit;
let conn = connect("memory://").execute().await.unwrap();
// Create a table with 5 rows
let batch = record_batch!(("id", Int32, [1, 2, 3, 4, 5])).unwrap();
let table = conn
.create_table("test_delete_expr_noop", batch)
.execute()
.await
.unwrap();
// Capture the initial state (Rows = 5, Version = 1)
let initial_rows = table.count_rows(None).await.unwrap();
let initial_version = table.version().await.unwrap();
assert_eq!(initial_rows, 5);
let expr = lit(false);
table.delete(&expr).await.unwrap();
// Rows should still be 5
let current_rows = table.count_rows(None).await.unwrap();
assert_eq!(
current_rows, initial_rows,
"Data should not change when predicate is false"
);
// version check
let current_version = table.version().await.unwrap();
assert!(
current_version > initial_version,
"Table version must increment after delete_expr operation"
);
}
}

View File

@@ -41,16 +41,6 @@ pub struct MergeResult {
/// A value of 1 means the operation succeeded on the first try.
#[serde(default)]
pub num_attempts: u32,
/// Total number of rows written.
///
/// On the standard `merge_insert` path this equals
/// `num_inserted_rows + num_updated_rows`. On the MemWAL LSM write path the
/// insert/update breakdown is not known until compaction; in that mode
/// `num_inserted_rows`, `num_updated_rows`, `num_deleted_rows`, `version`
/// and `num_attempts` are all `0` and this field holds the total number of
/// rows written through the shard writer.
#[serde(default)]
pub num_rows: u64,
}
/// A builder used to create and run a merge insert operation
@@ -67,8 +57,6 @@ pub struct MergeInsertBuilder {
pub(crate) when_not_matched_by_source_delete_filt: Option<String>,
pub(crate) timeout: Option<Duration>,
pub(crate) use_index: bool,
pub(crate) use_lsm_write: Option<bool>,
pub(crate) validate_single_shard: bool,
}
impl MergeInsertBuilder {
@@ -83,8 +71,6 @@ impl MergeInsertBuilder {
when_not_matched_by_source_delete_filt: None,
timeout: None,
use_index: true,
use_lsm_write: None,
validate_single_shard: true,
}
}
@@ -164,34 +150,6 @@ impl MergeInsertBuilder {
self
}
/// Controls whether `merge_insert` uses the MemWAL LSM write path.
///
/// By default (unset), a `merge_insert` on a table with an
/// [`LsmWriteSpec`](super::LsmWriteSpec) installed is routed through
/// Lance's MemWAL shard writer, and a table without one uses the standard
/// path. Calling this with `false` forces the standard path even when a
/// spec is set. Calling it with `true` requires a spec — `merge_insert`
/// errors if none is installed.
pub fn use_lsm_write(&mut self, use_lsm_write: bool) -> &mut Self {
self.use_lsm_write = Some(use_lsm_write);
self
}
/// Controls how an LSM `merge_insert` checks that its input targets a
/// single shard.
///
/// When a table has an LSM write spec, every row in a `merge_insert` call
/// must route to the same shard. When `true` (the default), every row is
/// inspected to verify this. When `false`, only the first row is inspected
/// and the shard it routes to is used for the whole input — a faster path
/// for callers that have already pre-sharded their input.
///
/// Has no effect on tables without an LSM write spec.
pub fn validate_single_shard(&mut self, validate_single_shard: bool) -> &mut Self {
self.validate_single_shard = validate_single_shard;
self
}
/// Executes the merge insert operation
///
/// Returns version and statistics about the merge operation including the number of rows
@@ -209,23 +167,6 @@ pub(crate) async fn execute_merge_insert(
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<MergeResult> {
match lsm::lsm_dispatch_decision(table, &params).await? {
lsm::LsmDispatch::Lsm(plan) => {
let future =
lsm::execute_lsm_merge_insert(table, plan, params.validate_single_shard, new_data);
return match params.timeout {
Some(timeout) => match tokio::time::timeout(timeout, future).await {
Ok(result) => result,
Err(_) => Err(Error::Runtime {
message: "merge insert timed out".to_string(),
}),
},
None => future.await,
};
}
lsm::LsmDispatch::Standard => {}
}
let dataset = table.dataset.get().await?;
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
match (
@@ -278,7 +219,6 @@ pub(crate) async fn execute_merge_insert(
num_inserted_rows: stats.num_inserted_rows,
num_deleted_rows: stats.num_deleted_rows,
num_attempts: stats.num_attempts,
num_rows: stats.num_inserted_rows + stats.num_updated_rows,
})
}
@@ -387,366 +327,3 @@ mod tests {
assert_eq!(table.count_rows(None).await.unwrap(), 25);
}
}
#[cfg(test)]
mod lsm_tests {
use std::sync::Arc;
use arrow_array::{
Int64Array, RecordBatch, RecordBatchIterator, RecordBatchReader, StringArray,
};
use arrow_schema::{DataType, Field, Schema};
use tempfile::{TempDir, tempdir};
use crate::connect;
use crate::error::Error;
use crate::table::{LsmWriteSpec, Table};
/// A reader of `[id: Int64, value: Int64]` rows; `value` is `0..n`.
fn id_value_reader(ids: Vec<i64>) -> Box<dyn RecordBatchReader + Send> {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("value", DataType::Int64, false),
]));
let n = ids.len() as i64;
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int64Array::from(ids)),
Arc::new(Int64Array::from_iter_values(0..n)),
],
)
.unwrap();
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema))
}
/// A reader of `[id: Int64, region: Utf8]` rows.
fn id_region_reader(rows: Vec<(i64, &str)>) -> Box<dyn RecordBatchReader + Send> {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("region", DataType::Utf8, false),
]));
let ids: Vec<i64> = rows.iter().map(|(id, _)| *id).collect();
let regions: Vec<&str> = rows.iter().map(|(_, region)| *region).collect();
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int64Array::from(ids)),
Arc::new(StringArray::from(regions)),
],
)
.unwrap();
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema))
}
/// A multi-batch reader of `[id: Int64, region: Utf8]` rows.
fn id_region_multi_reader(batches: Vec<Vec<(i64, &str)>>) -> Box<dyn RecordBatchReader + Send> {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("region", DataType::Utf8, false),
]));
let records: Vec<_> = batches
.into_iter()
.map(|rows| {
let ids: Vec<i64> = rows.iter().map(|(id, _)| *id).collect();
let regions: Vec<&str> = rows.iter().map(|(_, region)| *region).collect();
Ok(RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int64Array::from(ids)),
Arc::new(StringArray::from(regions)),
],
)
.unwrap())
})
.collect();
Box::new(RecordBatchIterator::new(records, schema))
}
/// Create an `[id, value]` table with `id` as the unenforced primary key.
async fn id_value_table(dir: &TempDir) -> Table {
let conn = connect(dir.path().to_str().unwrap())
.execute()
.await
.unwrap();
let table = conn
.create_table("t", id_value_reader(vec![1, 2, 3]))
.execute()
.await
.unwrap();
table.set_unenforced_primary_key(["id"]).await.unwrap();
table
}
#[tokio::test]
async fn lsm_merge_insert_bucket() {
let dir = tempdir().unwrap();
let table = id_value_table(&dir).await;
// num_buckets = 1: every row routes to the single bucket.
table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 1))
.await
.unwrap();
// Empty `on` defaults to the primary key.
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
let result = builder
.execute(id_value_reader(vec![3, 4, 5]))
.await
.unwrap();
// LSM path: rows go to the MemWAL, the breakdown is unknown until
// compaction, so only `num_rows` is populated.
assert_eq!(result.num_rows, 3);
assert_eq!(result.version, 0);
assert_eq!(result.num_inserted_rows, 0);
assert_eq!(result.num_updated_rows, 0);
}
#[tokio::test]
async fn lsm_merge_insert_unsharded() {
let dir = tempdir().unwrap();
let table = id_value_table(&dir).await;
table
.set_lsm_write_spec(LsmWriteSpec::unsharded())
.await
.unwrap();
let mut builder = table.merge_insert(&["id"]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
let result = builder
.execute(id_value_reader(vec![10, 11, 12, 13]))
.await
.unwrap();
assert_eq!(result.num_rows, 4);
}
#[tokio::test]
async fn lsm_merge_insert_identity() {
let dir = tempdir().unwrap();
let conn = connect(dir.path().to_str().unwrap())
.execute()
.await
.unwrap();
let table = conn
.create_table("t", id_region_reader(vec![(1, "us"), (2, "us")]))
.execute()
.await
.unwrap();
table.set_unenforced_primary_key(["id"]).await.unwrap();
table
.set_lsm_write_spec(LsmWriteSpec::identity("region"))
.await
.unwrap();
// All rows share one identity value, so they route to one shard.
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
let result = builder
.execute(id_region_reader(vec![(3, "us"), (4, "us")]))
.await
.unwrap();
assert_eq!(result.num_rows, 2);
}
#[tokio::test]
async fn lsm_merge_insert_use_lsm_write_false_falls_back() {
let dir = tempdir().unwrap();
let table = id_value_table(&dir).await;
table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 1))
.await
.unwrap();
// use_lsm_write(false) opts out: the standard path runs and commits.
let mut builder = table.merge_insert(&["id"]);
builder.when_not_matched_insert_all().use_lsm_write(false);
let result = builder
.execute(id_value_reader(vec![3, 4, 5]))
.await
.unwrap();
assert_eq!(result.num_inserted_rows, 2);
assert_eq!(table.count_rows(None).await.unwrap(), 5);
}
#[tokio::test]
async fn lsm_merge_insert_rejects_on_not_primary_key() {
let dir = tempdir().unwrap();
let table = id_value_table(&dir).await;
table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 1))
.await
.unwrap();
let mut builder = table.merge_insert(&["value"]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
let err = builder.execute(id_value_reader(vec![1])).await.unwrap_err();
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
}
#[tokio::test]
async fn lsm_merge_insert_rejects_non_upsert() {
let dir = tempdir().unwrap();
let table = id_value_table(&dir).await;
table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 1))
.await
.unwrap();
// Insert-only (no when_matched_update_all) is not the upsert shape.
let mut builder = table.merge_insert(&[]);
builder.when_not_matched_insert_all();
let err = builder.execute(id_value_reader(vec![4])).await.unwrap_err();
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
}
#[tokio::test]
async fn lsm_close_writers_then_reopen() {
let dir = tempdir().unwrap();
let table = id_value_table(&dir).await;
table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 1))
.await
.unwrap();
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
builder.execute(id_value_reader(vec![7, 8])).await.unwrap();
table.close_lsm_writers().await.unwrap();
// The writer reopens lazily on the next merge_insert.
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
let result = builder.execute(id_value_reader(vec![9])).await.unwrap();
assert_eq!(result.num_rows, 1);
}
#[tokio::test]
async fn lsm_merge_insert_multi_batch() {
let dir = tempdir().unwrap();
let conn = connect(dir.path().to_str().unwrap())
.execute()
.await
.unwrap();
let table = conn
.create_table("t", id_region_reader(vec![(1, "us")]))
.execute()
.await
.unwrap();
table.set_unenforced_primary_key(["id"]).await.unwrap();
table
.set_lsm_write_spec(LsmWriteSpec::identity("region"))
.await
.unwrap();
// Multiple batches that all route to one shard are written together.
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
let result = builder
.execute(id_region_multi_reader(vec![
vec![(2, "us"), (3, "us")],
vec![(4, "us")],
]))
.await
.unwrap();
assert_eq!(result.num_rows, 3);
// Batches that route to different shards are rejected; the validation
// runs before any write, so no partial write is left behind.
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
let err = builder
.execute(id_region_multi_reader(vec![
vec![(5, "us")],
vec![(6, "eu")],
]))
.await
.unwrap_err();
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
}
#[tokio::test]
async fn lsm_merge_insert_use_lsm_write_true_requires_spec() {
let dir = tempdir().unwrap();
// id_value_table sets a primary key but no LSM write spec.
let table = id_value_table(&dir).await;
let mut builder = table.merge_insert(&["id"]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all()
.use_lsm_write(true);
let err = builder.execute(id_value_reader(vec![4])).await.unwrap_err();
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
}
#[tokio::test]
async fn lsm_merge_insert_rejects_second_shard() {
let dir = tempdir().unwrap();
let conn = connect(dir.path().to_str().unwrap())
.execute()
.await
.unwrap();
let table = conn
.create_table("t", id_region_reader(vec![(1, "us")]))
.execute()
.await
.unwrap();
table.set_unenforced_primary_key(["id"]).await.unwrap();
table
.set_lsm_write_spec(LsmWriteSpec::identity("region"))
.await
.unwrap();
// The first merge_insert opens the single writer for shard "us".
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
builder
.execute(id_region_reader(vec![(2, "us")]))
.await
.unwrap();
// A merge_insert routing to a different shard is rejected.
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
let err = builder
.execute(id_region_reader(vec![(3, "eu")]))
.await
.unwrap_err();
assert!(matches!(err, Error::InvalidInput { .. }), "got {err:?}");
// After closing the writer, a different shard can be written.
table.close_lsm_writers().await.unwrap();
let mut builder = table.merge_insert(&[]);
builder
.when_matched_update_all(None)
.when_not_matched_insert_all();
builder
.execute(id_region_reader(vec![(4, "eu")]))
.await
.unwrap();
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -10,7 +10,6 @@
use lance::dataset::{ColumnAlteration, NewColumnTransform};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use super::NativeTable;
use crate::Result;
@@ -45,52 +44,6 @@ pub struct DropColumnsResult {
pub version: u64,
}
/// A single field's metadata update, addressed by dot-path.
///
/// Merges into the field's existing metadata by default. Use [`Self::remove`] to
/// delete a key, or [`Self::replace`] to swap the field's entire metadata map.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct FieldMetadataUpdate {
/// Dot-separated path to the field (e.g. `"embedding"` or `"address.zip"`).
pub path: String,
/// Keys to set (`Some`) or delete (`None`).
pub metadata: HashMap<String, Option<String>>,
/// If `true`, replace the field's entire metadata map instead of merging.
pub replace: bool,
}
impl FieldMetadataUpdate {
pub fn new(path: impl Into<String>) -> Self {
Self {
path: path.into(),
metadata: HashMap::new(),
replace: false,
}
}
pub fn set(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.metadata.insert(key.into(), Some(value.into()));
self
}
pub fn remove(mut self, key: impl Into<String>) -> Self {
self.metadata.insert(key.into(), None);
self
}
pub fn replace(mut self) -> Self {
self.replace = true;
self
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct UpdateFieldMetadataResult {
/// The commit version associated with the operation.
#[serde(default)]
pub version: u64,
}
/// Internal implementation of the add columns logic.
///
/// Adds new columns to the table using the provided transforms.
@@ -137,32 +90,6 @@ pub(crate) async fn execute_drop_columns(
Ok(DropColumnsResult { version })
}
/// Internal implementation of the update field metadata logic.
///
/// Merges or replaces per-field metadata, addressing fields by dot-path.
pub(crate) async fn execute_update_field_metadata(
table: &NativeTable,
updates: &[FieldMetadataUpdate],
) -> Result<UpdateFieldMetadataResult> {
table.dataset.ensure_mutable()?;
let mut dataset = (*table.dataset.get().await?).clone();
let mut builder = dataset.update_field_metadata();
for update in updates {
let entries = update.metadata.iter().map(|(k, v)| (k.clone(), v.clone()));
builder = if update.replace {
builder.replace(&update.path, entries)?
} else {
builder.update(&update.path, entries)?
};
}
builder.await?;
let version = dataset.version().version;
table.dataset.update(dataset);
Ok(UpdateFieldMetadataResult { version })
}
#[cfg(test)]
mod tests {
use arrow_array::{Int32Array, StringArray, record_batch};
@@ -170,7 +97,6 @@ mod tests {
use futures::TryStreamExt;
use lance::dataset::ColumnAlteration;
use super::FieldMetadataUpdate;
use crate::connect;
use crate::query::{ExecutableQuery, QueryBase, Select};
use crate::table::NewColumnTransform;
@@ -684,46 +610,4 @@ mod tests {
let v4 = table.version().await.unwrap();
assert_eq!(drop_result.version, v4);
}
#[tokio::test]
async fn test_update_field_metadata() {
let conn = connect("memory://").execute().await.unwrap();
let batch = record_batch!(
("id", Int32, [1, 2, 3]),
("category", Utf8, ["A", "B", "C"])
)
.unwrap();
let table = conn
.create_table("test_update_field_metadata", batch)
.execute()
.await
.unwrap();
// Set metadata on a field.
table
.update_field_metadata(&[FieldMetadataUpdate::new("category")
.set("unit", "label")
.set("pii", "false")])
.await
.unwrap();
let schema = table.schema().await.unwrap();
let field = schema.field_with_name("category").unwrap();
assert_eq!(
field.metadata().get("unit").map(String::as_str),
Some("label")
);
// Merge: add a key, delete one, keep the rest.
table
.update_field_metadata(&[FieldMetadataUpdate::new("category")
.set("source", "import")
.remove("pii")])
.await
.unwrap();
let schema = table.schema().await.unwrap();
let md = schema.field_with_name("category").unwrap().metadata();
assert_eq!(md.get("unit").map(String::as_str), Some("label")); // preserved
assert_eq!(md.get("source").map(String::as_str), Some("import")); // added
assert!(!md.contains_key("pii")); // deleted
}
}

View File

@@ -6,7 +6,7 @@ pub(crate) mod background_cache;
use std::sync::Arc;
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
use datafusion_common::{DataFusionError, Result as DataFusionResult};
use datafusion_execution::RecordBatchStream;
use futures::{FutureExt, Stream};
@@ -199,32 +199,38 @@ fn collect_vector_columns(
path.pop();
}
pub(crate) fn resolve_arrow_field_path(schema: &Schema, column: &str) -> Result<(String, Field)> {
lance_core::datatypes::parse_field_path(column).map_err(|e| Error::InvalidInput {
message: format!("Invalid field path `{}`: {}", column, e),
})?;
pub(crate) fn resolve_arrow_field_path(schema: &Schema, column: &str) -> Result<Field> {
let segments =
lance_core::datatypes::parse_field_path(column).map_err(|e| Error::InvalidInput {
message: format!("Invalid field path `{}`: {}", column, e),
})?;
let mut fields = schema.fields();
let lance_schema =
lance_core::datatypes::Schema::try_from(schema).map_err(|e| Error::Schema {
message: format!("Invalid schema: {}", e),
for (idx, segment) in segments.iter().enumerate() {
let field = find_field(fields, segment).ok_or_else(|| Error::Schema {
message: format!("Field path `{}` not found in schema", column),
})?;
let field_path = lance_schema
.resolve_case_insensitive(column)
.ok_or_else(|| Error::Schema {
message: format!(
"Field path `{}` not found in schema. Available field paths: {}",
column,
lance_schema.field_paths().join(", ")
),
})?;
let field = field_path.last().expect("field path should be non-empty");
let path_segments = field_path
if idx + 1 == segments.len() {
return Ok(field.clone());
}
fields = match field.data_type() {
DataType::Struct(fields) => fields,
_ => {
return Err(Error::Schema {
message: format!("Field path `{}` not found in schema", column),
});
}
};
}
unreachable!("parse_field_path returns at least one segment")
}
fn find_field<'a>(fields: &'a Fields, name: &str) -> Option<&'a Field> {
fields
.iter()
.map(|field| field.name.as_str())
.collect::<Vec<_>>();
let canonical_path = lance_core::datatypes::format_field_path(&path_segments);
Ok((canonical_path, Field::from(*field)))
.find(|field| field.name() == name)
.map(|field| field.as_ref())
}
pub fn supported_btree_data_type(dtype: &DataType) -> bool {