Compare commits

..

1 Commits

Author SHA1 Message Date
Lance Release
d468a0adb1 Bump version: 0.30.0-beta.0 → 0.30.0-beta.1 2026-05-22 10:08:37 +00:00
27 changed files with 320 additions and 1370 deletions

View File

@@ -29,3 +29,7 @@ runs:
args: ${{ inputs.args }}
docker-options: "-e PIP_EXTRA_INDEX_URL='https://pypi.fury.io/lance-format/ https://pypi.fury.io/lancedb/'"
working-directory: python
- uses: actions/upload-artifact@v4
with:
name: windows-wheels
path: python\target\wheels

View File

@@ -8,9 +8,6 @@ on:
# This should trigger a dry run (we skip the final publish step)
paths:
- .github/workflows/pypi-publish.yml
- .github/workflows/build_linux_wheel/action.yml
- .github/workflows/build_mac_wheel/action.yml
- .github/workflows/build_windows_wheel/action.yml
- Cargo.toml # Change in dependency frequently breaks builds
- Cargo.lock
@@ -24,21 +21,32 @@ jobs:
linux:
name: Python ${{ matrix.config.platform }} manylinux${{ matrix.config.manylinux }}
timeout-minutes: 60
permissions:
id-token: write
contents: read
strategy:
matrix:
config:
- platform: x86_64
manylinux: "2_17"
extra_args: ""
runner: ubuntu-22.04
- platform: x86_64
manylinux: "2_28"
extra_args: "--features fp16kernels"
runner: ubuntu-22.04
# For successful fat LTO builds, we need a large runner to avoid OOM errors.
- platform: aarch64
manylinux: "2_17"
extra_args: ""
# For successful fat LTO builds, we need a large runner to avoid OOM errors.
runner: ubuntu-2404-8x-arm64
- platform: aarch64
manylinux: "2_28"
extra_args: "--features fp16kernels"
runner: ubuntu-2404-8x-arm64
runs-on: ${{ matrix.config.runner }}
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
with:
fetch-depth: 0
lfs: true
@@ -52,14 +60,15 @@ jobs:
args: "--release --strip ${{ matrix.config.extra_args }}"
arm-build: ${{ matrix.config.platform == 'aarch64' }}
manylinux: ${{ matrix.config.manylinux }}
- uses: actions/upload-artifact@v7
- uses: ./.github/workflows/upload_wheel
if: startsWith(github.ref, 'refs/tags/python-v')
with:
name: wheels-linux-${{ matrix.config.platform }}-${{ matrix.config.manylinux }}
path: target/wheels/lancedb-*.whl
if-no-files-found: error
fury_token: ${{ secrets.FURY_TOKEN }}
mac:
timeout-minutes: 90
permissions:
id-token: write
contents: read
runs-on: ${{ matrix.config.runner }}
strategy:
matrix:
@@ -69,7 +78,7 @@ jobs:
env:
MACOSX_DEPLOYMENT_TARGET: 10.15
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
with:
fetch-depth: 0
lfs: true
@@ -81,21 +90,18 @@ jobs:
with:
python-minor-version: 10
args: "--release --strip --target ${{ matrix.config.target }} --features fp16kernels"
- uses: actions/upload-artifact@v7
- uses: ./.github/workflows/upload_wheel
if: startsWith(github.ref, 'refs/tags/python-v')
with:
name: wheels-mac-${{ matrix.config.target }}
path: target/wheels/lancedb-*.whl
if-no-files-found: error
fury_token: ${{ secrets.FURY_TOKEN }}
windows:
timeout-minutes: 90
timeout-minutes: 60
permissions:
id-token: write
contents: read
runs-on: windows-latest
env:
# link.exe is single-threaded and the long pole on Windows builds. Use
# rustc's bundled lld-link instead.
CARGO_TARGET_X86_64_PC_WINDOWS_MSVC_LINKER: rust-lld
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
with:
fetch-depth: 0
lfs: true
@@ -107,70 +113,18 @@ jobs:
with:
python-minor-version: 10
args: "--release --strip"
- uses: actions/upload-artifact@v7
vcpkg_token: ${{ secrets.VCPKG_GITHUB_PACKAGES }}
- uses: ./.github/workflows/upload_wheel
if: startsWith(github.ref, 'refs/tags/python-v')
with:
name: wheels-windows
path: target/wheels/lancedb-*.whl
if-no-files-found: error
publish:
name: Publish wheels
if: startsWith(github.ref, 'refs/tags/python-v')
needs: [linux, mac, windows]
runs-on: ubuntu-latest
permissions:
id-token: write
contents: read
steps:
- uses: actions/checkout@v6
- name: Download wheel artifacts
uses: actions/download-artifact@v8
with:
pattern: wheels-*
path: target/wheels
merge-multiple: true
- name: List wheels
run: ls -la target/wheels
- name: Choose repo
id: choose_repo
run: |
if [[ ${{ github.ref }} == *beta* ]]; then
echo "repo=fury" >> $GITHUB_OUTPUT
else
echo "repo=pypi" >> $GITHUB_OUTPUT
fi
- name: Publish to Fury
if: steps.choose_repo.outputs.repo == 'fury'
env:
FURY_TOKEN: ${{ secrets.FURY_TOKEN }}
run: |
shopt -s nullglob
WHEELS=(target/wheels/lancedb-*.whl)
if [[ ${#WHEELS[@]} -eq 0 ]]; then
echo "No wheels found in target/wheels/" >&2
exit 1
fi
for WHEEL in "${WHEELS[@]}"; do
echo "Uploading $WHEEL to Fury"
curl -f -F package=@"$WHEEL" "https://$FURY_TOKEN@push.fury.io/lancedb/"
done
# NOTE: pypa/gh-action-pypi-publish must be invoked directly from a
# workflow file, not from inside a composite action. When called from a
# composite, `github.action_repository` is empty (actions/runner#2473)
# and the action falls back to `github.repository`, producing a bogus
# `docker://ghcr.io/<repo>:<ref>` image reference that GHA tries to pull.
- name: Publish to PyPI
if: steps.choose_repo.outputs.repo == 'pypi'
uses: pypa/gh-action-pypi-publish@release/v1
with:
packages-dir: target/wheels/
fury_token: ${{ secrets.FURY_TOKEN }}
gh-release:
if: startsWith(github.ref, 'refs/tags/python-v')
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
with:
fetch-depth: 0
lfs: true
@@ -233,13 +187,13 @@ jobs:
report-failure:
name: Report Workflow Failure
runs-on: ubuntu-latest
needs: [linux, mac, windows, publish]
needs: [linux, mac, windows]
permissions:
contents: read
issues: write
if: always() && failure() && startsWith(github.ref, 'refs/tags/python-v')
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
- uses: ./.github/actions/create-failure-issue
with:
job-results: ${{ toJSON(needs) }}

View File

@@ -0,0 +1,34 @@
name: upload-wheel
description: "Upload wheels to Pypi"
inputs:
fury_token:
required: true
description: "release token for the fury repo"
runs:
using: "composite"
steps:
- name: Choose repo
shell: bash
id: choose_repo
run: |
if [[ ${{ github.ref }} == *beta* ]]; then
echo "repo=fury" >> $GITHUB_OUTPUT
else
echo "repo=pypi" >> $GITHUB_OUTPUT
fi
- name: Publish to Fury
if: steps.choose_repo.outputs.repo == 'fury'
shell: bash
env:
FURY_TOKEN: ${{ inputs.fury_token }}
run: |
WHEEL=$(ls target/wheels/lancedb-*.whl 2> /dev/null | head -n 1)
echo "Uploading $WHEEL to Fury"
curl -f -F package=@$WHEEL https://$FURY_TOKEN@push.fury.io/lancedb/
- name: Publish to PyPI
if: steps.choose_repo.outputs.repo == 'pypi'
uses: pypa/gh-action-pypi-publish@release/v1
with:
packages-dir: target/wheels/

View File

@@ -37,13 +37,10 @@ Before committing changes, run formatting for every language you touched. At min
and run targeted tests through `cd python && uv run ...`.
* TypeScript changes: run the relevant `npm`/`pnpm` lint, format, build, and docs commands in `nodejs`.
Before creating a PR, the exact value passed to `gh pr create --title` must follow
Conventional Commits, such as `fix: support nested field paths in native index creation`
or `feat(python): add dataset multiprocessing support`. Do not use a plain natural
language summary like `Support nested field paths in native index creation` as the PR
title. The semantic-release check uses the PR title and body as the merge commit message,
so a non-conventional PR title will fail CI. After creating a PR, read the remote PR title
back and fix it immediately if it is not conventional.
Before creating a PR, make sure the PR title follows Conventional Commits, such as
`fix: support nested field paths in native index creation` or
`feat(python): add dataset multiprocessing support`. The semantic-release check uses the
PR title and body as the merge commit message, so a non-conventional PR title will fail CI.
## Coding tips

140
Cargo.lock generated
View File

@@ -3284,8 +3284,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsst"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow-array",
"rand 0.9.4",
@@ -4506,8 +4506,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a"
[[package]]
name = "lance"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arc-swap",
"arrow",
@@ -4525,7 +4525,6 @@ dependencies = [
"async_cell",
"aws-credential-types",
"aws-sdk-dynamodb",
"bitpacking",
"byteorder",
"bytes",
"chrono",
@@ -4552,11 +4551,9 @@ dependencies = [
"lance-io",
"lance-linalg",
"lance-namespace",
"lance-select",
"lance-table",
"lance-tokenizer",
"log",
"moka",
"object_store",
"permutation",
"pin-project",
@@ -4580,8 +4577,8 @@ dependencies = [
[[package]]
name = "lance-arrow"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4601,8 +4598,8 @@ dependencies = [
[[package]]
name = "lance-bitpacking"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrayref",
"paste",
@@ -4611,8 +4608,8 @@ dependencies = [
[[package]]
name = "lance-core"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4647,8 +4644,8 @@ dependencies = [
[[package]]
name = "lance-datafusion"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow",
"arrow-array",
@@ -4678,8 +4675,8 @@ dependencies = [
[[package]]
name = "lance-datagen"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow",
"arrow-array",
@@ -4697,8 +4694,8 @@ dependencies = [
[[package]]
name = "lance-encoding"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4733,8 +4730,8 @@ dependencies = [
[[package]]
name = "lance-file"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -4765,8 +4762,8 @@ dependencies = [
[[package]]
name = "lance-index"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arc-swap",
"arrow",
@@ -4803,7 +4800,6 @@ dependencies = [
"lance-file",
"lance-io",
"lance-linalg",
"lance-select",
"lance-table",
"lance-tokenizer",
"libm",
@@ -4831,8 +4827,8 @@ dependencies = [
[[package]]
name = "lance-io"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow",
"arrow-arith",
@@ -4874,8 +4870,8 @@ dependencies = [
[[package]]
name = "lance-linalg"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -4891,8 +4887,8 @@ dependencies = [
[[package]]
name = "lance-namespace"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow",
"async-trait",
@@ -4904,8 +4900,8 @@ dependencies = [
[[package]]
name = "lance-namespace-impls"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow",
"arrow-ipc",
@@ -4940,9 +4936,9 @@ dependencies = [
[[package]]
name = "lance-namespace-reqwest-client"
version = "0.7.7"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6369eee4682fb11edf538388b43c61ce288b8302fe89bb40944d7daa7faaae99"
checksum = "f65e31bdaa13e01dab6e7cf566da31df243c34a542f0d915d3601ec0e01e61d2"
dependencies = [
"reqwest 0.12.28",
"serde",
@@ -4952,25 +4948,10 @@ dependencies = [
"url",
]
[[package]]
name = "lance-select"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
dependencies = [
"arrow-array",
"arrow-buffer",
"byteorder",
"bytes",
"deepsize",
"itertools 0.13.0",
"lance-core",
"roaring",
]
[[package]]
name = "lance-table"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow",
"arrow-array",
@@ -4989,7 +4970,6 @@ dependencies = [
"lance-core",
"lance-file",
"lance-io",
"lance-select",
"log",
"object_store",
"prost",
@@ -5010,8 +4990,8 @@ dependencies = [
[[package]]
name = "lance-testing"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"arrow-array",
"arrow-schema",
@@ -5022,8 +5002,8 @@ dependencies = [
[[package]]
name = "lance-tokenizer"
version = "7.1.0-beta.4"
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
version = "7.0.0-beta.13"
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
dependencies = [
"jieba-rs",
"lindera",
@@ -5034,7 +5014,7 @@ dependencies = [
[[package]]
name = "lancedb"
version = "0.30.0-beta.1"
version = "0.30.0-beta.0"
dependencies = [
"ahash",
"anyhow",
@@ -5104,7 +5084,6 @@ dependencies = [
"serde",
"serde_json",
"serde_with",
"serial_test",
"snafu 0.8.9",
"tempfile",
"test-log",
@@ -5117,7 +5096,7 @@ dependencies = [
[[package]]
name = "lancedb-nodejs"
version = "0.30.0-beta.1"
version = "0.30.0-beta.0"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -5140,7 +5119,7 @@ dependencies = [
[[package]]
name = "lancedb-python"
version = "0.33.0-beta.1"
version = "0.33.0-beta.0"
dependencies = [
"arrow",
"async-trait",
@@ -8149,15 +8128,6 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "scc"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46e6f046b7fef48e2660c57ed794263155d713de679057f2d0c169bfc6e756cc"
dependencies = [
"sdd",
]
[[package]]
name = "schannel"
version = "0.1.29"
@@ -8224,12 +8194,6 @@ dependencies = [
"untrusted 0.9.0",
]
[[package]]
name = "sdd"
version = "3.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca"
[[package]]
name = "sec1"
version = "0.3.0"
@@ -8420,32 +8384,6 @@ dependencies = [
"unsafe-libyaml",
]
[[package]]
name = "serial_test"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "911bd979bf1070a3f3aa7b691a3b3e9968f339ceeec89e08c280a8a22207a32f"
dependencies = [
"futures-executor",
"futures-util",
"log",
"once_cell",
"parking_lot",
"scc",
"serial_test_derive",
]
[[package]]
name = "serial_test_derive"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a7d91949b85b0d2fb687445e448b40d322b6b3e4af6b44a29b21d9a5f33e6d9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]]
name = "sha1"
version = "0.10.6"

View File

@@ -13,20 +13,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=7.1.0-beta.4", default-features = false, "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=7.1.0-beta.4", default-features = false, "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=7.1.0-beta.4", default-features = false, "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "58.0.0", optional = false }

View File

@@ -112,25 +112,25 @@ def fetch_remote_tags() -> List[TagInfo]:
"api",
"-X",
"GET",
f"repos/{LANCE_REPO}/releases",
f"repos/{LANCE_REPO}/git/refs/tags",
"--paginate",
"--jq",
".[].tag_name",
"-F",
"per_page=20",
".[].ref",
]
)
tags: List[TagInfo] = []
for line in output.splitlines():
tag = line.strip()
if not tag.startswith("v"):
ref = line.strip()
if not ref.startswith("refs/tags/v"):
continue
tag = ref.split("refs/tags/")[-1]
version = tag.lstrip("v")
try:
tags.append(TagInfo(tag=tag, version=version, semver=parse_semver(version)))
except ValueError:
continue
if not tags:
raise RuntimeError("No Lance releases could be parsed from GitHub API output")
raise RuntimeError("No Lance tags could be parsed from GitHub API output")
return tags

View File

@@ -70,20 +70,16 @@ client used by manifest-enabled native connections.
optional readConsistencyInterval: number;
```
The interval, in seconds, at which to check for updates to the table
from other processes. If None, then consistency is not checked. For
performance reasons, this is the default. For strong consistency, set
this to zero seconds. Then every read will check for updates from other
processes. As a compromise, you can set this to a non-zero value for
eventual consistency. If more than that interval has passed since the
last check, then the table will be checked for updates. Note: this
consistency only applies to read operations. Write operations are
(For LanceDB OSS only): The interval, in seconds, at which to check for
updates to the table from other processes. If None, then consistency is not
checked. For performance reasons, this is the default. For strong
consistency, set this to zero seconds. Then every read will check for
updates from other processes. As a compromise, you can set this to a
non-zero value for eventual consistency. If more than that interval
has passed since the last check, then the table will be checked for updates.
Note: this consistency only applies to read operations. Write operations are
always consistent.
Stronger consistency is not free. The smaller the interval, the more
often each read pays the cost of checking for updates against object
storage, raising per-read latency and cost.
***
### region?

View File

@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>7.1.0-beta.4</lance-core.version>
<lance-core.version>7.0.0-beta.13</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -171,22 +171,18 @@ describe("given a connection", () => {
let manifestDir =
tmpDir.name + "/test_manifest_paths_v2_empty.lance/_versions";
readdirSync(manifestDir)
.filter((f) => f.endsWith(".manifest"))
.forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
readdirSync(manifestDir).forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
table = (await db.createTable("test_manifest_paths_v2", [{ id: 1 }], {
enableV2ManifestPaths: true,
})) as LocalTable;
expect(await table.usesV2ManifestPaths()).toBe(true);
manifestDir = tmpDir.name + "/test_manifest_paths_v2.lance/_versions";
readdirSync(manifestDir)
.filter((f) => f.endsWith(".manifest"))
.forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
readdirSync(manifestDir).forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
});
it("should be able to migrate tables to the V2 manifest paths", async () => {
@@ -203,20 +199,16 @@ describe("given a connection", () => {
const manifestDir =
tmpDir.name + "/test_manifest_path_migration.lance/_versions";
readdirSync(manifestDir)
.filter((f) => f.endsWith(".manifest"))
.forEach((file) => {
expect(file).toMatch(/^\d\.manifest$/);
});
readdirSync(manifestDir).forEach((file) => {
expect(file).toMatch(/^\d\.manifest$/);
});
await table.migrateManifestPathsV2();
expect(await table.usesV2ManifestPaths()).toBe(true);
readdirSync(manifestDir)
.filter((f) => f.endsWith(".manifest"))
.forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
readdirSync(manifestDir).forEach((file) => {
expect(file).toMatch(/^\d{20}\.manifest$/);
});
});
});

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.30.0-beta.1",
"version": "0.30.0-beta.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.30.0-beta.1",
"version": "0.30.0-beta.0",
"cpu": [
"x64",
"arm64"

View File

@@ -24,19 +24,15 @@ mod util;
#[napi(object)]
#[derive(Debug)]
pub struct ConnectionOptions {
/// The interval, in seconds, at which to check for updates to the table
/// from other processes. If None, then consistency is not checked. For
/// performance reasons, this is the default. For strong consistency, set
/// this to zero seconds. Then every read will check for updates from other
/// processes. As a compromise, you can set this to a non-zero value for
/// eventual consistency. If more than that interval has passed since the
/// last check, then the table will be checked for updates. Note: this
/// consistency only applies to read operations. Write operations are
/// (For LanceDB OSS only): The interval, in seconds, at which to check for
/// updates to the table from other processes. If None, then consistency is not
/// checked. For performance reasons, this is the default. For strong
/// consistency, set this to zero seconds. Then every read will check for
/// updates from other processes. As a compromise, you can set this to a
/// non-zero value for eventual consistency. If more than that interval
/// has passed since the last check, then the table will be checked for updates.
/// Note: this consistency only applies to read operations. Write operations are
/// always consistent.
///
/// Stronger consistency is not free. The smaller the interval, the more
/// often each read pays the cost of checking for updates against object
/// storage, raising per-read latency and cost.
pub read_consistency_interval: Option<f64>,
/// (For LanceDB OSS only): configuration for object storage.
///

View File

@@ -94,6 +94,7 @@ def connect(
host_override: str, optional
The override url for LanceDB Cloud.
read_consistency_interval: timedelta, default None
(For LanceDB OSS only)
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked. For performance
reasons, this is the default. For strong consistency, set this to
@@ -103,10 +104,6 @@ def connect(
the last check, then the table will be checked for updates. Note: this
consistency only applies to read operations. Write operations are
always consistent.
Stronger consistency is not free. The smaller the interval, the more
often each read pays the cost of checking for updates against object
storage, raising per-read latency and cost.
client_config: ClientConfig or dict, optional
Configuration options for the LanceDB Cloud HTTP client. If a dict, then
the keys are the attributes of the ClientConfig class. If None, then the
@@ -150,13 +147,6 @@ def connect(
>>> db = lancedb.connect("s3://my-bucket/lancedb",
... storage_options={"aws_access_key_id": "***"})
For tests and temporary data, use an in-memory database:
>>> db = lancedb.connect("memory://")
In-memory databases are not persisted. Tables are dropped when the last
connection or table handle referencing them is closed.
Connect to LanceDB cloud:
>>> db = lancedb.connect("db://my_database", api_key="ldb_...",
@@ -220,7 +210,6 @@ def connect(
request_thread_pool=request_thread_pool,
client_config=client_config,
storage_options=storage_options,
read_consistency_interval=read_consistency_interval,
**kwargs,
)
_check_s3_bucket_with_dots(str(uri), storage_options)
@@ -347,6 +336,7 @@ async def connect_async(
host_override: str, optional
The override url for LanceDB Cloud.
read_consistency_interval: timedelta, default None
(For LanceDB OSS only)
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked. For performance
reasons, this is the default. For strong consistency, set this to
@@ -356,10 +346,6 @@ async def connect_async(
the last check, then the table will be checked for updates. Note: this
consistency only applies to read operations. Write operations are
always consistent.
Stronger consistency is not free. The smaller the interval, the more
often each read pays the cost of checking for updates against object
storage, raising per-read latency and cost.
client_config: ClientConfig or dict, optional
Configuration options for the LanceDB Cloud HTTP client. If a dict, then
the keys are the attributes of the ClientConfig class. If None, then the
@@ -392,8 +378,6 @@ async def connect_async(
... db = await lancedb.connect_async("s3://my-bucket/lancedb",
... storage_options={
... "aws_access_key_id": "***"})
... # For tests and temporary data, use an in-memory database
... db = await lancedb.connect_async("memory://")
... # Connect to LanceDB cloud
... db = await lancedb.connect_async("db://my_database", api_key="ldb_...",
... client_config={

View File

@@ -50,7 +50,6 @@ class RemoteDBConnection(DBConnection):
connection_timeout: Optional[float] = None,
read_timeout: Optional[float] = None,
storage_options: Optional[Dict[str, str]] = None,
read_consistency_interval: Optional[timedelta] = None,
):
"""Connect to a remote LanceDB database."""
if isinstance(client_config, dict):
@@ -104,7 +103,6 @@ class RemoteDBConnection(DBConnection):
host_override=host_override,
client_config=client_config,
storage_options=storage_options,
read_consistency_interval=read_consistency_interval,
)
)

View File

@@ -102,15 +102,8 @@ class LinearCombinationReranker(Reranker):
combined_list = []
for row_id, result in results.items():
# Convert vector distance to a relevance score in [0, 1] where
# higher is better. Missing vector entries are penalised with
# `_invert_score(fill)` = 1 - fill (= 0.0 for the default fill=1).
vector_score = self._invert_score(result.get("_distance", fill))
# FTS scores (BM25) are already in a "higher = more relevant" space.
# Missing FTS entries are penalised symmetrically: we use
# `1 - fill` so that the same `fill` value drives both missing-vector
# and missing-FTS penalties in the same direction.
fts_score = result.get("_score", 1 - fill)
fts_score = result.get("_score", fill)
result["_relevance_score"] = self._combine_score(vector_score, fts_score)
combined_list.append(result)
@@ -130,12 +123,8 @@ class LinearCombinationReranker(Reranker):
return tbl
def _combine_score(self, vector_score, fts_score):
# Both vector_score (inverted distance) and fts_score are in a
# "higher = more relevant" space. A straight weighted average gives
# higher _relevance_score to better matches, as expected.
# Previously this returned `1 - (...)` which inverted the final
# ranking so that the *least* relevant document ranked first.
return self.weight * vector_score + (1 - self.weight) * fts_score
# these scores represent distance
return 1 - (self.weight * vector_score + (1 - self.weight) * fts_score)
def _invert_score(self, dist: float):
# Invert the score between relevance and distance

View File

@@ -466,8 +466,7 @@ async def test_create_table_v2_manifest_paths_async(tmp_path):
assert await tbl.uses_v2_manifest_paths()
manifests_dir = tmp_path / "test_v2_manifest_paths.lance" / "_versions"
for manifest in os.listdir(manifests_dir):
if manifest.endswith(".manifest"):
assert re.match(r"\d{20}\.manifest", manifest)
assert re.match(r"\d{20}\.manifest", manifest)
# Start a table in V1 mode then migrate
tbl = await db_no_v2_paths.create_table(
@@ -477,15 +476,13 @@ async def test_create_table_v2_manifest_paths_async(tmp_path):
assert not await tbl.uses_v2_manifest_paths()
manifests_dir = tmp_path / "test_v2_migration.lance" / "_versions"
for manifest in os.listdir(manifests_dir):
if manifest.endswith(".manifest"):
assert re.match(r"\d\.manifest", manifest)
assert re.match(r"\d\.manifest", manifest)
await tbl.migrate_manifest_paths_v2()
assert await tbl.uses_v2_manifest_paths()
for manifest in os.listdir(manifests_dir):
if manifest.endswith(".manifest"):
assert re.match(r"\d{20}\.manifest", manifest)
assert re.match(r"\d{20}\.manifest", manifest)
@pytest.mark.asyncio

View File

@@ -40,6 +40,16 @@ def _make_table(tmp_path):
def test_set_lsm_write_spec_validates(tmp_path):
_db, table = _make_table(tmp_path)
# No PK set yet.
with pytest.raises(Exception, match="primary key"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
table.set_unenforced_primary_key("id")
# Column mismatch.
with pytest.raises(Exception, match="match"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("v", 4))
# Out-of-range num_buckets.
with pytest.raises(Exception, match="num_buckets"):
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 0))
@@ -60,6 +70,7 @@ def test_unset_lsm_write_spec(tmp_path):
table.unset_lsm_write_spec()
# Install a spec, then remove it; afterwards a fresh spec can be set.
table.set_unenforced_primary_key("id")
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
table.unset_lsm_write_spec()
# A second unset errors — there is no spec left to remove.

View File

@@ -603,89 +603,3 @@ def test_cross_encoder_reranker_return_all(tmp_path):
assert "_relevance_score" in result.column_names
assert "_score" in result.column_names
assert "_distance" in result.column_names
# ---------------------------------------------------------------------------
# Regression tests for LinearCombinationReranker scoring bugs (issue #3154)
# ---------------------------------------------------------------------------
def test_linear_combination_best_match_ranks_first():
"""
The document that is BOTH the closest vector match AND the only FTS match
must rank first. Previously _combine_score subtracted from 1, inverting
the ranking so the worst document ranked highest.
"""
reranker = LinearCombinationReranker(weight=0.7, return_score="all")
# rowid 0: perfect vector match, sole FTS match → should rank 1st
# rowid 1: mediocre vector, no FTS match
# rowid 2: bad vector, no FTS match
vector_results = pa.Table.from_pydict(
{
"_rowid": [0, 1, 2],
"_distance": [0.0, 0.5, 0.9],
}
)
fts_results = pa.Table.from_pydict(
{
"_rowid": [0],
"_score": [1.0],
}
)
combined = reranker.merge_results(vector_results, fts_results, fill=1.0)
scores = dict(
zip(
combined["_rowid"].to_pylist(),
combined["_relevance_score"].to_pylist(),
)
)
# rowid 0 must have the highest relevance score
assert scores[0] > scores[1], (
f"Best match (rowid 0, score={scores[0]:.4f}) should beat "
f"mid match (rowid 1, score={scores[1]:.4f})"
)
assert scores[1] > scores[2], (
f"Mid match (rowid 1, score={scores[1]:.4f}) should beat "
f"bad match (rowid 2, score={scores[2]:.4f})"
)
def test_linear_combination_missing_fts_is_penalised():
"""
A document with no FTS match must score *lower* than a document that
has a mediocre FTS match, everything else being equal. Previously
missing-FTS entries used fill=1.0 directly, which gave them a reward
(via the 1-(...) inversion) instead of a penalty.
"""
reranker = LinearCombinationReranker(weight=0.5, return_score="all")
vector_results = pa.Table.from_pydict(
{
"_rowid": [0, 1],
"_distance": [0.2, 0.2], # identical vector scores
}
)
fts_results = pa.Table.from_pydict(
{
"_rowid": [0], # rowid 1 has no FTS match
"_score": [0.3], # small FTS score
}
)
combined = reranker.merge_results(vector_results, fts_results, fill=1.0)
scores = dict(
zip(
combined["_rowid"].to_pylist(),
combined["_relevance_score"].to_pylist(),
)
)
# rowid 0 has a small FTS score; rowid 1 has none.
# Even a small FTS contribution should beat having none at all.
assert scores[0] > scores[1], (
f"Document with FTS score (rowid 0, {scores[0]:.4f}) should beat "
f"document with no FTS match (rowid 1, {scores[1]:.4f})"
)

View File

@@ -104,7 +104,6 @@ datafusion.workspace = true
http-body = "1" # Matching reqwest
rstest = "0.23.0"
test-log = "0.2"
serial_test = "3"
[features]

View File

@@ -812,7 +812,8 @@ impl ConnectBuilder {
self
}
/// The interval at which to check for updates from other processes.
/// The interval at which to check for updates from other processes. This
/// only affects LanceDB OSS.
///
/// If left unset, consistency is not checked. For maximum read
/// performance, this is the default. For strong consistency, set this to
@@ -824,11 +825,8 @@ impl ConnectBuilder {
/// This only affects read operations. Write operations are always
/// consistent.
///
/// # Cost
///
/// Stronger consistency is not free. The smaller the interval, the more
/// often each read pays the cost of checking for updates against object
/// storage, raising per-read latency and cost.
/// LanceDB Cloud uses eventual consistency under the hood, and is not
/// currently configurable.
pub fn read_consistency_interval(
mut self,
read_consistency_interval: std::time::Duration,
@@ -888,7 +886,6 @@ impl ConnectBuilder {
options.host_override,
self.request.client_config,
storage_options.into(),
self.request.read_consistency_interval,
)?);
Ok(Connection {
internal,

View File

@@ -245,9 +245,6 @@ pub struct RestfulLanceDbClient<S: HttpSend = Sender> {
pub(crate) sender: S,
pub(crate) id_delimiter: String,
pub(crate) header_provider: Option<Arc<dyn HeaderProvider>>,
/// Connection-level read consistency interval. Drives the
/// `x-lancedb-min-timestamp` freshness header sent on read requests.
pub(crate) read_consistency_interval: Option<Duration>,
}
impl<S: HttpSend> std::fmt::Debug for RestfulLanceDbClient<S> {
@@ -341,7 +338,6 @@ impl RestfulLanceDbClient<Sender> {
host_override: Option<String>,
default_headers: HeaderMap,
client_config: ClientConfig,
read_consistency_interval: Option<Duration>,
) -> Result<Self> {
// Get the timeouts
let timeout =
@@ -439,7 +435,6 @@ impl RestfulLanceDbClient<Sender> {
.clone()
.unwrap_or("$".to_string()),
header_provider: client_config.header_provider,
read_consistency_interval,
})
}
}
@@ -845,16 +840,6 @@ pub mod test_utils {
pub fn client_with_handler<T>(
handler: impl Fn(reqwest::Request) -> http::response::Response<T> + Send + Sync + 'static,
) -> RestfulLanceDbClient<MockSender>
where
T: Into<reqwest::Body>,
{
client_with_handler_and_interval(handler, None)
}
pub fn client_with_handler_and_interval<T>(
handler: impl Fn(reqwest::Request) -> http::response::Response<T> + Send + Sync + 'static,
read_consistency_interval: Option<Duration>,
) -> RestfulLanceDbClient<MockSender>
where
T: Into<reqwest::Body>,
{
@@ -872,7 +857,6 @@ pub mod test_utils {
},
id_delimiter: "$".to_string(),
header_provider: None,
read_consistency_interval,
}
}
@@ -897,7 +881,6 @@ pub mod test_utils {
},
id_delimiter: config.id_delimiter.unwrap_or_else(|| "$".to_string()),
header_provider: config.header_provider,
read_consistency_interval: None,
}
}
}
@@ -905,7 +888,6 @@ pub mod test_utils {
#[cfg(test)]
mod tests {
use super::*;
use serial_test::serial;
use std::time::Duration;
#[test]
@@ -1064,7 +1046,6 @@ mod tests {
sender: Sender,
id_delimiter: "+".to_string(),
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
read_consistency_interval: None,
};
// Apply dynamic headers
@@ -1100,7 +1081,6 @@ mod tests {
sender: Sender,
id_delimiter: "+".to_string(),
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
read_consistency_interval: None,
};
// Apply dynamic headers
@@ -1138,7 +1118,6 @@ mod tests {
sender: Sender,
id_delimiter: "+".to_string(),
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
read_consistency_interval: None,
};
// Header provider errors should fail the request
@@ -1164,7 +1143,6 @@ mod tests {
}
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_none() {
let config = ClientConfig::default();
// Clear env vars that might be set from other tests
@@ -1177,7 +1155,6 @@ mod tests {
}
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_from_env() {
// SAFETY: This is only called in tests
unsafe {
@@ -1192,7 +1169,6 @@ mod tests {
}
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_from_env_key() {
// SAFETY: This is only called in tests
unsafe {
@@ -1213,7 +1189,6 @@ mod tests {
}
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_direct_takes_precedence() {
// SAFETY: This is only called in tests
unsafe {
@@ -1231,7 +1206,6 @@ mod tests {
}
#[test]
#[serial(user_id_env)]
fn test_resolve_user_id_empty_env_ignored() {
// SAFETY: This is only called in tests
unsafe {

View File

@@ -206,7 +206,6 @@ impl RemoteDatabase {
host_override: Option<String>,
client_config: ClientConfig,
options: RemoteOptions,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> {
let parsed = super::client::parse_db_url(uri)?;
let header_map = RestfulLanceDbClient::<Sender>::default_headers(
@@ -234,7 +233,6 @@ impl RemoteDatabase {
host_override,
header_map,
client_config.clone(),
read_consistency_interval,
)?;
let table_cache = Cache::builder()

View File

@@ -25,7 +25,7 @@ use crate::table::MergeResult;
use crate::table::Tags;
use crate::table::UpdateResult;
use crate::table::query::create_multi_vector_plan;
use crate::table::{AnyQuery, Filter, Predicate, PreprocessingOutput, TableStatistics};
use crate::table::{AnyQuery, Filter, PreprocessingOutput, TableStatistics};
use crate::utils::background_cache::BackgroundCache;
use crate::utils::{
resolve_arrow_field_path, supported_btree_data_type, supported_vector_data_type,
@@ -62,73 +62,15 @@ use std::collections::HashMap;
use std::io::Cursor;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use std::time::Duration;
use tokio::sync::RwLock;
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp");
const METRIC_TYPE_KEY: &str = "metric_type";
const INDEX_TYPE_KEY: &str = "index_type";
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5);
/// Per-table state driving the `x-lancedb-min-timestamp` freshness header
/// sent on read requests.
#[derive(Debug, Default, Clone, Copy)]
struct FreshnessState {
/// Wall-clock floor for read freshness, bumped to `now` whenever this
/// handle performs a write or an explicit [`BaseTable::checkout_latest`].
/// Subsequent reads send `max(baseline, now - read_consistency_interval)`
/// as `x-lancedb-min-timestamp`.
///
/// This is what provides read-your-write on a single handle: after a write
/// the next read forces the server cache past the write time. It also
/// preserves the `checkout_latest()` signal when `read_consistency_interval`
/// is unset (the default), where there is no interval-based floor.
freshness_baseline: Option<SystemTime>,
}
/// Snapshot of the headers that should be attached to a single read request.
#[derive(Debug, Default, Clone, Copy)]
struct FreshnessHeaders {
min_timestamp: Option<SystemTime>,
}
impl FreshnessHeaders {
fn apply(self, mut request: RequestBuilder) -> RequestBuilder {
if let Some(ts) = self.min_timestamp {
let dt: chrono::DateTime<chrono::Utc> = ts.into();
request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339());
}
request
}
}
/// Monotonic floor for the freshness baseline. `SystemTime` is not monotonic
/// (NTP steps, hibernate/resume can move it backward), so a write must never
/// lower the baseline below a prior write's — that would let the next read send
/// a `min_timestamp` earlier than an earlier write and break read-your-write.
fn next_freshness_baseline(prev: Option<SystemTime>, now: SystemTime) -> SystemTime {
prev.map_or(now, |prev| prev.max(now))
}
fn compute_min_timestamp(
state: &FreshnessState,
interval: Option<Duration>,
now: SystemTime,
) -> Option<SystemTime> {
let interval_based = match interval {
None => None,
Some(d) if d.is_zero() => Some(now),
Some(d) => Some(now.checked_sub(d).unwrap_or(now)),
};
match (interval_based, state.freshness_baseline) {
(None, None) => None,
(Some(t), None) | (None, Some(t)) => Some(t),
(Some(a), Some(b)) => Some(a.max(b)),
}
}
pub struct RemoteTags<'a, S: HttpSend = Sender> {
inner: &'a RemoteTable<S>,
}
@@ -138,7 +80,8 @@ impl<S: HttpSend + 'static> Tags for RemoteTags<'_, S> {
async fn list(&self) -> Result<HashMap<String, TagContents>> {
let request = self
.inner
.post_read(&format!("/v1/table/{}/tags/list/", self.inner.identifier));
.client
.post(&format!("/v1/table/{}/tags/list/", self.inner.identifier));
let (request_id, response) = self.inner.send(request, true).await?;
let response = self
.inner
@@ -169,13 +112,48 @@ impl<S: HttpSend + 'static> Tags for RemoteTags<'_, S> {
}
async fn get_version(&self, tag: &str) -> Result<u64> {
let request = self.inner.post_read(&format!(
"/v1/table/{}/tags/version/",
self.inner.identifier
));
self.inner
.resolve_tag_version_with_request(tag, request)
.await
let request = self
.inner
.client
.post(&format!(
"/v1/table/{}/tags/version/",
self.inner.identifier
))
.json(&serde_json::json!({ "tag": tag }));
let (request_id, response) = self.inner.send(request, true).await?;
let response = self
.inner
.check_table_response(&request_id, response)
.await?;
match response.text().await {
Ok(body) => {
let value: serde_json::Value =
serde_json::from_str(&body).map_err(|e| Error::Http {
source: format!("Failed to parse tag version: {}", e).into(),
request_id: request_id.clone(),
status_code: None,
})?;
value
.get("version")
.and_then(|v| v.as_u64())
.ok_or_else(|| Error::Http {
source: format!("Invalid tag version response: {}", body).into(),
request_id,
status_code: None,
})
}
Err(err) => {
let status_code = err.status();
Err(Error::Http {
source: Box::new(err),
request_id,
status_code,
})
}
}
}
async fn create(&mut self, tag: &str, version: u64) -> Result<()> {
@@ -237,7 +215,6 @@ pub struct RemoteTable<S: HttpSend = Sender> {
version: RwLock<Option<u64>>,
location: RwLock<Option<String>>,
schema_cache: BackgroundCache<SchemaRef, Error>,
freshness: Mutex<FreshnessState>,
}
impl<S: HttpSend> std::fmt::Debug for RemoteTable<S> {
@@ -266,7 +243,6 @@ impl<S: HttpSend> RemoteTable<S> {
version: RwLock::new(None),
location: RwLock::new(None),
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
freshness: Mutex::new(FreshnessState::default()),
}
}
@@ -276,56 +252,12 @@ impl<S: HttpSend> RemoteTable<S> {
}
async fn describe_version(&self, version: Option<u64>) -> Result<TableDescription> {
let request = self.post_read(&format!("/v1/table/{}/describe/", self.identifier));
self.describe_with_request(request, version).await
}
let mut request = self
.client
.post(&format!("/v1/table/{}/describe/", self.identifier));
async fn resolve_tag_version_with_request(
&self,
tag: &str,
request: RequestBuilder,
) -> Result<u64> {
let request = request.json(&serde_json::json!({ "tag": tag }));
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
match response.text().await {
Ok(body) => {
let value: serde_json::Value =
serde_json::from_str(&body).map_err(|e| Error::Http {
source: format!("Failed to parse tag version: {}", e).into(),
request_id: request_id.clone(),
status_code: None,
})?;
value
.get("version")
.and_then(|v| v.as_u64())
.ok_or_else(|| Error::Http {
source: format!("Invalid tag version response: {}", body).into(),
request_id,
status_code: None,
})
}
Err(err) => {
let status_code = err.status();
Err(Error::Http {
source: Box::new(err),
request_id,
status_code,
})
}
}
}
async fn describe_with_request(
&self,
request: RequestBuilder,
version: Option<u64>,
) -> Result<TableDescription> {
let body = serde_json::json!({ "version": version });
let request = request.json(&body);
request = request.json(&body);
let (request_id, response) = self.send(request, true).await?;
@@ -779,41 +711,14 @@ impl<S: HttpSend> RemoteTable<S> {
*read_guard
}
/// Snapshot the freshness headers to attach to a single read request.
/// Computed at call time so that retries reuse the same snapshot.
fn snapshot_freshness_headers(&self) -> FreshnessHeaders {
let state = *self.freshness.lock().unwrap();
FreshnessHeaders {
min_timestamp: compute_min_timestamp(
&state,
self.client.read_consistency_interval,
SystemTime::now(),
),
}
}
/// Build a POST request and attach the `x-lancedb-min-timestamp` freshness
/// header.
fn post_read(&self, uri: &str) -> RequestBuilder {
self.snapshot_freshness_headers()
.apply(self.client.post(uri))
}
/// Record that this handle just performed a write, so the next read forces
/// the server cache past the write time (read-your-write on a single
/// handle).
fn bump_freshness_baseline(&self) {
let now = SystemTime::now();
let mut state = self.freshness.lock().unwrap();
state.freshness_baseline = Some(next_freshness_baseline(state.freshness_baseline, now));
}
async fn execute_query(
&self,
query: &AnyQuery,
options: &QueryExecutionOptions,
) -> Result<Vec<Pin<Box<dyn RecordBatchStream + Send>>>> {
let mut request = self.post_read(&format!("/v1/table/{}/query/", self.identifier));
let mut request = self
.client
.post(&format!("/v1/table/{}/query/", self.identifier));
if let Some(timeout) = options.timeout {
// Also send to server, so it can abort the query if it takes too long.
@@ -919,10 +824,9 @@ async fn fetch_schema<S: HttpSend>(
identifier: &str,
table_name: &str,
version: Option<u64>,
freshness_headers: FreshnessHeaders,
) -> Result<SchemaRef> {
let request = freshness_headers
.apply(client.post(&format!("/v1/table/{}/describe/", identifier)))
let request = client
.post(&format!("/v1/table/{}/describe/", identifier))
.json(&serde_json::json!({ "version": version }));
let (request_id, response) = client.send_with_retry(request, None, true).await?;
@@ -970,9 +874,7 @@ mod test_utils {
use super::*;
use crate::remote::ClientConfig;
use crate::remote::client::test_utils::client_with_handler;
use crate::remote::client::test_utils::{
MockSender, client_with_handler_and_config, client_with_handler_and_interval,
};
use crate::remote::client::test_utils::{MockSender, client_with_handler_and_config};
impl RemoteTable<MockSender> {
pub fn new_mock<F, T>(name: String, handler: F, version: Option<semver::Version>) -> Self
@@ -990,30 +892,6 @@ mod test_utils {
version: RwLock::new(None),
location: RwLock::new(None),
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
freshness: Mutex::new(FreshnessState::default()),
}
}
pub fn new_mock_with_consistency_interval<F, T>(
name: String,
handler: F,
read_consistency_interval: Option<Duration>,
) -> Self
where
F: Fn(reqwest::Request) -> http::Response<T> + Send + Sync + 'static,
T: Into<reqwest::Body>,
{
let client = client_with_handler_and_interval(handler, read_consistency_interval);
Self {
client,
name: name.clone(),
namespace: vec![],
identifier: name,
server_version: ServerVersion::default(),
version: RwLock::new(None),
location: RwLock::new(None),
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
freshness: Mutex::new(FreshnessState::default()),
}
}
@@ -1045,7 +923,6 @@ mod test_utils {
version: RwLock::new(None),
location: RwLock::new(None),
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
freshness: Mutex::new(FreshnessState::default()),
}
}
}
@@ -1109,7 +986,6 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
if output.overwrite {
self.invalidate_schema_cache();
}
self.bump_freshness_baseline();
return Ok(add_result);
}
@@ -1147,7 +1023,6 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
if output.overwrite {
self.invalidate_schema_cache();
}
self.bump_freshness_baseline();
return Ok(result);
}
Err(e) => {
@@ -1264,13 +1139,8 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
self.describe().await.map(|desc| desc.version)
}
async fn checkout(&self, version: u64) -> Result<()> {
// Validate the version exists. The describe is sent without freshness
// headers so a stale baseline from a previous write doesn't ride
// along on an explicit time-travel request.
let request = self
.client
.post(&format!("/v1/table/{}/describe/", self.identifier));
self.describe_with_request(request, Some(version))
// check that the version exists
self.describe_version(Some(version))
.await
.map_err(|e| match e {
// try to map the error to a more user-friendly error telling them
@@ -1286,10 +1156,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
*write_guard = Some(version);
drop(write_guard);
// Explicit time-travel: drop any read-your-write / freshness
// constraints so the user sees exactly the requested version.
*self.freshness.lock().unwrap() = FreshnessState::default();
// Invalidate schema cache since we're switching versions
self.invalidate_schema_cache();
@@ -1300,12 +1166,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
*write_guard = None;
drop(write_guard);
// Reset the freshness baseline to now so subsequent reads see at least
// the state as of this explicit `checkout_latest()`.
*self.freshness.lock().unwrap() = FreshnessState {
freshness_baseline: Some(SystemTime::now()),
};
// Invalidate schema cache since we're switching versions
self.invalidate_schema_cache();
@@ -1326,7 +1186,9 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
async fn list_versions(&self) -> Result<Vec<Version>> {
let request = self.post_read(&format!("/v1/table/{}/version/list/", self.identifier));
let request = self
.client
.post(&format!("/v1/table/{}/version/list/", self.identifier));
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
@@ -1359,25 +1221,19 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let client = self.client.clone();
let identifier = self.identifier.clone();
let table_name = self.name.clone();
let freshness_headers = self.snapshot_freshness_headers();
self.schema_cache
.get(move || async move {
fetch_schema(
&client,
&identifier,
&table_name,
version,
freshness_headers,
)
.await
fetch_schema(&client, &identifier, &table_name, version).await
})
.await
.map_err(unwrap_shared_error)
}
async fn count_rows(&self, filter: Option<Filter>) -> Result<usize> {
let mut request = self.post_read(&format!("/v1/table/{}/count_rows/", self.identifier));
let mut request = self
.client
.post(&format!("/v1/table/{}/count_rows/", self.identifier));
let version = self.current_version().await;
@@ -1503,7 +1359,9 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result<String> {
let base_request = self.post_read(&format!("/v1/table/{}/explain_plan/", self.identifier));
let base_request = self
.client
.post(&format!("/v1/table/{}/explain_plan/", self.identifier));
let query_bodies = self.prepare_query_bodies(query).await?;
let requests: Vec<reqwest::RequestBuilder> = query_bodies
@@ -1550,7 +1408,9 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
query: &AnyQuery,
_options: QueryExecutionOptions,
) -> Result<String> {
let request = self.post_read(&format!("/v1/table/{}/analyze_plan/", self.identifier));
let request = self
.client
.post(&format!("/v1/table/{}/analyze_plan/", self.identifier));
let query_bodies = self.prepare_query_bodies(query).await?;
let requests: Vec<reqwest::RequestBuilder> = query_bodies
@@ -1620,17 +1480,12 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
status_code: None,
})?;
self.bump_freshness_baseline();
Ok(update_response)
}
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
self.check_mutable().await?;
let predicate_sql = match predicate {
Predicate::String(s) => s.to_string(),
Predicate::Expr(expr) => expr_to_sql_string(expr)?,
};
let body = serde_json::json!({ "predicate": predicate_sql });
let body = serde_json::json!({ "predicate": predicate });
let request = self
.client
.post(&format!("/v1/table/{}/delete/", self.identifier))
@@ -1651,7 +1506,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
request_id,
status_code: None,
})?;
self.bump_freshness_baseline();
Ok(delete_response)
}
@@ -1808,7 +1662,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
status_code: None,
})?;
self.bump_freshness_baseline();
Ok(merge_insert_response)
}
@@ -1834,22 +1687,12 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
Ok(Box::new(RemoteTags { inner: self }))
}
async fn checkout_tag(&self, tag: &str) -> Result<()> {
// Resolve the tag without attaching freshness headers; a stale
// baseline from a previous write should not ride along on an
// explicit time-travel request.
let request = self
.client
.post(&format!("/v1/table/{}/tags/version/", self.identifier));
let version = self.resolve_tag_version_with_request(tag, request).await?;
let tags = self.tags().await?;
let version = tags.get_version(tag).await?;
let mut write_guard = self.version.write().await;
*write_guard = Some(version);
drop(write_guard);
// Explicit time-travel: drop any read-your-write / freshness
// constraints so the user sees exactly the tagged version.
*self.freshness.lock().unwrap() = FreshnessState::default();
// Invalidate schema cache since we're switching versions
self.invalidate_schema_cache();
@@ -1900,7 +1743,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
})?;
self.invalidate_schema_cache();
self.bump_freshness_baseline();
Ok(result)
}
@@ -1955,7 +1797,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
})?;
self.invalidate_schema_cache();
self.bump_freshness_baseline();
Ok(result)
}
@@ -1983,14 +1824,15 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
})?;
self.invalidate_schema_cache();
self.bump_freshness_baseline();
Ok(result)
}
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
// Make request to list the indices
let mut request = self.post_read(&format!("/v1/table/{}/index/list/", self.identifier));
let mut request = self
.client
.post(&format!("/v1/table/{}/index/list/", self.identifier));
let version = self.current_version().await;
let body = serde_json::json!({ "version": version });
request = request.json(&body);
@@ -2054,7 +1896,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>> {
let mut request = self.post_read(&format!(
let mut request = self.client.post(&format!(
"/v1/table/{}/index/{}/stats/",
self.identifier, index_name
));
@@ -2166,7 +2008,9 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
async fn stats(&self) -> Result<TableStatistics> {
let request = self.post_read(&format!("/v1/table/{}/stats/", self.identifier));
let request = self
.client
.post(&format!("/v1/table/{}/stats/", self.identifier));
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?;
@@ -3007,33 +2851,6 @@ mod tests {
assert_eq!(result.version, if old_server { 0 } else { 43 });
}
#[tokio::test]
async fn test_delete_expr() {
use datafusion_expr::{col, lit};
let table = Table::new_with_handler("my_table", move |request| {
if request.url().path() == "/v1/table/my_table/delete/" {
assert_eq!(request.method(), "POST");
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
assert!(body.get("predicate").unwrap().is_string());
http::Response::builder()
.status(200)
.body(r#"{"num_deleted_rows": 4, "version": 2}"#)
.unwrap()
} else {
panic!("Unexpected request path: {}", request.url().path());
}
});
let expr = col("id").gt(lit(5));
let result = table.delete(&expr).await.unwrap();
assert_eq!(result.num_deleted_rows, 4);
assert_eq!(result.version, 2);
}
#[rstest]
#[case(true)]
#[case(false)]
@@ -6157,304 +5974,4 @@ mod tests {
assert_eq!(create_count.load(Ordering::SeqCst), 2);
assert_eq!(abort_count.load(Ordering::SeqCst), 1);
}
// ---- Read freshness header tests ------------------------------------
#[test]
fn test_next_freshness_baseline_is_monotonic() {
let t100 = SystemTime::UNIX_EPOCH + Duration::from_secs(100);
let t101 = SystemTime::UNIX_EPOCH + Duration::from_secs(101);
let t99 = SystemTime::UNIX_EPOCH + Duration::from_secs(99);
// No prior baseline -> take the current time.
assert_eq!(next_freshness_baseline(None, t100), t100);
// Clock moved forward -> advance to it.
assert_eq!(next_freshness_baseline(Some(t100), t101), t101);
// Clock moved backward (NTP step / resume) -> keep the higher prior
// baseline so read-your-write isn't lowered below an earlier write.
assert_eq!(next_freshness_baseline(Some(t100), t99), t100);
}
#[test]
fn test_compute_min_timestamp_combines_baseline_and_interval() {
let now = SystemTime::now();
let baseline = now - Duration::from_secs(60);
// No interval, no baseline -> no header.
assert_eq!(
compute_min_timestamp(&FreshnessState::default(), None, now),
None
);
// Baseline only -> baseline.
let state = FreshnessState {
freshness_baseline: Some(baseline),
};
assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline));
// ZERO interval, no baseline -> now.
assert_eq!(
compute_min_timestamp(&FreshnessState::default(), Some(Duration::ZERO), now),
Some(now)
);
// Positive interval, no baseline -> now - interval.
assert_eq!(
compute_min_timestamp(
&FreshnessState::default(),
Some(Duration::from_secs(10)),
now
),
Some(now - Duration::from_secs(10))
);
// Both: pick the more-recent (i.e. tighter) constraint.
// baseline = now-60, now-interval = now-10. now-10 is newer.
let state = FreshnessState {
freshness_baseline: Some(baseline),
};
assert_eq!(
compute_min_timestamp(&state, Some(Duration::from_secs(10)), now),
Some(now - Duration::from_secs(10))
);
// Both, baseline newer: pick baseline.
let recent_baseline = now - Duration::from_secs(5);
let state = FreshnessState {
freshness_baseline: Some(recent_baseline),
};
assert_eq!(
compute_min_timestamp(&state, Some(Duration::from_secs(60)), now),
Some(recent_baseline)
);
}
/// Allowed slop when comparing a header timestamp against a locally
/// captured wall-clock bound. Tests run fast enough that 1s is plenty.
const FRESHNESS_TOLERANCE: Duration = Duration::from_secs(1);
fn capturing_handler<F>(
body_for: F,
) -> (
impl Fn(reqwest::Request) -> http::Response<String> + Clone + Send + Sync + 'static,
Arc<std::sync::Mutex<Option<http::HeaderMap>>>,
)
where
F: Fn(&str) -> String + Clone + Send + Sync + 'static,
{
let captured = Arc::new(std::sync::Mutex::new(None));
let captured_c = captured.clone();
let handler = move |request: reqwest::Request| {
*captured_c.lock().unwrap() = Some(request.headers().clone());
let path = request.url().path().to_string();
http::Response::builder()
.status(200)
.body(body_for(&path))
.unwrap()
};
(handler, captured)
}
fn parse_min_timestamp(headers: &http::HeaderMap) -> SystemTime {
let value = headers
.get("x-lancedb-min-timestamp")
.expect("expected x-lancedb-min-timestamp header")
.to_str()
.unwrap();
chrono::DateTime::parse_from_rfc3339(value)
.unwrap()
.with_timezone(&chrono::Utc)
.into()
}
#[tokio::test]
async fn test_freshness_default_sends_no_headers() {
let (handler, captured) = capturing_handler(|_| "42".to_string());
let table = Table::new_with_handler("my_table", handler);
let _ = table.count_rows(None).await.unwrap();
let headers = captured.lock().unwrap().clone().unwrap();
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
}
#[tokio::test]
async fn test_freshness_zero_interval_sends_now() {
let (handler, captured) = capturing_handler(|_| "42".to_string());
let table =
Table::new_with_handler_and_interval("my_table", handler, Some(Duration::from_secs(0)));
let before = SystemTime::now();
table.count_rows(None).await.unwrap();
let after = SystemTime::now();
let headers = captured.lock().unwrap().clone().unwrap();
let sent = parse_min_timestamp(&headers);
assert!(
sent >= before - FRESHNESS_TOLERANCE && sent <= after + FRESHNESS_TOLERANCE,
"expected timestamp roughly equal to wall clock"
);
}
#[tokio::test]
async fn test_freshness_positive_interval_sends_now_minus_interval() {
let (handler, captured) = capturing_handler(|_| "42".to_string());
let interval = Duration::from_secs(30);
let table = Table::new_with_handler_and_interval("my_table", handler, Some(interval));
let before = SystemTime::now();
table.count_rows(None).await.unwrap();
let after = SystemTime::now();
let headers = captured.lock().unwrap().clone().unwrap();
let sent = parse_min_timestamp(&headers);
assert!(
sent >= before - interval - FRESHNESS_TOLERANCE
&& sent <= after - interval + FRESHNESS_TOLERANCE,
"expected timestamp roughly equal to now - interval"
);
}
#[tokio::test]
async fn test_freshness_checkout_latest_sets_baseline() {
let (handler, captured) = capturing_handler(|path| match path {
"/v1/table/my_table/count_rows/" => "42".to_string(),
_ => panic!("unexpected path: {}", path),
});
// No interval — only the baseline should drive the timestamp.
let table = Table::new_with_handler_and_interval("my_table", handler, None);
let before_checkout = SystemTime::now();
table.checkout_latest().await.unwrap();
let after_checkout = SystemTime::now();
table.count_rows(None).await.unwrap();
let headers = captured.lock().unwrap().clone().unwrap();
let sent = parse_min_timestamp(&headers);
assert!(
sent >= before_checkout - FRESHNESS_TOLERANCE
&& sent <= after_checkout + FRESHNESS_TOLERANCE,
"expected timestamp captured at checkout_latest() time"
);
}
#[tokio::test]
async fn test_freshness_baseline_bumped_after_write() {
let (handler, captured) = capturing_handler(|path| match path {
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
"/v1/table/my_table/count_rows/" => "42".to_string(),
_ => panic!("unexpected path: {}", path),
});
let table = Table::new_with_handler("my_table", handler);
let before = SystemTime::now();
let _ = table.update().column("a", "a + 1").execute().await.unwrap();
table.count_rows(None).await.unwrap();
let after = SystemTime::now();
// The write bumped the freshness baseline, so the next read carries a
// min-timestamp at or after the write time (read-your-write).
let headers = captured.lock().unwrap().clone().unwrap();
let ts = parse_min_timestamp(&headers);
assert!(ts >= before - FRESHNESS_TOLERANCE);
assert!(ts <= after + FRESHNESS_TOLERANCE);
}
/// Like `capturing_handler`, but keeps a per-path snapshot of the headers
/// from every request so tests can assert on a specific endpoint.
#[allow(clippy::type_complexity)]
fn path_capturing_handler<F>(
body_for: F,
) -> (
impl Fn(reqwest::Request) -> http::Response<String> + Clone + Send + Sync + 'static,
Arc<std::sync::Mutex<HashMap<String, http::HeaderMap>>>,
)
where
F: Fn(&str) -> String + Clone + Send + Sync + 'static,
{
let captured: Arc<std::sync::Mutex<HashMap<String, http::HeaderMap>>> =
Arc::new(std::sync::Mutex::new(HashMap::new()));
let captured_c = captured.clone();
let handler = move |request: reqwest::Request| {
let path = request.url().path().to_string();
captured_c
.lock()
.unwrap()
.insert(path.clone(), request.headers().clone());
http::Response::builder()
.status(200)
.body(body_for(&path))
.unwrap()
};
(handler, captured)
}
#[tokio::test]
async fn test_freshness_checkout_validation_sends_no_freshness_headers() {
// After a write bumps the baseline, calling checkout(v) must not let
// that stale freshness header ride along on the validating /describe/
// request.
let (handler, captured) = path_capturing_handler(|path| match path {
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
"/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(),
_ => panic!("unexpected path: {}", path),
});
let table = Table::new_with_handler("my_table", handler);
table.update().column("a", "a + 1").execute().await.unwrap();
table.checkout(5).await.unwrap();
let captured = captured.lock().unwrap();
let describe_headers = captured
.get("/v1/table/my_table/describe/")
.expect("describe should have been called by checkout(v)");
assert!(
!describe_headers.contains_key("x-lancedb-min-timestamp"),
"checkout(v) describe must not carry a stale freshness baseline",
);
}
#[tokio::test]
async fn test_freshness_checkout_tag_resolve_sends_no_freshness_headers() {
// Same invariant for checkout_tag: the tag-resolve request must not
// pick up a stale freshness baseline from a prior write.
let (handler, captured) = path_capturing_handler(|path| match path {
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
"/v1/table/my_table/tags/version/" => r#"{"version":5}"#.to_string(),
_ => panic!("unexpected path: {}", path),
});
let table = Table::new_with_handler("my_table", handler);
table.update().column("a", "a + 1").execute().await.unwrap();
table.checkout_tag("v_initial").await.unwrap();
let captured = captured.lock().unwrap();
let resolve_headers = captured
.get("/v1/table/my_table/tags/version/")
.expect("tags/version should have been called by checkout_tag");
assert!(
!resolve_headers.contains_key("x-lancedb-min-timestamp"),
"checkout_tag resolve must not carry a stale freshness baseline",
);
}
#[tokio::test]
async fn test_freshness_checkout_clears_baseline() {
let (handler, captured) = capturing_handler(|path| match path {
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
// checkout(5) needs to describe version 5 first
"/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(),
"/v1/table/my_table/count_rows/" => "42".to_string(),
_ => panic!("unexpected path: {}", path),
});
let table = Table::new_with_handler("my_table", handler);
table.update().column("a", "a + 1").execute().await.unwrap();
table.checkout(5).await.unwrap();
table.count_rows(None).await.unwrap();
let headers = captured.lock().unwrap().clone().unwrap();
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
}
}

View File

@@ -253,36 +253,6 @@ pub enum Filter {
Datafusion(Expr),
}
/// A predicate for filtering rows in delete operations.
///
/// Accepts either a SQL string or a DataFusion [`Expr`]. Use the [`From`]
/// implementations to convert from `&str` or `&Expr` automatically.
/// See [`Table::delete`] for usage examples.
pub enum Predicate<'a> {
/// A SQL predicate string
String(&'a str),
/// A DataFusion logical expression
Expr(&'a Expr),
}
impl<'a> From<&'a str> for Predicate<'a> {
fn from(s: &'a str) -> Self {
Predicate::String(s)
}
}
impl<'a> From<&'a String> for Predicate<'a> {
fn from(s: &'a String) -> Self {
Predicate::String(s.as_str())
}
}
impl<'a> From<&'a Expr> for Predicate<'a> {
fn from(e: &'a Expr) -> Self {
Predicate::Expr(e)
}
}
#[async_trait]
pub trait Tags: Send + Sync {
/// List the tags of the table.
@@ -312,15 +282,17 @@ pub use self::merge::MergeResult;
/// date) and [`LsmWriteSpec::with_writer_config_defaults`] (default
/// `ShardWriter` configuration recorded in the MemWAL index).
///
/// All variants require the table to have an unenforced primary key.
///
/// Install a spec with [`Table::set_lsm_write_spec`] and remove it with
/// [`Table::unset_lsm_write_spec`]. The actual `merge_insert` dispatch
/// onto the MemWAL writer is a follow-up.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum LsmWriteSpec {
/// Hash-bucket sharding by a scalar column.
/// Hash-bucket sharding by the unenforced primary key column.
///
/// `column` must be a non-nested column with a supported scalar type.
/// `num_buckets` must be in `[1, 1024]`.
/// `column` must equal the table's currently-set single-column
/// unenforced primary key. `num_buckets` must be in `[1, 1024]`.
/// Iceberg-compatible Murmur3-x86-32 (seed 0) is used so each row's
/// `bucket(column, num_buckets)` value is stable across processes.
Bucket {
@@ -519,8 +491,8 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
/// Add new records to the table.
async fn add(&self, add: AddDataBuilder) -> Result<AddResult>;
/// Delete rows from the table matching the given [`Predicate`].
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult>;
/// Delete rows from the table.
async fn delete(&self, predicate: &str) -> Result<DeleteResult>;
/// Update rows in the table.
async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult>;
/// Create an index on the provided column(s).
@@ -684,30 +656,6 @@ mod test_utils {
}
}
pub fn new_with_handler_and_interval<T>(
name: impl Into<String>,
handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
read_consistency_interval: Option<std::time::Duration>,
) -> Self
where
T: Into<reqwest::Body>,
{
let inner = Arc::new(
crate::remote::table::RemoteTable::new_mock_with_consistency_interval(
name.into(),
handler.clone(),
read_consistency_interval,
),
);
let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler));
Self {
inner,
database: Some(database),
// Registry is unused.
embedding_registry: Arc::new(MemoryRegistry::new()),
}
}
pub fn new_with_handler_version<T>(
name: impl Into<String>,
version: semver::Version,
@@ -912,8 +860,7 @@ impl Table {
/// Delete the rows from table that match the predicate.
///
/// # Arguments
/// - `predicate` - A SQL string (`&str`) or DataFusion expression (`&Expr`)
/// that selects the rows to delete.
/// - `predicate` - The SQL predicate string to filter the rows to be deleted.
///
/// # Example
///
@@ -922,7 +869,6 @@ impl Table {
/// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
/// # RecordBatchIterator, Int32Array};
/// # use arrow_schema::{Schema, Field, DataType};
/// use datafusion_expr::{col, lit};
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let tmpdir = tempfile::tempdir().unwrap();
/// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
@@ -952,17 +898,11 @@ impl Table {
/// .execute()
/// .await
/// .unwrap();
///
/// // Using a SQL string:
/// tbl.delete("id > 5").await.unwrap();
///
/// // Using a DataFusion expression:
/// let expr = col("id").lt(lit(4));
/// tbl.delete(&expr).await.unwrap();
/// # });
/// ```
pub async fn delete(&self, predicate: impl Into<Predicate<'_>>) -> Result<DeleteResult> {
self.inner.delete(predicate.into()).await
pub async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
self.inner.delete(predicate).await
}
/// Create an index on the provided column(s).
@@ -1358,15 +1298,21 @@ impl Table {
///
/// [`LsmWriteSpec`] chooses one of three sharding strategies:
///
/// - [`LsmWriteSpec::bucket`] — hash-bucket writes by a scalar column.
/// - [`LsmWriteSpec::bucket`] — hash-bucket writes by the single-column
/// unenforced primary key.
/// - [`LsmWriteSpec::identity`] — shard by the raw value of a scalar column.
/// - [`LsmWriteSpec::unsharded`] — route every write to a single shard.
///
/// All variants require the table to have an unenforced primary key
/// ([`Table::set_unenforced_primary_key`]); bucket sharding additionally
/// requires it to be the single column being bucketed.
///
/// # Example
///
/// ```
/// # use lancedb::table::{LsmWriteSpec, Table};
/// # async fn example(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
/// table.set_unenforced_primary_key(["id"]).await?;
/// table
/// .set_lsm_write_spec(
/// LsmWriteSpec::bucket("id", 16).with_maintained_indexes(["id_idx"]),
@@ -2831,7 +2777,8 @@ impl BaseTable for NativeTable {
}
/// Delete rows from the table
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
// Delegate to the submodule implementation
delete::execute_delete(self, predicate).await
}
@@ -4653,6 +4600,21 @@ mod tests {
.unwrap();
let table = conn.create_table("t", reader).execute().await.unwrap();
// Reject when no PK is set.
let err = table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
.await
.expect_err("should reject without PK");
assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
// Set PK, then a mismatched column on the spec must be rejected.
table.set_unenforced_primary_key(["id"]).await.unwrap();
let err = table
.set_lsm_write_spec(LsmWriteSpec::bucket("name", 4))
.await
.expect_err("should reject column != PK");
assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
// Reject num_buckets out of range.
for bad in [0u32, 1025] {
let err = table
@@ -4718,6 +4680,9 @@ mod tests {
.unwrap();
let table = conn.create_table("t", reader).execute().await.unwrap();
// Lance's MemWAL still requires *some* unenforced primary key on
// the dataset; Unsharded just skips the per-row hashing step.
table.set_unenforced_primary_key(["id"]).await.unwrap();
table
.set_lsm_write_spec(LsmWriteSpec::unsharded())
.await
@@ -4764,6 +4729,7 @@ mod tests {
.unwrap();
let table = conn.create_table("t", reader).execute().await.unwrap();
table.set_unenforced_primary_key(["id"]).await.unwrap();
table
.set_lsm_write_spec(
LsmWriteSpec::identity("region")
@@ -4819,6 +4785,7 @@ mod tests {
table.unset_lsm_write_spec().await.unwrap_err();
// Install a spec, then unset it.
table.set_unenforced_primary_key(["id"]).await.unwrap();
table
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
.await

View File

@@ -982,105 +982,4 @@ mod tests {
table2.add(struct_batch).execute().await.unwrap();
assert_eq!(table2.count_rows(None).await.unwrap(), 2);
}
/// Regression test: appending `arrow.json` (PyArrow `pa.json_()`) data into a table
/// whose schema was created with `pa.json_()` (internally stored as `lance.json`, backed
/// by `LargeBinary`) must succeed without a schema-mismatch error.
///
/// Previously `build_field_exprs` would attempt a `Utf8 → LargeBinary` DataFusion cast,
/// which produced a field whose Arrow extension metadata still read `arrow.json` instead
/// of `lance.json`. Lance-core then rejected the append with
/// `"json vs large_binary" schema mismatch`.
///
/// PyArrow's `pa.json_()` may be backed by either `Utf8` or `LargeUtf8` depending on the
/// constructor used, so the test is parameterized over the input backing type.
#[rstest::rstest]
#[case::utf8(DataType::Utf8)]
#[case::large_utf8(DataType::LargeUtf8)]
#[tokio::test]
async fn test_add_arrow_json_into_lance_json_table(#[case] input_type: DataType) {
use arrow_array::{Array, cast::AsArray};
use lance_arrow::ARROW_EXT_NAME_KEY;
use lance_arrow::json::{ARROW_JSON_EXT_NAME, JSON_EXT_NAME};
// Build a table whose "data" column is lance.json (LargeBinary +
// ARROW:extension:name = "lance.json").
let lance_json_field = lance_arrow::json::json_field("data", true);
let table_schema = Arc::new(Schema::new(vec![lance_json_field]));
let db = connect("memory://").execute().await.unwrap();
let table = db
.create_empty_table("json_test", table_schema)
.execute()
.await
.unwrap();
// Sanity-check the stored schema.
let stored_field = table.schema().await.unwrap();
let data_field = stored_field.field_with_name("data").unwrap();
assert_eq!(data_field.data_type(), &DataType::LargeBinary);
assert_eq!(
data_field
.metadata()
.get(ARROW_EXT_NAME_KEY)
.map(|s| s.as_str()),
Some(JSON_EXT_NAME),
);
// Build an arrow.json input field (Utf8/LargeUtf8 + arrow.json extension).
// This is what PyArrow produces for pa.json_() arrays.
let arrow_json_metadata = std::collections::HashMap::from([(
ARROW_EXT_NAME_KEY.to_string(),
ARROW_JSON_EXT_NAME.to_string(),
)]);
let arrow_json_field =
Field::new("data", input_type.clone(), true).with_metadata(arrow_json_metadata);
let arrow_json_schema = Arc::new(Schema::new(vec![arrow_json_field]));
let rows: Vec<Option<&str>> = vec![None, Some(r#"{"a": 1}"#), Some(r#"{"b": 2}"#)];
let string_array: Arc<dyn arrow_array::Array> = match input_type {
DataType::Utf8 => Arc::new(arrow_array::StringArray::from(rows.clone())),
DataType::LargeUtf8 => Arc::new(arrow_array::LargeStringArray::from(rows.clone())),
other => panic!("unsupported arrow.json backing type for this test: {other:?}"),
};
let batch = RecordBatch::try_new(arrow_json_schema, vec![string_array]).unwrap();
// This must not fail with a schema-mismatch error.
table.add(batch).execute().await.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), rows.len());
// A lance.json column is read back as Utf8 carrying arrow.json extension metadata.
let results: Vec<RecordBatch> = table
.query()
.select(Select::columns(&["data"]))
.execute()
.await
.unwrap()
.try_collect()
.await
.unwrap();
assert_eq!(results.len(), 1);
let batch = &results[0];
assert_eq!(batch.num_rows(), rows.len());
let json_col = batch.column(0);
assert_eq!(json_col.data_type(), &DataType::Utf8);
let json_strs = json_col.as_string::<i32>();
for (i, expected) in rows.iter().enumerate() {
match expected {
None => assert!(json_strs.is_null(i), "row {i} expected null"),
Some(raw) => {
assert!(!json_strs.is_null(i), "row {i} expected non-null");
let actual: serde_json::Value = serde_json::from_str(json_strs.value(i))
.expect("read-back JSON should be valid");
let expected: serde_json::Value =
serde_json::from_str(raw).expect("expected JSON should be valid");
assert_eq!(actual, expected, "row {i} JSON mismatch");
}
}
}
}
}

View File

@@ -13,7 +13,6 @@ use datafusion_physical_expr::expressions::{CastExpr, Literal};
use datafusion_physical_plan::expressions::Column;
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};
use lance_arrow::json::{is_arrow_json_field, is_json_field};
use crate::{Error, Result};
@@ -65,18 +64,6 @@ fn build_field_exprs(
let input_field = &input_fields[input_idx];
let input_expr = get_input_expr(input_idx);
// Special case: input is arrow.json (PyArrow pa.json_() extension type backed by
// Utf8/LargeUtf8) and the table field is lance.json (backed by LargeBinary).
// Lance-core's write path already handles the arrow.json → lance.json conversion
// (including JSONB encoding), so we pass the expression through unchanged and let
// lance-core deal with it. Attempting to cast Utf8 → LargeBinary here would
// produce a field whose metadata still identifies it as arrow.json, which then
// causes a schema-mismatch error inside lance-core.
if is_arrow_json_field(input_field) && is_json_field(table_field) {
result.push((input_expr, Arc::clone(input_field) as FieldRef));
continue;
}
let expr = match (input_field.data_type(), table_field.data_type()) {
// Both are structs: recurse into sub-fields to handle subschemas and casts.
(DataType::Struct(in_children), DataType::Struct(tbl_children))
@@ -631,75 +618,4 @@ mod tests {
.unwrap();
assert_eq!(a.values(), &[1, 3]);
}
/// `arrow.json` input (PyArrow `pa.json_()`, Utf8/LargeUtf8 + extension metadata) against a
/// `lance.json` table field (LargeBinary + extension metadata) must be passed through
/// without a cast so that lance-core can perform its own arrow.json → JSONB conversion.
///
/// Before the fix, `cast_to_table_schema` attempted a `Utf8 → LargeBinary` DataFusion
/// cast that preserved the wrong extension metadata, causing lance-core to reject the
/// batch with a "json vs large_binary" schema-mismatch error.
#[rstest::rstest]
#[case::utf8(DataType::Utf8)]
#[case::large_utf8(DataType::LargeUtf8)]
#[tokio::test]
async fn test_arrow_json_passthrough_to_lance_json(#[case] input_type: DataType) {
use lance_arrow::ARROW_EXT_NAME_KEY;
use lance_arrow::json::{ARROW_JSON_EXT_NAME, json_field};
// Build a table schema with a lance.json field (LargeBinary + lance.json metadata).
let lance_field = json_field("data", true);
let table_schema = Schema::new(vec![lance_field]);
// Build an input batch with an arrow.json field (Utf8/LargeUtf8 + arrow.json metadata).
let arrow_meta = std::collections::HashMap::from([(
ARROW_EXT_NAME_KEY.to_string(),
ARROW_JSON_EXT_NAME.to_string(),
)]);
let arrow_field = Field::new("data", input_type.clone(), true).with_metadata(arrow_meta);
let input_schema = Arc::new(Schema::new(vec![arrow_field]));
let values = vec![Some(r#"{"x": 1}"#), None, Some(r#"{"y": 2}"#)];
let input_array: Arc<dyn arrow_array::Array> = match input_type {
DataType::Utf8 => Arc::new(StringArray::from(values)),
DataType::LargeUtf8 => Arc::new(arrow_array::LargeStringArray::from(values)),
other => panic!("unsupported arrow.json backing type for this test: {other:?}"),
};
let input_batch = RecordBatch::try_new(input_schema, vec![input_array]).unwrap();
let plan = plan_from_batch(input_batch).await;
let projected = cast_to_table_schema(plan, &table_schema).unwrap();
// The projected schema's "data" field must carry arrow.json metadata
// (the input field), not be silently dropped or miscast.
let out_field = projected.schema().field_with_name("data").unwrap().clone();
assert_eq!(out_field.data_type(), &input_type);
assert_eq!(
out_field
.metadata()
.get(ARROW_EXT_NAME_KEY)
.map(|s| s.as_str()),
Some(ARROW_JSON_EXT_NAME),
"output field must still carry arrow.json metadata so lance-core can handle it"
);
// The data must flow through correctly (3 rows, no panic).
let result = collect(projected).await;
assert_eq!(result.num_rows(), 3);
let (v0, v2) = match input_type {
DataType::Utf8 => {
let col: &StringArray = result.column(0).as_any().downcast_ref().unwrap();
(col.value(0).to_string(), col.value(2).to_string())
}
DataType::LargeUtf8 => {
let col: &arrow_array::LargeStringArray =
result.column(0).as_any().downcast_ref().unwrap();
(col.value(0).to_string(), col.value(2).to_string())
}
_ => unreachable!(),
};
assert_eq!(v0, r#"{"x": 1}"#);
assert!(result.column(0).is_null(1));
assert_eq!(v2, r#"{"y": 2}"#);
}
}

View File

@@ -1,12 +1,9 @@
use std::sync::Arc;
use futures::FutureExt;
use lance::dataset::DeleteBuilder;
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use serde::{Deserialize, Serialize};
use super::{NativeTable, Predicate};
use super::NativeTable;
use crate::Result;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
@@ -24,39 +21,17 @@ pub struct DeleteResult {
/// Internal implementation of the delete logic
///
/// This logic was moved from NativeTable::delete to keep table.rs clean.
pub(crate) async fn execute_delete(
table: &NativeTable,
predicate: Predicate<'_>,
) -> Result<DeleteResult> {
pub(crate) async fn execute_delete(table: &NativeTable, predicate: &str) -> Result<DeleteResult> {
table.dataset.ensure_mutable()?;
match predicate {
Predicate::String(s) => {
let mut dataset = (*table.dataset.get().await?).clone();
let delete_result = dataset.delete(s).boxed().await?;
let num_deleted_rows = delete_result.num_deleted_rows;
let version = dataset.version().version;
table.dataset.update(dataset);
Ok(DeleteResult {
num_deleted_rows,
version,
})
}
Predicate::Expr(expr) => {
let dataset = table.dataset.get().await?;
let delete_result = DeleteBuilder::from_expr(Arc::clone(&dataset), expr.clone())
.execute()
.await?;
let num_deleted_rows = delete_result.num_deleted_rows;
let version = delete_result.new_dataset.version().version;
table.dataset.update(
Arc::try_unwrap(delete_result.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
);
Ok(DeleteResult {
num_deleted_rows,
version,
})
}
}
let mut dataset = (*table.dataset.get().await?).clone();
let delete_result = dataset.delete(predicate).boxed().await?;
let num_deleted_rows = delete_result.num_deleted_rows;
let version = dataset.version().version;
table.dataset.update(dataset);
Ok(DeleteResult {
num_deleted_rows,
version,
})
}
#[cfg(test)]
@@ -201,100 +176,4 @@ mod tests {
"Table version must increment after delete operation"
);
}
#[tokio::test]
async fn test_delete_expr() {
use datafusion_expr::{col, lit};
let conn = connect("memory://").execute().await.unwrap();
// 1. Create a table with values 0 to 9
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..10))],
)
.unwrap();
let table = conn
.create_table("test_delete_expr", batch)
.execute()
.await
.unwrap();
// 2. Verify initial state
assert_eq!(table.count_rows(None).await.unwrap(), 10);
let initial_version = table.version().await.unwrap();
// 3. Execute Delete with Expr (removes values > 5)
let expr = col("i").gt(lit(5));
table.delete(&expr).await.unwrap();
// 4. Verify results
assert_eq!(table.count_rows(None).await.unwrap(), 6); // 0, 1, 2, 3, 4, 5 remain
let current_version = table.version().await.unwrap();
assert!(
current_version > initial_version,
"Table version must increment after delete_expr operation"
);
// 5. Verify specific data consistency
let batches = table
.query()
.execute()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let batch = &batches[0];
let array = batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
// Ensure no value > 5 exists
for val in array.iter() {
assert!(val.unwrap() <= 5);
}
}
#[tokio::test]
async fn test_delete_expr_increments_version() {
use datafusion_expr::lit;
let conn = connect("memory://").execute().await.unwrap();
// Create a table with 5 rows
let batch = record_batch!(("id", Int32, [1, 2, 3, 4, 5])).unwrap();
let table = conn
.create_table("test_delete_expr_noop", batch)
.execute()
.await
.unwrap();
// Capture the initial state (Rows = 5, Version = 1)
let initial_rows = table.count_rows(None).await.unwrap();
let initial_version = table.version().await.unwrap();
assert_eq!(initial_rows, 5);
let expr = lit(false);
table.delete(&expr).await.unwrap();
// Rows should still be 5
let current_rows = table.count_rows(None).await.unwrap();
assert_eq!(
current_rows, initial_rows,
"Data should not change when predicate is false"
);
// version check
let current_version = table.version().await.unwrap();
assert!(
current_version > initial_version,
"Table version must increment after delete_expr operation"
);
}
}