mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-02 20:00:46 +00:00
Compare commits
1 Commits
drop-min-v
...
v0.30.0-be
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d468a0adb1 |
@@ -29,3 +29,7 @@ runs:
|
||||
args: ${{ inputs.args }}
|
||||
docker-options: "-e PIP_EXTRA_INDEX_URL='https://pypi.fury.io/lance-format/ https://pypi.fury.io/lancedb/'"
|
||||
working-directory: python
|
||||
- uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: windows-wheels
|
||||
path: python\target\wheels
|
||||
|
||||
110
.github/workflows/pypi-publish.yml
vendored
110
.github/workflows/pypi-publish.yml
vendored
@@ -8,9 +8,6 @@ on:
|
||||
# This should trigger a dry run (we skip the final publish step)
|
||||
paths:
|
||||
- .github/workflows/pypi-publish.yml
|
||||
- .github/workflows/build_linux_wheel/action.yml
|
||||
- .github/workflows/build_mac_wheel/action.yml
|
||||
- .github/workflows/build_windows_wheel/action.yml
|
||||
- Cargo.toml # Change in dependency frequently breaks builds
|
||||
- Cargo.lock
|
||||
|
||||
@@ -24,21 +21,32 @@ jobs:
|
||||
linux:
|
||||
name: Python ${{ matrix.config.platform }} manylinux${{ matrix.config.manylinux }}
|
||||
timeout-minutes: 60
|
||||
permissions:
|
||||
id-token: write
|
||||
contents: read
|
||||
strategy:
|
||||
matrix:
|
||||
config:
|
||||
- platform: x86_64
|
||||
manylinux: "2_17"
|
||||
extra_args: ""
|
||||
runner: ubuntu-22.04
|
||||
- platform: x86_64
|
||||
manylinux: "2_28"
|
||||
extra_args: "--features fp16kernels"
|
||||
runner: ubuntu-22.04
|
||||
# For successful fat LTO builds, we need a large runner to avoid OOM errors.
|
||||
- platform: aarch64
|
||||
manylinux: "2_17"
|
||||
extra_args: ""
|
||||
# For successful fat LTO builds, we need a large runner to avoid OOM errors.
|
||||
runner: ubuntu-2404-8x-arm64
|
||||
- platform: aarch64
|
||||
manylinux: "2_28"
|
||||
extra_args: "--features fp16kernels"
|
||||
runner: ubuntu-2404-8x-arm64
|
||||
runs-on: ${{ matrix.config.runner }}
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
lfs: true
|
||||
@@ -52,14 +60,15 @@ jobs:
|
||||
args: "--release --strip ${{ matrix.config.extra_args }}"
|
||||
arm-build: ${{ matrix.config.platform == 'aarch64' }}
|
||||
manylinux: ${{ matrix.config.manylinux }}
|
||||
- uses: actions/upload-artifact@v7
|
||||
- uses: ./.github/workflows/upload_wheel
|
||||
if: startsWith(github.ref, 'refs/tags/python-v')
|
||||
with:
|
||||
name: wheels-linux-${{ matrix.config.platform }}-${{ matrix.config.manylinux }}
|
||||
path: target/wheels/lancedb-*.whl
|
||||
if-no-files-found: error
|
||||
fury_token: ${{ secrets.FURY_TOKEN }}
|
||||
mac:
|
||||
timeout-minutes: 90
|
||||
permissions:
|
||||
id-token: write
|
||||
contents: read
|
||||
runs-on: ${{ matrix.config.runner }}
|
||||
strategy:
|
||||
matrix:
|
||||
@@ -69,7 +78,7 @@ jobs:
|
||||
env:
|
||||
MACOSX_DEPLOYMENT_TARGET: 10.15
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
lfs: true
|
||||
@@ -81,21 +90,18 @@ jobs:
|
||||
with:
|
||||
python-minor-version: 10
|
||||
args: "--release --strip --target ${{ matrix.config.target }} --features fp16kernels"
|
||||
- uses: actions/upload-artifact@v7
|
||||
- uses: ./.github/workflows/upload_wheel
|
||||
if: startsWith(github.ref, 'refs/tags/python-v')
|
||||
with:
|
||||
name: wheels-mac-${{ matrix.config.target }}
|
||||
path: target/wheels/lancedb-*.whl
|
||||
if-no-files-found: error
|
||||
fury_token: ${{ secrets.FURY_TOKEN }}
|
||||
windows:
|
||||
timeout-minutes: 90
|
||||
timeout-minutes: 60
|
||||
permissions:
|
||||
id-token: write
|
||||
contents: read
|
||||
runs-on: windows-latest
|
||||
env:
|
||||
# link.exe is single-threaded and the long pole on Windows builds. Use
|
||||
# rustc's bundled lld-link instead.
|
||||
CARGO_TARGET_X86_64_PC_WINDOWS_MSVC_LINKER: rust-lld
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
lfs: true
|
||||
@@ -107,70 +113,18 @@ jobs:
|
||||
with:
|
||||
python-minor-version: 10
|
||||
args: "--release --strip"
|
||||
- uses: actions/upload-artifact@v7
|
||||
vcpkg_token: ${{ secrets.VCPKG_GITHUB_PACKAGES }}
|
||||
- uses: ./.github/workflows/upload_wheel
|
||||
if: startsWith(github.ref, 'refs/tags/python-v')
|
||||
with:
|
||||
name: wheels-windows
|
||||
path: target/wheels/lancedb-*.whl
|
||||
if-no-files-found: error
|
||||
publish:
|
||||
name: Publish wheels
|
||||
if: startsWith(github.ref, 'refs/tags/python-v')
|
||||
needs: [linux, mac, windows]
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
id-token: write
|
||||
contents: read
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- name: Download wheel artifacts
|
||||
uses: actions/download-artifact@v8
|
||||
with:
|
||||
pattern: wheels-*
|
||||
path: target/wheels
|
||||
merge-multiple: true
|
||||
- name: List wheels
|
||||
run: ls -la target/wheels
|
||||
- name: Choose repo
|
||||
id: choose_repo
|
||||
run: |
|
||||
if [[ ${{ github.ref }} == *beta* ]]; then
|
||||
echo "repo=fury" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "repo=pypi" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
- name: Publish to Fury
|
||||
if: steps.choose_repo.outputs.repo == 'fury'
|
||||
env:
|
||||
FURY_TOKEN: ${{ secrets.FURY_TOKEN }}
|
||||
run: |
|
||||
shopt -s nullglob
|
||||
WHEELS=(target/wheels/lancedb-*.whl)
|
||||
if [[ ${#WHEELS[@]} -eq 0 ]]; then
|
||||
echo "No wheels found in target/wheels/" >&2
|
||||
exit 1
|
||||
fi
|
||||
for WHEEL in "${WHEELS[@]}"; do
|
||||
echo "Uploading $WHEEL to Fury"
|
||||
curl -f -F package=@"$WHEEL" "https://$FURY_TOKEN@push.fury.io/lancedb/"
|
||||
done
|
||||
# NOTE: pypa/gh-action-pypi-publish must be invoked directly from a
|
||||
# workflow file, not from inside a composite action. When called from a
|
||||
# composite, `github.action_repository` is empty (actions/runner#2473)
|
||||
# and the action falls back to `github.repository`, producing a bogus
|
||||
# `docker://ghcr.io/<repo>:<ref>` image reference that GHA tries to pull.
|
||||
- name: Publish to PyPI
|
||||
if: steps.choose_repo.outputs.repo == 'pypi'
|
||||
uses: pypa/gh-action-pypi-publish@release/v1
|
||||
with:
|
||||
packages-dir: target/wheels/
|
||||
fury_token: ${{ secrets.FURY_TOKEN }}
|
||||
gh-release:
|
||||
if: startsWith(github.ref, 'refs/tags/python-v')
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
lfs: true
|
||||
@@ -233,13 +187,13 @@ jobs:
|
||||
report-failure:
|
||||
name: Report Workflow Failure
|
||||
runs-on: ubuntu-latest
|
||||
needs: [linux, mac, windows, publish]
|
||||
needs: [linux, mac, windows]
|
||||
permissions:
|
||||
contents: read
|
||||
issues: write
|
||||
if: always() && failure() && startsWith(github.ref, 'refs/tags/python-v')
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- uses: actions/checkout@v4
|
||||
- uses: ./.github/actions/create-failure-issue
|
||||
with:
|
||||
job-results: ${{ toJSON(needs) }}
|
||||
|
||||
34
.github/workflows/upload_wheel/action.yml
vendored
Normal file
34
.github/workflows/upload_wheel/action.yml
vendored
Normal file
@@ -0,0 +1,34 @@
|
||||
name: upload-wheel
|
||||
|
||||
description: "Upload wheels to Pypi"
|
||||
inputs:
|
||||
fury_token:
|
||||
required: true
|
||||
description: "release token for the fury repo"
|
||||
|
||||
runs:
|
||||
using: "composite"
|
||||
steps:
|
||||
- name: Choose repo
|
||||
shell: bash
|
||||
id: choose_repo
|
||||
run: |
|
||||
if [[ ${{ github.ref }} == *beta* ]]; then
|
||||
echo "repo=fury" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "repo=pypi" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
- name: Publish to Fury
|
||||
if: steps.choose_repo.outputs.repo == 'fury'
|
||||
shell: bash
|
||||
env:
|
||||
FURY_TOKEN: ${{ inputs.fury_token }}
|
||||
run: |
|
||||
WHEEL=$(ls target/wheels/lancedb-*.whl 2> /dev/null | head -n 1)
|
||||
echo "Uploading $WHEEL to Fury"
|
||||
curl -f -F package=@$WHEEL https://$FURY_TOKEN@push.fury.io/lancedb/
|
||||
- name: Publish to PyPI
|
||||
if: steps.choose_repo.outputs.repo == 'pypi'
|
||||
uses: pypa/gh-action-pypi-publish@release/v1
|
||||
with:
|
||||
packages-dir: target/wheels/
|
||||
11
AGENTS.md
11
AGENTS.md
@@ -37,13 +37,10 @@ Before committing changes, run formatting for every language you touched. At min
|
||||
and run targeted tests through `cd python && uv run ...`.
|
||||
* TypeScript changes: run the relevant `npm`/`pnpm` lint, format, build, and docs commands in `nodejs`.
|
||||
|
||||
Before creating a PR, the exact value passed to `gh pr create --title` must follow
|
||||
Conventional Commits, such as `fix: support nested field paths in native index creation`
|
||||
or `feat(python): add dataset multiprocessing support`. Do not use a plain natural
|
||||
language summary like `Support nested field paths in native index creation` as the PR
|
||||
title. The semantic-release check uses the PR title and body as the merge commit message,
|
||||
so a non-conventional PR title will fail CI. After creating a PR, read the remote PR title
|
||||
back and fix it immediately if it is not conventional.
|
||||
Before creating a PR, make sure the PR title follows Conventional Commits, such as
|
||||
`fix: support nested field paths in native index creation` or
|
||||
`feat(python): add dataset multiprocessing support`. The semantic-release check uses the
|
||||
PR title and body as the merge commit message, so a non-conventional PR title will fail CI.
|
||||
|
||||
## Coding tips
|
||||
|
||||
|
||||
140
Cargo.lock
generated
140
Cargo.lock
generated
@@ -3284,8 +3284,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
|
||||
|
||||
[[package]]
|
||||
name = "fsst"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"rand 0.9.4",
|
||||
@@ -4506,8 +4506,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a"
|
||||
|
||||
[[package]]
|
||||
name = "lance"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"arrow",
|
||||
@@ -4525,7 +4525,6 @@ dependencies = [
|
||||
"async_cell",
|
||||
"aws-credential-types",
|
||||
"aws-sdk-dynamodb",
|
||||
"bitpacking",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"chrono",
|
||||
@@ -4552,11 +4551,9 @@ dependencies = [
|
||||
"lance-io",
|
||||
"lance-linalg",
|
||||
"lance-namespace",
|
||||
"lance-select",
|
||||
"lance-table",
|
||||
"lance-tokenizer",
|
||||
"log",
|
||||
"moka",
|
||||
"object_store",
|
||||
"permutation",
|
||||
"pin-project",
|
||||
@@ -4580,8 +4577,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-arrow"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4601,8 +4598,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-bitpacking"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
dependencies = [
|
||||
"arrayref",
|
||||
"paste",
|
||||
@@ -4611,8 +4608,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-core"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4647,8 +4644,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-datafusion"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4678,8 +4675,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-datagen"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4697,8 +4694,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-encoding"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
dependencies = [
|
||||
"arrow-arith",
|
||||
"arrow-array",
|
||||
@@ -4733,8 +4730,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-file"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
dependencies = [
|
||||
"arrow-arith",
|
||||
"arrow-array",
|
||||
@@ -4765,8 +4762,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-index"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"arrow",
|
||||
@@ -4803,7 +4800,6 @@ dependencies = [
|
||||
"lance-file",
|
||||
"lance-io",
|
||||
"lance-linalg",
|
||||
"lance-select",
|
||||
"lance-table",
|
||||
"lance-tokenizer",
|
||||
"libm",
|
||||
@@ -4831,8 +4827,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-io"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-arith",
|
||||
@@ -4874,8 +4870,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-linalg"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4891,8 +4887,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"async-trait",
|
||||
@@ -4904,8 +4900,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace-impls"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-ipc",
|
||||
@@ -4940,9 +4936,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace-reqwest-client"
|
||||
version = "0.7.7"
|
||||
version = "0.7.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6369eee4682fb11edf538388b43c61ce288b8302fe89bb40944d7daa7faaae99"
|
||||
checksum = "f65e31bdaa13e01dab6e7cf566da31df243c34a542f0d915d3601ec0e01e61d2"
|
||||
dependencies = [
|
||||
"reqwest 0.12.28",
|
||||
"serde",
|
||||
@@ -4952,25 +4948,10 @@ dependencies = [
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lance-select"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"deepsize",
|
||||
"itertools 0.13.0",
|
||||
"lance-core",
|
||||
"roaring",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lance-table"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4989,7 +4970,6 @@ dependencies = [
|
||||
"lance-core",
|
||||
"lance-file",
|
||||
"lance-io",
|
||||
"lance-select",
|
||||
"log",
|
||||
"object_store",
|
||||
"prost",
|
||||
@@ -5010,8 +4990,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-testing"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-schema",
|
||||
@@ -5022,8 +5002,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-tokenizer"
|
||||
version = "7.1.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.4#0c0b3e18c0a4c75bda1dd6ec9d6247ef75bd29d9"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
dependencies = [
|
||||
"jieba-rs",
|
||||
"lindera",
|
||||
@@ -5034,7 +5014,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb"
|
||||
version = "0.30.0-beta.1"
|
||||
version = "0.30.0-beta.0"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"anyhow",
|
||||
@@ -5104,7 +5084,6 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"serial_test",
|
||||
"snafu 0.8.9",
|
||||
"tempfile",
|
||||
"test-log",
|
||||
@@ -5117,7 +5096,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-nodejs"
|
||||
version = "0.30.0-beta.1"
|
||||
version = "0.30.0-beta.0"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -5140,7 +5119,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-python"
|
||||
version = "0.33.0-beta.1"
|
||||
version = "0.33.0-beta.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"async-trait",
|
||||
@@ -8149,15 +8128,6 @@ dependencies = [
|
||||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scc"
|
||||
version = "2.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "46e6f046b7fef48e2660c57ed794263155d713de679057f2d0c169bfc6e756cc"
|
||||
dependencies = [
|
||||
"sdd",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "schannel"
|
||||
version = "0.1.29"
|
||||
@@ -8224,12 +8194,6 @@ dependencies = [
|
||||
"untrusted 0.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sdd"
|
||||
version = "3.0.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca"
|
||||
|
||||
[[package]]
|
||||
name = "sec1"
|
||||
version = "0.3.0"
|
||||
@@ -8420,32 +8384,6 @@ dependencies = [
|
||||
"unsafe-libyaml",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serial_test"
|
||||
version = "3.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "911bd979bf1070a3f3aa7b691a3b3e9968f339ceeec89e08c280a8a22207a32f"
|
||||
dependencies = [
|
||||
"futures-executor",
|
||||
"futures-util",
|
||||
"log",
|
||||
"once_cell",
|
||||
"parking_lot",
|
||||
"scc",
|
||||
"serial_test_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serial_test_derive"
|
||||
version = "3.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0a7d91949b85b0d2fb687445e448b40d322b6b3e4af6b44a29b21d9a5f33e6d9"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.117",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha1"
|
||||
version = "0.10.6"
|
||||
|
||||
28
Cargo.toml
28
Cargo.toml
@@ -13,20 +13,20 @@ categories = ["database-implementations"]
|
||||
rust-version = "1.91.0"
|
||||
|
||||
[workspace.dependencies]
|
||||
lance = { "version" = "=7.1.0-beta.4", default-features = false, "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.1.0-beta.4", default-features = false, "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.1.0-beta.4", default-features = false, "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.1.0-beta.4", "tag" = "v7.1.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
ahash = "0.8"
|
||||
# Note that this one does not include pyarrow
|
||||
arrow = { version = "58.0.0", optional = false }
|
||||
|
||||
@@ -112,25 +112,25 @@ def fetch_remote_tags() -> List[TagInfo]:
|
||||
"api",
|
||||
"-X",
|
||||
"GET",
|
||||
f"repos/{LANCE_REPO}/releases",
|
||||
f"repos/{LANCE_REPO}/git/refs/tags",
|
||||
"--paginate",
|
||||
"--jq",
|
||||
".[].tag_name",
|
||||
"-F",
|
||||
"per_page=20",
|
||||
".[].ref",
|
||||
]
|
||||
)
|
||||
tags: List[TagInfo] = []
|
||||
for line in output.splitlines():
|
||||
tag = line.strip()
|
||||
if not tag.startswith("v"):
|
||||
ref = line.strip()
|
||||
if not ref.startswith("refs/tags/v"):
|
||||
continue
|
||||
tag = ref.split("refs/tags/")[-1]
|
||||
version = tag.lstrip("v")
|
||||
try:
|
||||
tags.append(TagInfo(tag=tag, version=version, semver=parse_semver(version)))
|
||||
except ValueError:
|
||||
continue
|
||||
if not tags:
|
||||
raise RuntimeError("No Lance releases could be parsed from GitHub API output")
|
||||
raise RuntimeError("No Lance tags could be parsed from GitHub API output")
|
||||
return tags
|
||||
|
||||
|
||||
|
||||
@@ -70,20 +70,16 @@ client used by manifest-enabled native connections.
|
||||
optional readConsistencyInterval: number;
|
||||
```
|
||||
|
||||
The interval, in seconds, at which to check for updates to the table
|
||||
from other processes. If None, then consistency is not checked. For
|
||||
performance reasons, this is the default. For strong consistency, set
|
||||
this to zero seconds. Then every read will check for updates from other
|
||||
processes. As a compromise, you can set this to a non-zero value for
|
||||
eventual consistency. If more than that interval has passed since the
|
||||
last check, then the table will be checked for updates. Note: this
|
||||
consistency only applies to read operations. Write operations are
|
||||
(For LanceDB OSS only): The interval, in seconds, at which to check for
|
||||
updates to the table from other processes. If None, then consistency is not
|
||||
checked. For performance reasons, this is the default. For strong
|
||||
consistency, set this to zero seconds. Then every read will check for
|
||||
updates from other processes. As a compromise, you can set this to a
|
||||
non-zero value for eventual consistency. If more than that interval
|
||||
has passed since the last check, then the table will be checked for updates.
|
||||
Note: this consistency only applies to read operations. Write operations are
|
||||
always consistent.
|
||||
|
||||
Stronger consistency is not free. The smaller the interval, the more
|
||||
often each read pays the cost of checking for updates against object
|
||||
storage, raising per-read latency and cost.
|
||||
|
||||
***
|
||||
|
||||
### region?
|
||||
|
||||
@@ -28,7 +28,7 @@
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<arrow.version>15.0.0</arrow.version>
|
||||
<lance-core.version>7.1.0-beta.4</lance-core.version>
|
||||
<lance-core.version>7.0.0-beta.13</lance-core.version>
|
||||
<spotless.skip>false</spotless.skip>
|
||||
<spotless.version>2.30.0</spotless.version>
|
||||
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>
|
||||
|
||||
@@ -171,22 +171,18 @@ describe("given a connection", () => {
|
||||
|
||||
let manifestDir =
|
||||
tmpDir.name + "/test_manifest_paths_v2_empty.lance/_versions";
|
||||
readdirSync(manifestDir)
|
||||
.filter((f) => f.endsWith(".manifest"))
|
||||
.forEach((file) => {
|
||||
expect(file).toMatch(/^\d{20}\.manifest$/);
|
||||
});
|
||||
readdirSync(manifestDir).forEach((file) => {
|
||||
expect(file).toMatch(/^\d{20}\.manifest$/);
|
||||
});
|
||||
|
||||
table = (await db.createTable("test_manifest_paths_v2", [{ id: 1 }], {
|
||||
enableV2ManifestPaths: true,
|
||||
})) as LocalTable;
|
||||
expect(await table.usesV2ManifestPaths()).toBe(true);
|
||||
manifestDir = tmpDir.name + "/test_manifest_paths_v2.lance/_versions";
|
||||
readdirSync(manifestDir)
|
||||
.filter((f) => f.endsWith(".manifest"))
|
||||
.forEach((file) => {
|
||||
expect(file).toMatch(/^\d{20}\.manifest$/);
|
||||
});
|
||||
readdirSync(manifestDir).forEach((file) => {
|
||||
expect(file).toMatch(/^\d{20}\.manifest$/);
|
||||
});
|
||||
});
|
||||
|
||||
it("should be able to migrate tables to the V2 manifest paths", async () => {
|
||||
@@ -203,20 +199,16 @@ describe("given a connection", () => {
|
||||
|
||||
const manifestDir =
|
||||
tmpDir.name + "/test_manifest_path_migration.lance/_versions";
|
||||
readdirSync(manifestDir)
|
||||
.filter((f) => f.endsWith(".manifest"))
|
||||
.forEach((file) => {
|
||||
expect(file).toMatch(/^\d\.manifest$/);
|
||||
});
|
||||
readdirSync(manifestDir).forEach((file) => {
|
||||
expect(file).toMatch(/^\d\.manifest$/);
|
||||
});
|
||||
|
||||
await table.migrateManifestPathsV2();
|
||||
expect(await table.usesV2ManifestPaths()).toBe(true);
|
||||
|
||||
readdirSync(manifestDir)
|
||||
.filter((f) => f.endsWith(".manifest"))
|
||||
.forEach((file) => {
|
||||
expect(file).toMatch(/^\d{20}\.manifest$/);
|
||||
});
|
||||
readdirSync(manifestDir).forEach((file) => {
|
||||
expect(file).toMatch(/^\d{20}\.manifest$/);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
4
nodejs/package-lock.json
generated
4
nodejs/package-lock.json
generated
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.30.0-beta.1",
|
||||
"version": "0.30.0-beta.0",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.30.0-beta.1",
|
||||
"version": "0.30.0-beta.0",
|
||||
"cpu": [
|
||||
"x64",
|
||||
"arm64"
|
||||
|
||||
@@ -24,19 +24,15 @@ mod util;
|
||||
#[napi(object)]
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionOptions {
|
||||
/// The interval, in seconds, at which to check for updates to the table
|
||||
/// from other processes. If None, then consistency is not checked. For
|
||||
/// performance reasons, this is the default. For strong consistency, set
|
||||
/// this to zero seconds. Then every read will check for updates from other
|
||||
/// processes. As a compromise, you can set this to a non-zero value for
|
||||
/// eventual consistency. If more than that interval has passed since the
|
||||
/// last check, then the table will be checked for updates. Note: this
|
||||
/// consistency only applies to read operations. Write operations are
|
||||
/// (For LanceDB OSS only): The interval, in seconds, at which to check for
|
||||
/// updates to the table from other processes. If None, then consistency is not
|
||||
/// checked. For performance reasons, this is the default. For strong
|
||||
/// consistency, set this to zero seconds. Then every read will check for
|
||||
/// updates from other processes. As a compromise, you can set this to a
|
||||
/// non-zero value for eventual consistency. If more than that interval
|
||||
/// has passed since the last check, then the table will be checked for updates.
|
||||
/// Note: this consistency only applies to read operations. Write operations are
|
||||
/// always consistent.
|
||||
///
|
||||
/// Stronger consistency is not free. The smaller the interval, the more
|
||||
/// often each read pays the cost of checking for updates against object
|
||||
/// storage, raising per-read latency and cost.
|
||||
pub read_consistency_interval: Option<f64>,
|
||||
/// (For LanceDB OSS only): configuration for object storage.
|
||||
///
|
||||
|
||||
@@ -94,6 +94,7 @@ def connect(
|
||||
host_override: str, optional
|
||||
The override url for LanceDB Cloud.
|
||||
read_consistency_interval: timedelta, default None
|
||||
(For LanceDB OSS only)
|
||||
The interval at which to check for updates to the table from other
|
||||
processes. If None, then consistency is not checked. For performance
|
||||
reasons, this is the default. For strong consistency, set this to
|
||||
@@ -103,10 +104,6 @@ def connect(
|
||||
the last check, then the table will be checked for updates. Note: this
|
||||
consistency only applies to read operations. Write operations are
|
||||
always consistent.
|
||||
|
||||
Stronger consistency is not free. The smaller the interval, the more
|
||||
often each read pays the cost of checking for updates against object
|
||||
storage, raising per-read latency and cost.
|
||||
client_config: ClientConfig or dict, optional
|
||||
Configuration options for the LanceDB Cloud HTTP client. If a dict, then
|
||||
the keys are the attributes of the ClientConfig class. If None, then the
|
||||
@@ -150,13 +147,6 @@ def connect(
|
||||
>>> db = lancedb.connect("s3://my-bucket/lancedb",
|
||||
... storage_options={"aws_access_key_id": "***"})
|
||||
|
||||
For tests and temporary data, use an in-memory database:
|
||||
|
||||
>>> db = lancedb.connect("memory://")
|
||||
|
||||
In-memory databases are not persisted. Tables are dropped when the last
|
||||
connection or table handle referencing them is closed.
|
||||
|
||||
Connect to LanceDB cloud:
|
||||
|
||||
>>> db = lancedb.connect("db://my_database", api_key="ldb_...",
|
||||
@@ -220,7 +210,6 @@ def connect(
|
||||
request_thread_pool=request_thread_pool,
|
||||
client_config=client_config,
|
||||
storage_options=storage_options,
|
||||
read_consistency_interval=read_consistency_interval,
|
||||
**kwargs,
|
||||
)
|
||||
_check_s3_bucket_with_dots(str(uri), storage_options)
|
||||
@@ -347,6 +336,7 @@ async def connect_async(
|
||||
host_override: str, optional
|
||||
The override url for LanceDB Cloud.
|
||||
read_consistency_interval: timedelta, default None
|
||||
(For LanceDB OSS only)
|
||||
The interval at which to check for updates to the table from other
|
||||
processes. If None, then consistency is not checked. For performance
|
||||
reasons, this is the default. For strong consistency, set this to
|
||||
@@ -356,10 +346,6 @@ async def connect_async(
|
||||
the last check, then the table will be checked for updates. Note: this
|
||||
consistency only applies to read operations. Write operations are
|
||||
always consistent.
|
||||
|
||||
Stronger consistency is not free. The smaller the interval, the more
|
||||
often each read pays the cost of checking for updates against object
|
||||
storage, raising per-read latency and cost.
|
||||
client_config: ClientConfig or dict, optional
|
||||
Configuration options for the LanceDB Cloud HTTP client. If a dict, then
|
||||
the keys are the attributes of the ClientConfig class. If None, then the
|
||||
@@ -392,8 +378,6 @@ async def connect_async(
|
||||
... db = await lancedb.connect_async("s3://my-bucket/lancedb",
|
||||
... storage_options={
|
||||
... "aws_access_key_id": "***"})
|
||||
... # For tests and temporary data, use an in-memory database
|
||||
... db = await lancedb.connect_async("memory://")
|
||||
... # Connect to LanceDB cloud
|
||||
... db = await lancedb.connect_async("db://my_database", api_key="ldb_...",
|
||||
... client_config={
|
||||
|
||||
@@ -50,7 +50,6 @@ class RemoteDBConnection(DBConnection):
|
||||
connection_timeout: Optional[float] = None,
|
||||
read_timeout: Optional[float] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
read_consistency_interval: Optional[timedelta] = None,
|
||||
):
|
||||
"""Connect to a remote LanceDB database."""
|
||||
if isinstance(client_config, dict):
|
||||
@@ -104,7 +103,6 @@ class RemoteDBConnection(DBConnection):
|
||||
host_override=host_override,
|
||||
client_config=client_config,
|
||||
storage_options=storage_options,
|
||||
read_consistency_interval=read_consistency_interval,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -102,15 +102,8 @@ class LinearCombinationReranker(Reranker):
|
||||
|
||||
combined_list = []
|
||||
for row_id, result in results.items():
|
||||
# Convert vector distance to a relevance score in [0, 1] where
|
||||
# higher is better. Missing vector entries are penalised with
|
||||
# `_invert_score(fill)` = 1 - fill (= 0.0 for the default fill=1).
|
||||
vector_score = self._invert_score(result.get("_distance", fill))
|
||||
# FTS scores (BM25) are already in a "higher = more relevant" space.
|
||||
# Missing FTS entries are penalised symmetrically: we use
|
||||
# `1 - fill` so that the same `fill` value drives both missing-vector
|
||||
# and missing-FTS penalties in the same direction.
|
||||
fts_score = result.get("_score", 1 - fill)
|
||||
fts_score = result.get("_score", fill)
|
||||
result["_relevance_score"] = self._combine_score(vector_score, fts_score)
|
||||
combined_list.append(result)
|
||||
|
||||
@@ -130,12 +123,8 @@ class LinearCombinationReranker(Reranker):
|
||||
return tbl
|
||||
|
||||
def _combine_score(self, vector_score, fts_score):
|
||||
# Both vector_score (inverted distance) and fts_score are in a
|
||||
# "higher = more relevant" space. A straight weighted average gives
|
||||
# higher _relevance_score to better matches, as expected.
|
||||
# Previously this returned `1 - (...)` which inverted the final
|
||||
# ranking so that the *least* relevant document ranked first.
|
||||
return self.weight * vector_score + (1 - self.weight) * fts_score
|
||||
# these scores represent distance
|
||||
return 1 - (self.weight * vector_score + (1 - self.weight) * fts_score)
|
||||
|
||||
def _invert_score(self, dist: float):
|
||||
# Invert the score between relevance and distance
|
||||
|
||||
@@ -466,8 +466,7 @@ async def test_create_table_v2_manifest_paths_async(tmp_path):
|
||||
assert await tbl.uses_v2_manifest_paths()
|
||||
manifests_dir = tmp_path / "test_v2_manifest_paths.lance" / "_versions"
|
||||
for manifest in os.listdir(manifests_dir):
|
||||
if manifest.endswith(".manifest"):
|
||||
assert re.match(r"\d{20}\.manifest", manifest)
|
||||
assert re.match(r"\d{20}\.manifest", manifest)
|
||||
|
||||
# Start a table in V1 mode then migrate
|
||||
tbl = await db_no_v2_paths.create_table(
|
||||
@@ -477,15 +476,13 @@ async def test_create_table_v2_manifest_paths_async(tmp_path):
|
||||
assert not await tbl.uses_v2_manifest_paths()
|
||||
manifests_dir = tmp_path / "test_v2_migration.lance" / "_versions"
|
||||
for manifest in os.listdir(manifests_dir):
|
||||
if manifest.endswith(".manifest"):
|
||||
assert re.match(r"\d\.manifest", manifest)
|
||||
assert re.match(r"\d\.manifest", manifest)
|
||||
|
||||
await tbl.migrate_manifest_paths_v2()
|
||||
assert await tbl.uses_v2_manifest_paths()
|
||||
|
||||
for manifest in os.listdir(manifests_dir):
|
||||
if manifest.endswith(".manifest"):
|
||||
assert re.match(r"\d{20}\.manifest", manifest)
|
||||
assert re.match(r"\d{20}\.manifest", manifest)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
@@ -40,6 +40,16 @@ def _make_table(tmp_path):
|
||||
def test_set_lsm_write_spec_validates(tmp_path):
|
||||
_db, table = _make_table(tmp_path)
|
||||
|
||||
# No PK set yet.
|
||||
with pytest.raises(Exception, match="primary key"):
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
|
||||
|
||||
table.set_unenforced_primary_key("id")
|
||||
|
||||
# Column mismatch.
|
||||
with pytest.raises(Exception, match="match"):
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("v", 4))
|
||||
|
||||
# Out-of-range num_buckets.
|
||||
with pytest.raises(Exception, match="num_buckets"):
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 0))
|
||||
@@ -60,6 +70,7 @@ def test_unset_lsm_write_spec(tmp_path):
|
||||
table.unset_lsm_write_spec()
|
||||
|
||||
# Install a spec, then remove it; afterwards a fresh spec can be set.
|
||||
table.set_unenforced_primary_key("id")
|
||||
table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4))
|
||||
table.unset_lsm_write_spec()
|
||||
# A second unset errors — there is no spec left to remove.
|
||||
|
||||
@@ -603,89 +603,3 @@ def test_cross_encoder_reranker_return_all(tmp_path):
|
||||
assert "_relevance_score" in result.column_names
|
||||
assert "_score" in result.column_names
|
||||
assert "_distance" in result.column_names
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Regression tests for LinearCombinationReranker scoring bugs (issue #3154)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_linear_combination_best_match_ranks_first():
|
||||
"""
|
||||
The document that is BOTH the closest vector match AND the only FTS match
|
||||
must rank first. Previously _combine_score subtracted from 1, inverting
|
||||
the ranking so the worst document ranked highest.
|
||||
"""
|
||||
reranker = LinearCombinationReranker(weight=0.7, return_score="all")
|
||||
|
||||
# rowid 0: perfect vector match, sole FTS match → should rank 1st
|
||||
# rowid 1: mediocre vector, no FTS match
|
||||
# rowid 2: bad vector, no FTS match
|
||||
vector_results = pa.Table.from_pydict(
|
||||
{
|
||||
"_rowid": [0, 1, 2],
|
||||
"_distance": [0.0, 0.5, 0.9],
|
||||
}
|
||||
)
|
||||
fts_results = pa.Table.from_pydict(
|
||||
{
|
||||
"_rowid": [0],
|
||||
"_score": [1.0],
|
||||
}
|
||||
)
|
||||
|
||||
combined = reranker.merge_results(vector_results, fts_results, fill=1.0)
|
||||
scores = dict(
|
||||
zip(
|
||||
combined["_rowid"].to_pylist(),
|
||||
combined["_relevance_score"].to_pylist(),
|
||||
)
|
||||
)
|
||||
|
||||
# rowid 0 must have the highest relevance score
|
||||
assert scores[0] > scores[1], (
|
||||
f"Best match (rowid 0, score={scores[0]:.4f}) should beat "
|
||||
f"mid match (rowid 1, score={scores[1]:.4f})"
|
||||
)
|
||||
assert scores[1] > scores[2], (
|
||||
f"Mid match (rowid 1, score={scores[1]:.4f}) should beat "
|
||||
f"bad match (rowid 2, score={scores[2]:.4f})"
|
||||
)
|
||||
|
||||
|
||||
def test_linear_combination_missing_fts_is_penalised():
|
||||
"""
|
||||
A document with no FTS match must score *lower* than a document that
|
||||
has a mediocre FTS match, everything else being equal. Previously
|
||||
missing-FTS entries used fill=1.0 directly, which gave them a reward
|
||||
(via the 1-(...) inversion) instead of a penalty.
|
||||
"""
|
||||
reranker = LinearCombinationReranker(weight=0.5, return_score="all")
|
||||
|
||||
vector_results = pa.Table.from_pydict(
|
||||
{
|
||||
"_rowid": [0, 1],
|
||||
"_distance": [0.2, 0.2], # identical vector scores
|
||||
}
|
||||
)
|
||||
fts_results = pa.Table.from_pydict(
|
||||
{
|
||||
"_rowid": [0], # rowid 1 has no FTS match
|
||||
"_score": [0.3], # small FTS score
|
||||
}
|
||||
)
|
||||
|
||||
combined = reranker.merge_results(vector_results, fts_results, fill=1.0)
|
||||
scores = dict(
|
||||
zip(
|
||||
combined["_rowid"].to_pylist(),
|
||||
combined["_relevance_score"].to_pylist(),
|
||||
)
|
||||
)
|
||||
|
||||
# rowid 0 has a small FTS score; rowid 1 has none.
|
||||
# Even a small FTS contribution should beat having none at all.
|
||||
assert scores[0] > scores[1], (
|
||||
f"Document with FTS score (rowid 0, {scores[0]:.4f}) should beat "
|
||||
f"document with no FTS match (rowid 1, {scores[1]:.4f})"
|
||||
)
|
||||
|
||||
@@ -104,7 +104,6 @@ datafusion.workspace = true
|
||||
http-body = "1" # Matching reqwest
|
||||
rstest = "0.23.0"
|
||||
test-log = "0.2"
|
||||
serial_test = "3"
|
||||
|
||||
|
||||
[features]
|
||||
|
||||
@@ -812,7 +812,8 @@ impl ConnectBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// The interval at which to check for updates from other processes.
|
||||
/// The interval at which to check for updates from other processes. This
|
||||
/// only affects LanceDB OSS.
|
||||
///
|
||||
/// If left unset, consistency is not checked. For maximum read
|
||||
/// performance, this is the default. For strong consistency, set this to
|
||||
@@ -824,11 +825,8 @@ impl ConnectBuilder {
|
||||
/// This only affects read operations. Write operations are always
|
||||
/// consistent.
|
||||
///
|
||||
/// # Cost
|
||||
///
|
||||
/// Stronger consistency is not free. The smaller the interval, the more
|
||||
/// often each read pays the cost of checking for updates against object
|
||||
/// storage, raising per-read latency and cost.
|
||||
/// LanceDB Cloud uses eventual consistency under the hood, and is not
|
||||
/// currently configurable.
|
||||
pub fn read_consistency_interval(
|
||||
mut self,
|
||||
read_consistency_interval: std::time::Duration,
|
||||
@@ -888,7 +886,6 @@ impl ConnectBuilder {
|
||||
options.host_override,
|
||||
self.request.client_config,
|
||||
storage_options.into(),
|
||||
self.request.read_consistency_interval,
|
||||
)?);
|
||||
Ok(Connection {
|
||||
internal,
|
||||
|
||||
@@ -245,9 +245,6 @@ pub struct RestfulLanceDbClient<S: HttpSend = Sender> {
|
||||
pub(crate) sender: S,
|
||||
pub(crate) id_delimiter: String,
|
||||
pub(crate) header_provider: Option<Arc<dyn HeaderProvider>>,
|
||||
/// Connection-level read consistency interval. Drives the
|
||||
/// `x-lancedb-min-timestamp` freshness header sent on read requests.
|
||||
pub(crate) read_consistency_interval: Option<Duration>,
|
||||
}
|
||||
|
||||
impl<S: HttpSend> std::fmt::Debug for RestfulLanceDbClient<S> {
|
||||
@@ -341,7 +338,6 @@ impl RestfulLanceDbClient<Sender> {
|
||||
host_override: Option<String>,
|
||||
default_headers: HeaderMap,
|
||||
client_config: ClientConfig,
|
||||
read_consistency_interval: Option<Duration>,
|
||||
) -> Result<Self> {
|
||||
// Get the timeouts
|
||||
let timeout =
|
||||
@@ -439,7 +435,6 @@ impl RestfulLanceDbClient<Sender> {
|
||||
.clone()
|
||||
.unwrap_or("$".to_string()),
|
||||
header_provider: client_config.header_provider,
|
||||
read_consistency_interval,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -845,16 +840,6 @@ pub mod test_utils {
|
||||
pub fn client_with_handler<T>(
|
||||
handler: impl Fn(reqwest::Request) -> http::response::Response<T> + Send + Sync + 'static,
|
||||
) -> RestfulLanceDbClient<MockSender>
|
||||
where
|
||||
T: Into<reqwest::Body>,
|
||||
{
|
||||
client_with_handler_and_interval(handler, None)
|
||||
}
|
||||
|
||||
pub fn client_with_handler_and_interval<T>(
|
||||
handler: impl Fn(reqwest::Request) -> http::response::Response<T> + Send + Sync + 'static,
|
||||
read_consistency_interval: Option<Duration>,
|
||||
) -> RestfulLanceDbClient<MockSender>
|
||||
where
|
||||
T: Into<reqwest::Body>,
|
||||
{
|
||||
@@ -872,7 +857,6 @@ pub mod test_utils {
|
||||
},
|
||||
id_delimiter: "$".to_string(),
|
||||
header_provider: None,
|
||||
read_consistency_interval,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -897,7 +881,6 @@ pub mod test_utils {
|
||||
},
|
||||
id_delimiter: config.id_delimiter.unwrap_or_else(|| "$".to_string()),
|
||||
header_provider: config.header_provider,
|
||||
read_consistency_interval: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -905,7 +888,6 @@ pub mod test_utils {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use serial_test::serial;
|
||||
use std::time::Duration;
|
||||
|
||||
#[test]
|
||||
@@ -1064,7 +1046,6 @@ mod tests {
|
||||
sender: Sender,
|
||||
id_delimiter: "+".to_string(),
|
||||
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
|
||||
read_consistency_interval: None,
|
||||
};
|
||||
|
||||
// Apply dynamic headers
|
||||
@@ -1100,7 +1081,6 @@ mod tests {
|
||||
sender: Sender,
|
||||
id_delimiter: "+".to_string(),
|
||||
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
|
||||
read_consistency_interval: None,
|
||||
};
|
||||
|
||||
// Apply dynamic headers
|
||||
@@ -1138,7 +1118,6 @@ mod tests {
|
||||
sender: Sender,
|
||||
id_delimiter: "+".to_string(),
|
||||
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
|
||||
read_consistency_interval: None,
|
||||
};
|
||||
|
||||
// Header provider errors should fail the request
|
||||
@@ -1164,7 +1143,6 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial(user_id_env)]
|
||||
fn test_resolve_user_id_none() {
|
||||
let config = ClientConfig::default();
|
||||
// Clear env vars that might be set from other tests
|
||||
@@ -1177,7 +1155,6 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial(user_id_env)]
|
||||
fn test_resolve_user_id_from_env() {
|
||||
// SAFETY: This is only called in tests
|
||||
unsafe {
|
||||
@@ -1192,7 +1169,6 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial(user_id_env)]
|
||||
fn test_resolve_user_id_from_env_key() {
|
||||
// SAFETY: This is only called in tests
|
||||
unsafe {
|
||||
@@ -1213,7 +1189,6 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial(user_id_env)]
|
||||
fn test_resolve_user_id_direct_takes_precedence() {
|
||||
// SAFETY: This is only called in tests
|
||||
unsafe {
|
||||
@@ -1231,7 +1206,6 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial(user_id_env)]
|
||||
fn test_resolve_user_id_empty_env_ignored() {
|
||||
// SAFETY: This is only called in tests
|
||||
unsafe {
|
||||
|
||||
@@ -206,7 +206,6 @@ impl RemoteDatabase {
|
||||
host_override: Option<String>,
|
||||
client_config: ClientConfig,
|
||||
options: RemoteOptions,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
) -> Result<Self> {
|
||||
let parsed = super::client::parse_db_url(uri)?;
|
||||
let header_map = RestfulLanceDbClient::<Sender>::default_headers(
|
||||
@@ -234,7 +233,6 @@ impl RemoteDatabase {
|
||||
host_override,
|
||||
header_map,
|
||||
client_config.clone(),
|
||||
read_consistency_interval,
|
||||
)?;
|
||||
|
||||
let table_cache = Cache::builder()
|
||||
|
||||
@@ -25,7 +25,7 @@ use crate::table::MergeResult;
|
||||
use crate::table::Tags;
|
||||
use crate::table::UpdateResult;
|
||||
use crate::table::query::create_multi_vector_plan;
|
||||
use crate::table::{AnyQuery, Filter, Predicate, PreprocessingOutput, TableStatistics};
|
||||
use crate::table::{AnyQuery, Filter, PreprocessingOutput, TableStatistics};
|
||||
use crate::utils::background_cache::BackgroundCache;
|
||||
use crate::utils::{
|
||||
resolve_arrow_field_path, supported_btree_data_type, supported_vector_data_type,
|
||||
@@ -62,73 +62,15 @@ use std::collections::HashMap;
|
||||
use std::io::Cursor;
|
||||
use std::pin::Pin;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, SystemTime};
|
||||
use std::time::Duration;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
|
||||
const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp");
|
||||
const METRIC_TYPE_KEY: &str = "metric_type";
|
||||
const INDEX_TYPE_KEY: &str = "index_type";
|
||||
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
|
||||
const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Per-table state driving the `x-lancedb-min-timestamp` freshness header
|
||||
/// sent on read requests.
|
||||
#[derive(Debug, Default, Clone, Copy)]
|
||||
struct FreshnessState {
|
||||
/// Wall-clock floor for read freshness, bumped to `now` whenever this
|
||||
/// handle performs a write or an explicit [`BaseTable::checkout_latest`].
|
||||
/// Subsequent reads send `max(baseline, now - read_consistency_interval)`
|
||||
/// as `x-lancedb-min-timestamp`.
|
||||
///
|
||||
/// This is what provides read-your-write on a single handle: after a write
|
||||
/// the next read forces the server cache past the write time. It also
|
||||
/// preserves the `checkout_latest()` signal when `read_consistency_interval`
|
||||
/// is unset (the default), where there is no interval-based floor.
|
||||
freshness_baseline: Option<SystemTime>,
|
||||
}
|
||||
|
||||
/// Snapshot of the headers that should be attached to a single read request.
|
||||
#[derive(Debug, Default, Clone, Copy)]
|
||||
struct FreshnessHeaders {
|
||||
min_timestamp: Option<SystemTime>,
|
||||
}
|
||||
|
||||
impl FreshnessHeaders {
|
||||
fn apply(self, mut request: RequestBuilder) -> RequestBuilder {
|
||||
if let Some(ts) = self.min_timestamp {
|
||||
let dt: chrono::DateTime<chrono::Utc> = ts.into();
|
||||
request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339());
|
||||
}
|
||||
request
|
||||
}
|
||||
}
|
||||
|
||||
/// Monotonic floor for the freshness baseline. `SystemTime` is not monotonic
|
||||
/// (NTP steps, hibernate/resume can move it backward), so a write must never
|
||||
/// lower the baseline below a prior write's — that would let the next read send
|
||||
/// a `min_timestamp` earlier than an earlier write and break read-your-write.
|
||||
fn next_freshness_baseline(prev: Option<SystemTime>, now: SystemTime) -> SystemTime {
|
||||
prev.map_or(now, |prev| prev.max(now))
|
||||
}
|
||||
|
||||
fn compute_min_timestamp(
|
||||
state: &FreshnessState,
|
||||
interval: Option<Duration>,
|
||||
now: SystemTime,
|
||||
) -> Option<SystemTime> {
|
||||
let interval_based = match interval {
|
||||
None => None,
|
||||
Some(d) if d.is_zero() => Some(now),
|
||||
Some(d) => Some(now.checked_sub(d).unwrap_or(now)),
|
||||
};
|
||||
match (interval_based, state.freshness_baseline) {
|
||||
(None, None) => None,
|
||||
(Some(t), None) | (None, Some(t)) => Some(t),
|
||||
(Some(a), Some(b)) => Some(a.max(b)),
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RemoteTags<'a, S: HttpSend = Sender> {
|
||||
inner: &'a RemoteTable<S>,
|
||||
}
|
||||
@@ -138,7 +80,8 @@ impl<S: HttpSend + 'static> Tags for RemoteTags<'_, S> {
|
||||
async fn list(&self) -> Result<HashMap<String, TagContents>> {
|
||||
let request = self
|
||||
.inner
|
||||
.post_read(&format!("/v1/table/{}/tags/list/", self.inner.identifier));
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/tags/list/", self.inner.identifier));
|
||||
let (request_id, response) = self.inner.send(request, true).await?;
|
||||
let response = self
|
||||
.inner
|
||||
@@ -169,13 +112,48 @@ impl<S: HttpSend + 'static> Tags for RemoteTags<'_, S> {
|
||||
}
|
||||
|
||||
async fn get_version(&self, tag: &str) -> Result<u64> {
|
||||
let request = self.inner.post_read(&format!(
|
||||
"/v1/table/{}/tags/version/",
|
||||
self.inner.identifier
|
||||
));
|
||||
self.inner
|
||||
.resolve_tag_version_with_request(tag, request)
|
||||
.await
|
||||
let request = self
|
||||
.inner
|
||||
.client
|
||||
.post(&format!(
|
||||
"/v1/table/{}/tags/version/",
|
||||
self.inner.identifier
|
||||
))
|
||||
.json(&serde_json::json!({ "tag": tag }));
|
||||
|
||||
let (request_id, response) = self.inner.send(request, true).await?;
|
||||
let response = self
|
||||
.inner
|
||||
.check_table_response(&request_id, response)
|
||||
.await?;
|
||||
|
||||
match response.text().await {
|
||||
Ok(body) => {
|
||||
let value: serde_json::Value =
|
||||
serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
source: format!("Failed to parse tag version: {}", e).into(),
|
||||
request_id: request_id.clone(),
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
value
|
||||
.get("version")
|
||||
.and_then(|v| v.as_u64())
|
||||
.ok_or_else(|| Error::Http {
|
||||
source: format!("Invalid tag version response: {}", body).into(),
|
||||
request_id,
|
||||
status_code: None,
|
||||
})
|
||||
}
|
||||
Err(err) => {
|
||||
let status_code = err.status();
|
||||
Err(Error::Http {
|
||||
source: Box::new(err),
|
||||
request_id,
|
||||
status_code,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn create(&mut self, tag: &str, version: u64) -> Result<()> {
|
||||
@@ -237,7 +215,6 @@ pub struct RemoteTable<S: HttpSend = Sender> {
|
||||
version: RwLock<Option<u64>>,
|
||||
location: RwLock<Option<String>>,
|
||||
schema_cache: BackgroundCache<SchemaRef, Error>,
|
||||
freshness: Mutex<FreshnessState>,
|
||||
}
|
||||
|
||||
impl<S: HttpSend> std::fmt::Debug for RemoteTable<S> {
|
||||
@@ -266,7 +243,6 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
version: RwLock::new(None),
|
||||
location: RwLock::new(None),
|
||||
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
|
||||
freshness: Mutex::new(FreshnessState::default()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -276,56 +252,12 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
}
|
||||
|
||||
async fn describe_version(&self, version: Option<u64>) -> Result<TableDescription> {
|
||||
let request = self.post_read(&format!("/v1/table/{}/describe/", self.identifier));
|
||||
self.describe_with_request(request, version).await
|
||||
}
|
||||
let mut request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/describe/", self.identifier));
|
||||
|
||||
async fn resolve_tag_version_with_request(
|
||||
&self,
|
||||
tag: &str,
|
||||
request: RequestBuilder,
|
||||
) -> Result<u64> {
|
||||
let request = request.json(&serde_json::json!({ "tag": tag }));
|
||||
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
|
||||
match response.text().await {
|
||||
Ok(body) => {
|
||||
let value: serde_json::Value =
|
||||
serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
source: format!("Failed to parse tag version: {}", e).into(),
|
||||
request_id: request_id.clone(),
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
value
|
||||
.get("version")
|
||||
.and_then(|v| v.as_u64())
|
||||
.ok_or_else(|| Error::Http {
|
||||
source: format!("Invalid tag version response: {}", body).into(),
|
||||
request_id,
|
||||
status_code: None,
|
||||
})
|
||||
}
|
||||
Err(err) => {
|
||||
let status_code = err.status();
|
||||
Err(Error::Http {
|
||||
source: Box::new(err),
|
||||
request_id,
|
||||
status_code,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn describe_with_request(
|
||||
&self,
|
||||
request: RequestBuilder,
|
||||
version: Option<u64>,
|
||||
) -> Result<TableDescription> {
|
||||
let body = serde_json::json!({ "version": version });
|
||||
let request = request.json(&body);
|
||||
request = request.json(&body);
|
||||
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
|
||||
@@ -779,41 +711,14 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
*read_guard
|
||||
}
|
||||
|
||||
/// Snapshot the freshness headers to attach to a single read request.
|
||||
/// Computed at call time so that retries reuse the same snapshot.
|
||||
fn snapshot_freshness_headers(&self) -> FreshnessHeaders {
|
||||
let state = *self.freshness.lock().unwrap();
|
||||
FreshnessHeaders {
|
||||
min_timestamp: compute_min_timestamp(
|
||||
&state,
|
||||
self.client.read_consistency_interval,
|
||||
SystemTime::now(),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// Build a POST request and attach the `x-lancedb-min-timestamp` freshness
|
||||
/// header.
|
||||
fn post_read(&self, uri: &str) -> RequestBuilder {
|
||||
self.snapshot_freshness_headers()
|
||||
.apply(self.client.post(uri))
|
||||
}
|
||||
|
||||
/// Record that this handle just performed a write, so the next read forces
|
||||
/// the server cache past the write time (read-your-write on a single
|
||||
/// handle).
|
||||
fn bump_freshness_baseline(&self) {
|
||||
let now = SystemTime::now();
|
||||
let mut state = self.freshness.lock().unwrap();
|
||||
state.freshness_baseline = Some(next_freshness_baseline(state.freshness_baseline, now));
|
||||
}
|
||||
|
||||
async fn execute_query(
|
||||
&self,
|
||||
query: &AnyQuery,
|
||||
options: &QueryExecutionOptions,
|
||||
) -> Result<Vec<Pin<Box<dyn RecordBatchStream + Send>>>> {
|
||||
let mut request = self.post_read(&format!("/v1/table/{}/query/", self.identifier));
|
||||
let mut request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/query/", self.identifier));
|
||||
|
||||
if let Some(timeout) = options.timeout {
|
||||
// Also send to server, so it can abort the query if it takes too long.
|
||||
@@ -919,10 +824,9 @@ async fn fetch_schema<S: HttpSend>(
|
||||
identifier: &str,
|
||||
table_name: &str,
|
||||
version: Option<u64>,
|
||||
freshness_headers: FreshnessHeaders,
|
||||
) -> Result<SchemaRef> {
|
||||
let request = freshness_headers
|
||||
.apply(client.post(&format!("/v1/table/{}/describe/", identifier)))
|
||||
let request = client
|
||||
.post(&format!("/v1/table/{}/describe/", identifier))
|
||||
.json(&serde_json::json!({ "version": version }));
|
||||
|
||||
let (request_id, response) = client.send_with_retry(request, None, true).await?;
|
||||
@@ -970,9 +874,7 @@ mod test_utils {
|
||||
use super::*;
|
||||
use crate::remote::ClientConfig;
|
||||
use crate::remote::client::test_utils::client_with_handler;
|
||||
use crate::remote::client::test_utils::{
|
||||
MockSender, client_with_handler_and_config, client_with_handler_and_interval,
|
||||
};
|
||||
use crate::remote::client::test_utils::{MockSender, client_with_handler_and_config};
|
||||
|
||||
impl RemoteTable<MockSender> {
|
||||
pub fn new_mock<F, T>(name: String, handler: F, version: Option<semver::Version>) -> Self
|
||||
@@ -990,30 +892,6 @@ mod test_utils {
|
||||
version: RwLock::new(None),
|
||||
location: RwLock::new(None),
|
||||
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
|
||||
freshness: Mutex::new(FreshnessState::default()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_mock_with_consistency_interval<F, T>(
|
||||
name: String,
|
||||
handler: F,
|
||||
read_consistency_interval: Option<Duration>,
|
||||
) -> Self
|
||||
where
|
||||
F: Fn(reqwest::Request) -> http::Response<T> + Send + Sync + 'static,
|
||||
T: Into<reqwest::Body>,
|
||||
{
|
||||
let client = client_with_handler_and_interval(handler, read_consistency_interval);
|
||||
Self {
|
||||
client,
|
||||
name: name.clone(),
|
||||
namespace: vec![],
|
||||
identifier: name,
|
||||
server_version: ServerVersion::default(),
|
||||
version: RwLock::new(None),
|
||||
location: RwLock::new(None),
|
||||
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
|
||||
freshness: Mutex::new(FreshnessState::default()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1045,7 +923,6 @@ mod test_utils {
|
||||
version: RwLock::new(None),
|
||||
location: RwLock::new(None),
|
||||
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
|
||||
freshness: Mutex::new(FreshnessState::default()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1109,7 +986,6 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
|
||||
if output.overwrite {
|
||||
self.invalidate_schema_cache();
|
||||
}
|
||||
self.bump_freshness_baseline();
|
||||
|
||||
return Ok(add_result);
|
||||
}
|
||||
@@ -1147,7 +1023,6 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
|
||||
if output.overwrite {
|
||||
self.invalidate_schema_cache();
|
||||
}
|
||||
self.bump_freshness_baseline();
|
||||
return Ok(result);
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -1264,13 +1139,8 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
self.describe().await.map(|desc| desc.version)
|
||||
}
|
||||
async fn checkout(&self, version: u64) -> Result<()> {
|
||||
// Validate the version exists. The describe is sent without freshness
|
||||
// headers so a stale baseline from a previous write doesn't ride
|
||||
// along on an explicit time-travel request.
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/describe/", self.identifier));
|
||||
self.describe_with_request(request, Some(version))
|
||||
// check that the version exists
|
||||
self.describe_version(Some(version))
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
// try to map the error to a more user-friendly error telling them
|
||||
@@ -1286,10 +1156,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
*write_guard = Some(version);
|
||||
drop(write_guard);
|
||||
|
||||
// Explicit time-travel: drop any read-your-write / freshness
|
||||
// constraints so the user sees exactly the requested version.
|
||||
*self.freshness.lock().unwrap() = FreshnessState::default();
|
||||
|
||||
// Invalidate schema cache since we're switching versions
|
||||
self.invalidate_schema_cache();
|
||||
|
||||
@@ -1300,12 +1166,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
*write_guard = None;
|
||||
drop(write_guard);
|
||||
|
||||
// Reset the freshness baseline to now so subsequent reads see at least
|
||||
// the state as of this explicit `checkout_latest()`.
|
||||
*self.freshness.lock().unwrap() = FreshnessState {
|
||||
freshness_baseline: Some(SystemTime::now()),
|
||||
};
|
||||
|
||||
// Invalidate schema cache since we're switching versions
|
||||
self.invalidate_schema_cache();
|
||||
|
||||
@@ -1326,7 +1186,9 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
}
|
||||
|
||||
async fn list_versions(&self) -> Result<Vec<Version>> {
|
||||
let request = self.post_read(&format!("/v1/table/{}/version/list/", self.identifier));
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/version/list/", self.identifier));
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
|
||||
@@ -1359,25 +1221,19 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
let client = self.client.clone();
|
||||
let identifier = self.identifier.clone();
|
||||
let table_name = self.name.clone();
|
||||
let freshness_headers = self.snapshot_freshness_headers();
|
||||
|
||||
self.schema_cache
|
||||
.get(move || async move {
|
||||
fetch_schema(
|
||||
&client,
|
||||
&identifier,
|
||||
&table_name,
|
||||
version,
|
||||
freshness_headers,
|
||||
)
|
||||
.await
|
||||
fetch_schema(&client, &identifier, &table_name, version).await
|
||||
})
|
||||
.await
|
||||
.map_err(unwrap_shared_error)
|
||||
}
|
||||
|
||||
async fn count_rows(&self, filter: Option<Filter>) -> Result<usize> {
|
||||
let mut request = self.post_read(&format!("/v1/table/{}/count_rows/", self.identifier));
|
||||
let mut request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/count_rows/", self.identifier));
|
||||
|
||||
let version = self.current_version().await;
|
||||
|
||||
@@ -1503,7 +1359,9 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
}
|
||||
|
||||
async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result<String> {
|
||||
let base_request = self.post_read(&format!("/v1/table/{}/explain_plan/", self.identifier));
|
||||
let base_request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/explain_plan/", self.identifier));
|
||||
|
||||
let query_bodies = self.prepare_query_bodies(query).await?;
|
||||
let requests: Vec<reqwest::RequestBuilder> = query_bodies
|
||||
@@ -1550,7 +1408,9 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
query: &AnyQuery,
|
||||
_options: QueryExecutionOptions,
|
||||
) -> Result<String> {
|
||||
let request = self.post_read(&format!("/v1/table/{}/analyze_plan/", self.identifier));
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/analyze_plan/", self.identifier));
|
||||
|
||||
let query_bodies = self.prepare_query_bodies(query).await?;
|
||||
let requests: Vec<reqwest::RequestBuilder> = query_bodies
|
||||
@@ -1620,17 +1480,12 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
self.bump_freshness_baseline();
|
||||
Ok(update_response)
|
||||
}
|
||||
|
||||
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
|
||||
async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
|
||||
self.check_mutable().await?;
|
||||
let predicate_sql = match predicate {
|
||||
Predicate::String(s) => s.to_string(),
|
||||
Predicate::Expr(expr) => expr_to_sql_string(expr)?,
|
||||
};
|
||||
let body = serde_json::json!({ "predicate": predicate_sql });
|
||||
let body = serde_json::json!({ "predicate": predicate });
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/delete/", self.identifier))
|
||||
@@ -1651,7 +1506,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
request_id,
|
||||
status_code: None,
|
||||
})?;
|
||||
self.bump_freshness_baseline();
|
||||
Ok(delete_response)
|
||||
}
|
||||
|
||||
@@ -1808,7 +1662,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
self.bump_freshness_baseline();
|
||||
Ok(merge_insert_response)
|
||||
}
|
||||
|
||||
@@ -1834,22 +1687,12 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
Ok(Box::new(RemoteTags { inner: self }))
|
||||
}
|
||||
async fn checkout_tag(&self, tag: &str) -> Result<()> {
|
||||
// Resolve the tag without attaching freshness headers; a stale
|
||||
// baseline from a previous write should not ride along on an
|
||||
// explicit time-travel request.
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/tags/version/", self.identifier));
|
||||
let version = self.resolve_tag_version_with_request(tag, request).await?;
|
||||
|
||||
let tags = self.tags().await?;
|
||||
let version = tags.get_version(tag).await?;
|
||||
let mut write_guard = self.version.write().await;
|
||||
*write_guard = Some(version);
|
||||
drop(write_guard);
|
||||
|
||||
// Explicit time-travel: drop any read-your-write / freshness
|
||||
// constraints so the user sees exactly the tagged version.
|
||||
*self.freshness.lock().unwrap() = FreshnessState::default();
|
||||
|
||||
// Invalidate schema cache since we're switching versions
|
||||
self.invalidate_schema_cache();
|
||||
|
||||
@@ -1900,7 +1743,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
})?;
|
||||
|
||||
self.invalidate_schema_cache();
|
||||
self.bump_freshness_baseline();
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
@@ -1955,7 +1797,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
})?;
|
||||
|
||||
self.invalidate_schema_cache();
|
||||
self.bump_freshness_baseline();
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
@@ -1983,14 +1824,15 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
})?;
|
||||
|
||||
self.invalidate_schema_cache();
|
||||
self.bump_freshness_baseline();
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
|
||||
// Make request to list the indices
|
||||
let mut request = self.post_read(&format!("/v1/table/{}/index/list/", self.identifier));
|
||||
let mut request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/index/list/", self.identifier));
|
||||
let version = self.current_version().await;
|
||||
let body = serde_json::json!({ "version": version });
|
||||
request = request.json(&body);
|
||||
@@ -2054,7 +1896,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
}
|
||||
|
||||
async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>> {
|
||||
let mut request = self.post_read(&format!(
|
||||
let mut request = self.client.post(&format!(
|
||||
"/v1/table/{}/index/{}/stats/",
|
||||
self.identifier, index_name
|
||||
));
|
||||
@@ -2166,7 +2008,9 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
}
|
||||
|
||||
async fn stats(&self) -> Result<TableStatistics> {
|
||||
let request = self.post_read(&format!("/v1/table/{}/stats/", self.identifier));
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/stats/", self.identifier));
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
let body = response.text().await.err_to_http(request_id.clone())?;
|
||||
@@ -3007,33 +2851,6 @@ mod tests {
|
||||
assert_eq!(result.version, if old_server { 0 } else { 43 });
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_expr() {
|
||||
use datafusion_expr::{col, lit};
|
||||
|
||||
let table = Table::new_with_handler("my_table", move |request| {
|
||||
if request.url().path() == "/v1/table/my_table/delete/" {
|
||||
assert_eq!(request.method(), "POST");
|
||||
|
||||
let body = request.body().unwrap().as_bytes().unwrap();
|
||||
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
|
||||
assert!(body.get("predicate").unwrap().is_string());
|
||||
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"num_deleted_rows": 4, "version": 2}"#)
|
||||
.unwrap()
|
||||
} else {
|
||||
panic!("Unexpected request path: {}", request.url().path());
|
||||
}
|
||||
});
|
||||
|
||||
let expr = col("id").gt(lit(5));
|
||||
let result = table.delete(&expr).await.unwrap();
|
||||
assert_eq!(result.num_deleted_rows, 4);
|
||||
assert_eq!(result.version, 2);
|
||||
}
|
||||
|
||||
#[rstest]
|
||||
#[case(true)]
|
||||
#[case(false)]
|
||||
@@ -6157,304 +5974,4 @@ mod tests {
|
||||
assert_eq!(create_count.load(Ordering::SeqCst), 2);
|
||||
assert_eq!(abort_count.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
// ---- Read freshness header tests ------------------------------------
|
||||
|
||||
#[test]
|
||||
fn test_next_freshness_baseline_is_monotonic() {
|
||||
let t100 = SystemTime::UNIX_EPOCH + Duration::from_secs(100);
|
||||
let t101 = SystemTime::UNIX_EPOCH + Duration::from_secs(101);
|
||||
let t99 = SystemTime::UNIX_EPOCH + Duration::from_secs(99);
|
||||
|
||||
// No prior baseline -> take the current time.
|
||||
assert_eq!(next_freshness_baseline(None, t100), t100);
|
||||
// Clock moved forward -> advance to it.
|
||||
assert_eq!(next_freshness_baseline(Some(t100), t101), t101);
|
||||
// Clock moved backward (NTP step / resume) -> keep the higher prior
|
||||
// baseline so read-your-write isn't lowered below an earlier write.
|
||||
assert_eq!(next_freshness_baseline(Some(t100), t99), t100);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_compute_min_timestamp_combines_baseline_and_interval() {
|
||||
let now = SystemTime::now();
|
||||
let baseline = now - Duration::from_secs(60);
|
||||
|
||||
// No interval, no baseline -> no header.
|
||||
assert_eq!(
|
||||
compute_min_timestamp(&FreshnessState::default(), None, now),
|
||||
None
|
||||
);
|
||||
|
||||
// Baseline only -> baseline.
|
||||
let state = FreshnessState {
|
||||
freshness_baseline: Some(baseline),
|
||||
};
|
||||
assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline));
|
||||
|
||||
// ZERO interval, no baseline -> now.
|
||||
assert_eq!(
|
||||
compute_min_timestamp(&FreshnessState::default(), Some(Duration::ZERO), now),
|
||||
Some(now)
|
||||
);
|
||||
|
||||
// Positive interval, no baseline -> now - interval.
|
||||
assert_eq!(
|
||||
compute_min_timestamp(
|
||||
&FreshnessState::default(),
|
||||
Some(Duration::from_secs(10)),
|
||||
now
|
||||
),
|
||||
Some(now - Duration::from_secs(10))
|
||||
);
|
||||
|
||||
// Both: pick the more-recent (i.e. tighter) constraint.
|
||||
// baseline = now-60, now-interval = now-10. now-10 is newer.
|
||||
let state = FreshnessState {
|
||||
freshness_baseline: Some(baseline),
|
||||
};
|
||||
assert_eq!(
|
||||
compute_min_timestamp(&state, Some(Duration::from_secs(10)), now),
|
||||
Some(now - Duration::from_secs(10))
|
||||
);
|
||||
|
||||
// Both, baseline newer: pick baseline.
|
||||
let recent_baseline = now - Duration::from_secs(5);
|
||||
let state = FreshnessState {
|
||||
freshness_baseline: Some(recent_baseline),
|
||||
};
|
||||
assert_eq!(
|
||||
compute_min_timestamp(&state, Some(Duration::from_secs(60)), now),
|
||||
Some(recent_baseline)
|
||||
);
|
||||
}
|
||||
|
||||
/// Allowed slop when comparing a header timestamp against a locally
|
||||
/// captured wall-clock bound. Tests run fast enough that 1s is plenty.
|
||||
const FRESHNESS_TOLERANCE: Duration = Duration::from_secs(1);
|
||||
|
||||
fn capturing_handler<F>(
|
||||
body_for: F,
|
||||
) -> (
|
||||
impl Fn(reqwest::Request) -> http::Response<String> + Clone + Send + Sync + 'static,
|
||||
Arc<std::sync::Mutex<Option<http::HeaderMap>>>,
|
||||
)
|
||||
where
|
||||
F: Fn(&str) -> String + Clone + Send + Sync + 'static,
|
||||
{
|
||||
let captured = Arc::new(std::sync::Mutex::new(None));
|
||||
let captured_c = captured.clone();
|
||||
let handler = move |request: reqwest::Request| {
|
||||
*captured_c.lock().unwrap() = Some(request.headers().clone());
|
||||
let path = request.url().path().to_string();
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(body_for(&path))
|
||||
.unwrap()
|
||||
};
|
||||
(handler, captured)
|
||||
}
|
||||
|
||||
fn parse_min_timestamp(headers: &http::HeaderMap) -> SystemTime {
|
||||
let value = headers
|
||||
.get("x-lancedb-min-timestamp")
|
||||
.expect("expected x-lancedb-min-timestamp header")
|
||||
.to_str()
|
||||
.unwrap();
|
||||
chrono::DateTime::parse_from_rfc3339(value)
|
||||
.unwrap()
|
||||
.with_timezone(&chrono::Utc)
|
||||
.into()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_default_sends_no_headers() {
|
||||
let (handler, captured) = capturing_handler(|_| "42".to_string());
|
||||
let table = Table::new_with_handler("my_table", handler);
|
||||
|
||||
let _ = table.count_rows(None).await.unwrap();
|
||||
|
||||
let headers = captured.lock().unwrap().clone().unwrap();
|
||||
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_zero_interval_sends_now() {
|
||||
let (handler, captured) = capturing_handler(|_| "42".to_string());
|
||||
let table =
|
||||
Table::new_with_handler_and_interval("my_table", handler, Some(Duration::from_secs(0)));
|
||||
|
||||
let before = SystemTime::now();
|
||||
table.count_rows(None).await.unwrap();
|
||||
let after = SystemTime::now();
|
||||
|
||||
let headers = captured.lock().unwrap().clone().unwrap();
|
||||
let sent = parse_min_timestamp(&headers);
|
||||
assert!(
|
||||
sent >= before - FRESHNESS_TOLERANCE && sent <= after + FRESHNESS_TOLERANCE,
|
||||
"expected timestamp roughly equal to wall clock"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_positive_interval_sends_now_minus_interval() {
|
||||
let (handler, captured) = capturing_handler(|_| "42".to_string());
|
||||
let interval = Duration::from_secs(30);
|
||||
let table = Table::new_with_handler_and_interval("my_table", handler, Some(interval));
|
||||
|
||||
let before = SystemTime::now();
|
||||
table.count_rows(None).await.unwrap();
|
||||
let after = SystemTime::now();
|
||||
|
||||
let headers = captured.lock().unwrap().clone().unwrap();
|
||||
let sent = parse_min_timestamp(&headers);
|
||||
assert!(
|
||||
sent >= before - interval - FRESHNESS_TOLERANCE
|
||||
&& sent <= after - interval + FRESHNESS_TOLERANCE,
|
||||
"expected timestamp roughly equal to now - interval"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_checkout_latest_sets_baseline() {
|
||||
let (handler, captured) = capturing_handler(|path| match path {
|
||||
"/v1/table/my_table/count_rows/" => "42".to_string(),
|
||||
_ => panic!("unexpected path: {}", path),
|
||||
});
|
||||
// No interval — only the baseline should drive the timestamp.
|
||||
let table = Table::new_with_handler_and_interval("my_table", handler, None);
|
||||
|
||||
let before_checkout = SystemTime::now();
|
||||
table.checkout_latest().await.unwrap();
|
||||
let after_checkout = SystemTime::now();
|
||||
|
||||
table.count_rows(None).await.unwrap();
|
||||
|
||||
let headers = captured.lock().unwrap().clone().unwrap();
|
||||
let sent = parse_min_timestamp(&headers);
|
||||
assert!(
|
||||
sent >= before_checkout - FRESHNESS_TOLERANCE
|
||||
&& sent <= after_checkout + FRESHNESS_TOLERANCE,
|
||||
"expected timestamp captured at checkout_latest() time"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_baseline_bumped_after_write() {
|
||||
let (handler, captured) = capturing_handler(|path| match path {
|
||||
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
||||
"/v1/table/my_table/count_rows/" => "42".to_string(),
|
||||
_ => panic!("unexpected path: {}", path),
|
||||
});
|
||||
let table = Table::new_with_handler("my_table", handler);
|
||||
|
||||
let before = SystemTime::now();
|
||||
let _ = table.update().column("a", "a + 1").execute().await.unwrap();
|
||||
table.count_rows(None).await.unwrap();
|
||||
let after = SystemTime::now();
|
||||
|
||||
// The write bumped the freshness baseline, so the next read carries a
|
||||
// min-timestamp at or after the write time (read-your-write).
|
||||
let headers = captured.lock().unwrap().clone().unwrap();
|
||||
let ts = parse_min_timestamp(&headers);
|
||||
assert!(ts >= before - FRESHNESS_TOLERANCE);
|
||||
assert!(ts <= after + FRESHNESS_TOLERANCE);
|
||||
}
|
||||
|
||||
/// Like `capturing_handler`, but keeps a per-path snapshot of the headers
|
||||
/// from every request so tests can assert on a specific endpoint.
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn path_capturing_handler<F>(
|
||||
body_for: F,
|
||||
) -> (
|
||||
impl Fn(reqwest::Request) -> http::Response<String> + Clone + Send + Sync + 'static,
|
||||
Arc<std::sync::Mutex<HashMap<String, http::HeaderMap>>>,
|
||||
)
|
||||
where
|
||||
F: Fn(&str) -> String + Clone + Send + Sync + 'static,
|
||||
{
|
||||
let captured: Arc<std::sync::Mutex<HashMap<String, http::HeaderMap>>> =
|
||||
Arc::new(std::sync::Mutex::new(HashMap::new()));
|
||||
let captured_c = captured.clone();
|
||||
let handler = move |request: reqwest::Request| {
|
||||
let path = request.url().path().to_string();
|
||||
captured_c
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(path.clone(), request.headers().clone());
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(body_for(&path))
|
||||
.unwrap()
|
||||
};
|
||||
(handler, captured)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_checkout_validation_sends_no_freshness_headers() {
|
||||
// After a write bumps the baseline, calling checkout(v) must not let
|
||||
// that stale freshness header ride along on the validating /describe/
|
||||
// request.
|
||||
let (handler, captured) = path_capturing_handler(|path| match path {
|
||||
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
||||
"/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(),
|
||||
_ => panic!("unexpected path: {}", path),
|
||||
});
|
||||
let table = Table::new_with_handler("my_table", handler);
|
||||
|
||||
table.update().column("a", "a + 1").execute().await.unwrap();
|
||||
table.checkout(5).await.unwrap();
|
||||
|
||||
let captured = captured.lock().unwrap();
|
||||
let describe_headers = captured
|
||||
.get("/v1/table/my_table/describe/")
|
||||
.expect("describe should have been called by checkout(v)");
|
||||
assert!(
|
||||
!describe_headers.contains_key("x-lancedb-min-timestamp"),
|
||||
"checkout(v) describe must not carry a stale freshness baseline",
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_checkout_tag_resolve_sends_no_freshness_headers() {
|
||||
// Same invariant for checkout_tag: the tag-resolve request must not
|
||||
// pick up a stale freshness baseline from a prior write.
|
||||
let (handler, captured) = path_capturing_handler(|path| match path {
|
||||
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
||||
"/v1/table/my_table/tags/version/" => r#"{"version":5}"#.to_string(),
|
||||
_ => panic!("unexpected path: {}", path),
|
||||
});
|
||||
let table = Table::new_with_handler("my_table", handler);
|
||||
|
||||
table.update().column("a", "a + 1").execute().await.unwrap();
|
||||
table.checkout_tag("v_initial").await.unwrap();
|
||||
|
||||
let captured = captured.lock().unwrap();
|
||||
let resolve_headers = captured
|
||||
.get("/v1/table/my_table/tags/version/")
|
||||
.expect("tags/version should have been called by checkout_tag");
|
||||
assert!(
|
||||
!resolve_headers.contains_key("x-lancedb-min-timestamp"),
|
||||
"checkout_tag resolve must not carry a stale freshness baseline",
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_checkout_clears_baseline() {
|
||||
let (handler, captured) = capturing_handler(|path| match path {
|
||||
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
||||
// checkout(5) needs to describe version 5 first
|
||||
"/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(),
|
||||
"/v1/table/my_table/count_rows/" => "42".to_string(),
|
||||
_ => panic!("unexpected path: {}", path),
|
||||
});
|
||||
let table = Table::new_with_handler("my_table", handler);
|
||||
|
||||
table.update().column("a", "a + 1").execute().await.unwrap();
|
||||
table.checkout(5).await.unwrap();
|
||||
table.count_rows(None).await.unwrap();
|
||||
|
||||
let headers = captured.lock().unwrap().clone().unwrap();
|
||||
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -253,36 +253,6 @@ pub enum Filter {
|
||||
Datafusion(Expr),
|
||||
}
|
||||
|
||||
/// A predicate for filtering rows in delete operations.
|
||||
///
|
||||
/// Accepts either a SQL string or a DataFusion [`Expr`]. Use the [`From`]
|
||||
/// implementations to convert from `&str` or `&Expr` automatically.
|
||||
/// See [`Table::delete`] for usage examples.
|
||||
pub enum Predicate<'a> {
|
||||
/// A SQL predicate string
|
||||
String(&'a str),
|
||||
/// A DataFusion logical expression
|
||||
Expr(&'a Expr),
|
||||
}
|
||||
|
||||
impl<'a> From<&'a str> for Predicate<'a> {
|
||||
fn from(s: &'a str) -> Self {
|
||||
Predicate::String(s)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a String> for Predicate<'a> {
|
||||
fn from(s: &'a String) -> Self {
|
||||
Predicate::String(s.as_str())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a Expr> for Predicate<'a> {
|
||||
fn from(e: &'a Expr) -> Self {
|
||||
Predicate::Expr(e)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait Tags: Send + Sync {
|
||||
/// List the tags of the table.
|
||||
@@ -312,15 +282,17 @@ pub use self::merge::MergeResult;
|
||||
/// date) and [`LsmWriteSpec::with_writer_config_defaults`] (default
|
||||
/// `ShardWriter` configuration recorded in the MemWAL index).
|
||||
///
|
||||
/// All variants require the table to have an unenforced primary key.
|
||||
///
|
||||
/// Install a spec with [`Table::set_lsm_write_spec`] and remove it with
|
||||
/// [`Table::unset_lsm_write_spec`]. The actual `merge_insert` dispatch
|
||||
/// onto the MemWAL writer is a follow-up.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum LsmWriteSpec {
|
||||
/// Hash-bucket sharding by a scalar column.
|
||||
/// Hash-bucket sharding by the unenforced primary key column.
|
||||
///
|
||||
/// `column` must be a non-nested column with a supported scalar type.
|
||||
/// `num_buckets` must be in `[1, 1024]`.
|
||||
/// `column` must equal the table's currently-set single-column
|
||||
/// unenforced primary key. `num_buckets` must be in `[1, 1024]`.
|
||||
/// Iceberg-compatible Murmur3-x86-32 (seed 0) is used so each row's
|
||||
/// `bucket(column, num_buckets)` value is stable across processes.
|
||||
Bucket {
|
||||
@@ -519,8 +491,8 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
|
||||
/// Add new records to the table.
|
||||
async fn add(&self, add: AddDataBuilder) -> Result<AddResult>;
|
||||
/// Delete rows from the table matching the given [`Predicate`].
|
||||
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult>;
|
||||
/// Delete rows from the table.
|
||||
async fn delete(&self, predicate: &str) -> Result<DeleteResult>;
|
||||
/// Update rows in the table.
|
||||
async fn update(&self, update: UpdateBuilder) -> Result<UpdateResult>;
|
||||
/// Create an index on the provided column(s).
|
||||
@@ -684,30 +656,6 @@ mod test_utils {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_handler_and_interval<T>(
|
||||
name: impl Into<String>,
|
||||
handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
) -> Self
|
||||
where
|
||||
T: Into<reqwest::Body>,
|
||||
{
|
||||
let inner = Arc::new(
|
||||
crate::remote::table::RemoteTable::new_mock_with_consistency_interval(
|
||||
name.into(),
|
||||
handler.clone(),
|
||||
read_consistency_interval,
|
||||
),
|
||||
);
|
||||
let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler));
|
||||
Self {
|
||||
inner,
|
||||
database: Some(database),
|
||||
// Registry is unused.
|
||||
embedding_registry: Arc::new(MemoryRegistry::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_handler_version<T>(
|
||||
name: impl Into<String>,
|
||||
version: semver::Version,
|
||||
@@ -912,8 +860,7 @@ impl Table {
|
||||
/// Delete the rows from table that match the predicate.
|
||||
///
|
||||
/// # Arguments
|
||||
/// - `predicate` - A SQL string (`&str`) or DataFusion expression (`&Expr`)
|
||||
/// that selects the rows to delete.
|
||||
/// - `predicate` - The SQL predicate string to filter the rows to be deleted.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
@@ -922,7 +869,6 @@ impl Table {
|
||||
/// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
|
||||
/// # RecordBatchIterator, Int32Array};
|
||||
/// # use arrow_schema::{Schema, Field, DataType};
|
||||
/// use datafusion_expr::{col, lit};
|
||||
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||
/// let tmpdir = tempfile::tempdir().unwrap();
|
||||
/// let db = lancedb::connect(tmpdir.path().to_str().unwrap())
|
||||
@@ -952,17 +898,11 @@ impl Table {
|
||||
/// .execute()
|
||||
/// .await
|
||||
/// .unwrap();
|
||||
///
|
||||
/// // Using a SQL string:
|
||||
/// tbl.delete("id > 5").await.unwrap();
|
||||
///
|
||||
/// // Using a DataFusion expression:
|
||||
/// let expr = col("id").lt(lit(4));
|
||||
/// tbl.delete(&expr).await.unwrap();
|
||||
/// # });
|
||||
/// ```
|
||||
pub async fn delete(&self, predicate: impl Into<Predicate<'_>>) -> Result<DeleteResult> {
|
||||
self.inner.delete(predicate.into()).await
|
||||
pub async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
|
||||
self.inner.delete(predicate).await
|
||||
}
|
||||
|
||||
/// Create an index on the provided column(s).
|
||||
@@ -1358,15 +1298,21 @@ impl Table {
|
||||
///
|
||||
/// [`LsmWriteSpec`] chooses one of three sharding strategies:
|
||||
///
|
||||
/// - [`LsmWriteSpec::bucket`] — hash-bucket writes by a scalar column.
|
||||
/// - [`LsmWriteSpec::bucket`] — hash-bucket writes by the single-column
|
||||
/// unenforced primary key.
|
||||
/// - [`LsmWriteSpec::identity`] — shard by the raw value of a scalar column.
|
||||
/// - [`LsmWriteSpec::unsharded`] — route every write to a single shard.
|
||||
///
|
||||
/// All variants require the table to have an unenforced primary key
|
||||
/// ([`Table::set_unenforced_primary_key`]); bucket sharding additionally
|
||||
/// requires it to be the single column being bucketed.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// # use lancedb::table::{LsmWriteSpec, Table};
|
||||
/// # async fn example(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// table.set_unenforced_primary_key(["id"]).await?;
|
||||
/// table
|
||||
/// .set_lsm_write_spec(
|
||||
/// LsmWriteSpec::bucket("id", 16).with_maintained_indexes(["id_idx"]),
|
||||
@@ -2831,7 +2777,8 @@ impl BaseTable for NativeTable {
|
||||
}
|
||||
|
||||
/// Delete rows from the table
|
||||
async fn delete(&self, predicate: Predicate<'_>) -> Result<DeleteResult> {
|
||||
async fn delete(&self, predicate: &str) -> Result<DeleteResult> {
|
||||
// Delegate to the submodule implementation
|
||||
delete::execute_delete(self, predicate).await
|
||||
}
|
||||
|
||||
@@ -4653,6 +4600,21 @@ mod tests {
|
||||
.unwrap();
|
||||
let table = conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
// Reject when no PK is set.
|
||||
let err = table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
|
||||
.await
|
||||
.expect_err("should reject without PK");
|
||||
assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
|
||||
|
||||
// Set PK, then a mismatched column on the spec must be rejected.
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
let err = table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("name", 4))
|
||||
.await
|
||||
.expect_err("should reject column != PK");
|
||||
assert!(matches!(err, Error::Lance { .. }), "got {:?}", err);
|
||||
|
||||
// Reject num_buckets out of range.
|
||||
for bad in [0u32, 1025] {
|
||||
let err = table
|
||||
@@ -4718,6 +4680,9 @@ mod tests {
|
||||
.unwrap();
|
||||
let table = conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
// Lance's MemWAL still requires *some* unenforced primary key on
|
||||
// the dataset; Unsharded just skips the per-row hashing step.
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::unsharded())
|
||||
.await
|
||||
@@ -4764,6 +4729,7 @@ mod tests {
|
||||
.unwrap();
|
||||
let table = conn.create_table("t", reader).execute().await.unwrap();
|
||||
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
table
|
||||
.set_lsm_write_spec(
|
||||
LsmWriteSpec::identity("region")
|
||||
@@ -4819,6 +4785,7 @@ mod tests {
|
||||
table.unset_lsm_write_spec().await.unwrap_err();
|
||||
|
||||
// Install a spec, then unset it.
|
||||
table.set_unenforced_primary_key(["id"]).await.unwrap();
|
||||
table
|
||||
.set_lsm_write_spec(LsmWriteSpec::bucket("id", 4))
|
||||
.await
|
||||
|
||||
@@ -982,105 +982,4 @@ mod tests {
|
||||
table2.add(struct_batch).execute().await.unwrap();
|
||||
assert_eq!(table2.count_rows(None).await.unwrap(), 2);
|
||||
}
|
||||
|
||||
/// Regression test: appending `arrow.json` (PyArrow `pa.json_()`) data into a table
|
||||
/// whose schema was created with `pa.json_()` (internally stored as `lance.json`, backed
|
||||
/// by `LargeBinary`) must succeed without a schema-mismatch error.
|
||||
///
|
||||
/// Previously `build_field_exprs` would attempt a `Utf8 → LargeBinary` DataFusion cast,
|
||||
/// which produced a field whose Arrow extension metadata still read `arrow.json` instead
|
||||
/// of `lance.json`. Lance-core then rejected the append with
|
||||
/// `"json vs large_binary" schema mismatch`.
|
||||
///
|
||||
/// PyArrow's `pa.json_()` may be backed by either `Utf8` or `LargeUtf8` depending on the
|
||||
/// constructor used, so the test is parameterized over the input backing type.
|
||||
#[rstest::rstest]
|
||||
#[case::utf8(DataType::Utf8)]
|
||||
#[case::large_utf8(DataType::LargeUtf8)]
|
||||
#[tokio::test]
|
||||
async fn test_add_arrow_json_into_lance_json_table(#[case] input_type: DataType) {
|
||||
use arrow_array::{Array, cast::AsArray};
|
||||
use lance_arrow::ARROW_EXT_NAME_KEY;
|
||||
use lance_arrow::json::{ARROW_JSON_EXT_NAME, JSON_EXT_NAME};
|
||||
|
||||
// Build a table whose "data" column is lance.json (LargeBinary +
|
||||
// ARROW:extension:name = "lance.json").
|
||||
let lance_json_field = lance_arrow::json::json_field("data", true);
|
||||
let table_schema = Arc::new(Schema::new(vec![lance_json_field]));
|
||||
|
||||
let db = connect("memory://").execute().await.unwrap();
|
||||
let table = db
|
||||
.create_empty_table("json_test", table_schema)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Sanity-check the stored schema.
|
||||
let stored_field = table.schema().await.unwrap();
|
||||
let data_field = stored_field.field_with_name("data").unwrap();
|
||||
assert_eq!(data_field.data_type(), &DataType::LargeBinary);
|
||||
assert_eq!(
|
||||
data_field
|
||||
.metadata()
|
||||
.get(ARROW_EXT_NAME_KEY)
|
||||
.map(|s| s.as_str()),
|
||||
Some(JSON_EXT_NAME),
|
||||
);
|
||||
|
||||
// Build an arrow.json input field (Utf8/LargeUtf8 + arrow.json extension).
|
||||
// This is what PyArrow produces for pa.json_() arrays.
|
||||
let arrow_json_metadata = std::collections::HashMap::from([(
|
||||
ARROW_EXT_NAME_KEY.to_string(),
|
||||
ARROW_JSON_EXT_NAME.to_string(),
|
||||
)]);
|
||||
let arrow_json_field =
|
||||
Field::new("data", input_type.clone(), true).with_metadata(arrow_json_metadata);
|
||||
let arrow_json_schema = Arc::new(Schema::new(vec![arrow_json_field]));
|
||||
|
||||
let rows: Vec<Option<&str>> = vec![None, Some(r#"{"a": 1}"#), Some(r#"{"b": 2}"#)];
|
||||
let string_array: Arc<dyn arrow_array::Array> = match input_type {
|
||||
DataType::Utf8 => Arc::new(arrow_array::StringArray::from(rows.clone())),
|
||||
DataType::LargeUtf8 => Arc::new(arrow_array::LargeStringArray::from(rows.clone())),
|
||||
other => panic!("unsupported arrow.json backing type for this test: {other:?}"),
|
||||
};
|
||||
let batch = RecordBatch::try_new(arrow_json_schema, vec![string_array]).unwrap();
|
||||
|
||||
// This must not fail with a schema-mismatch error.
|
||||
table.add(batch).execute().await.unwrap();
|
||||
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), rows.len());
|
||||
|
||||
// A lance.json column is read back as Utf8 carrying arrow.json extension metadata.
|
||||
let results: Vec<RecordBatch> = table
|
||||
.query()
|
||||
.select(Select::columns(&["data"]))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap()
|
||||
.try_collect()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(results.len(), 1);
|
||||
let batch = &results[0];
|
||||
assert_eq!(batch.num_rows(), rows.len());
|
||||
|
||||
let json_col = batch.column(0);
|
||||
assert_eq!(json_col.data_type(), &DataType::Utf8);
|
||||
let json_strs = json_col.as_string::<i32>();
|
||||
|
||||
for (i, expected) in rows.iter().enumerate() {
|
||||
match expected {
|
||||
None => assert!(json_strs.is_null(i), "row {i} expected null"),
|
||||
Some(raw) => {
|
||||
assert!(!json_strs.is_null(i), "row {i} expected non-null");
|
||||
let actual: serde_json::Value = serde_json::from_str(json_strs.value(i))
|
||||
.expect("read-back JSON should be valid");
|
||||
let expected: serde_json::Value =
|
||||
serde_json::from_str(raw).expect("expected JSON should be valid");
|
||||
assert_eq!(actual, expected, "row {i} JSON mismatch");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,6 @@ use datafusion_physical_expr::expressions::{CastExpr, Literal};
|
||||
use datafusion_physical_plan::expressions::Column;
|
||||
use datafusion_physical_plan::projection::ProjectionExec;
|
||||
use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};
|
||||
use lance_arrow::json::{is_arrow_json_field, is_json_field};
|
||||
|
||||
use crate::{Error, Result};
|
||||
|
||||
@@ -65,18 +64,6 @@ fn build_field_exprs(
|
||||
let input_field = &input_fields[input_idx];
|
||||
let input_expr = get_input_expr(input_idx);
|
||||
|
||||
// Special case: input is arrow.json (PyArrow pa.json_() extension type backed by
|
||||
// Utf8/LargeUtf8) and the table field is lance.json (backed by LargeBinary).
|
||||
// Lance-core's write path already handles the arrow.json → lance.json conversion
|
||||
// (including JSONB encoding), so we pass the expression through unchanged and let
|
||||
// lance-core deal with it. Attempting to cast Utf8 → LargeBinary here would
|
||||
// produce a field whose metadata still identifies it as arrow.json, which then
|
||||
// causes a schema-mismatch error inside lance-core.
|
||||
if is_arrow_json_field(input_field) && is_json_field(table_field) {
|
||||
result.push((input_expr, Arc::clone(input_field) as FieldRef));
|
||||
continue;
|
||||
}
|
||||
|
||||
let expr = match (input_field.data_type(), table_field.data_type()) {
|
||||
// Both are structs: recurse into sub-fields to handle subschemas and casts.
|
||||
(DataType::Struct(in_children), DataType::Struct(tbl_children))
|
||||
@@ -631,75 +618,4 @@ mod tests {
|
||||
.unwrap();
|
||||
assert_eq!(a.values(), &[1, 3]);
|
||||
}
|
||||
|
||||
/// `arrow.json` input (PyArrow `pa.json_()`, Utf8/LargeUtf8 + extension metadata) against a
|
||||
/// `lance.json` table field (LargeBinary + extension metadata) must be passed through
|
||||
/// without a cast so that lance-core can perform its own arrow.json → JSONB conversion.
|
||||
///
|
||||
/// Before the fix, `cast_to_table_schema` attempted a `Utf8 → LargeBinary` DataFusion
|
||||
/// cast that preserved the wrong extension metadata, causing lance-core to reject the
|
||||
/// batch with a "json vs large_binary" schema-mismatch error.
|
||||
#[rstest::rstest]
|
||||
#[case::utf8(DataType::Utf8)]
|
||||
#[case::large_utf8(DataType::LargeUtf8)]
|
||||
#[tokio::test]
|
||||
async fn test_arrow_json_passthrough_to_lance_json(#[case] input_type: DataType) {
|
||||
use lance_arrow::ARROW_EXT_NAME_KEY;
|
||||
use lance_arrow::json::{ARROW_JSON_EXT_NAME, json_field};
|
||||
|
||||
// Build a table schema with a lance.json field (LargeBinary + lance.json metadata).
|
||||
let lance_field = json_field("data", true);
|
||||
let table_schema = Schema::new(vec![lance_field]);
|
||||
|
||||
// Build an input batch with an arrow.json field (Utf8/LargeUtf8 + arrow.json metadata).
|
||||
let arrow_meta = std::collections::HashMap::from([(
|
||||
ARROW_EXT_NAME_KEY.to_string(),
|
||||
ARROW_JSON_EXT_NAME.to_string(),
|
||||
)]);
|
||||
let arrow_field = Field::new("data", input_type.clone(), true).with_metadata(arrow_meta);
|
||||
let input_schema = Arc::new(Schema::new(vec![arrow_field]));
|
||||
|
||||
let values = vec![Some(r#"{"x": 1}"#), None, Some(r#"{"y": 2}"#)];
|
||||
let input_array: Arc<dyn arrow_array::Array> = match input_type {
|
||||
DataType::Utf8 => Arc::new(StringArray::from(values)),
|
||||
DataType::LargeUtf8 => Arc::new(arrow_array::LargeStringArray::from(values)),
|
||||
other => panic!("unsupported arrow.json backing type for this test: {other:?}"),
|
||||
};
|
||||
let input_batch = RecordBatch::try_new(input_schema, vec![input_array]).unwrap();
|
||||
|
||||
let plan = plan_from_batch(input_batch).await;
|
||||
let projected = cast_to_table_schema(plan, &table_schema).unwrap();
|
||||
|
||||
// The projected schema's "data" field must carry arrow.json metadata
|
||||
// (the input field), not be silently dropped or miscast.
|
||||
let out_field = projected.schema().field_with_name("data").unwrap().clone();
|
||||
assert_eq!(out_field.data_type(), &input_type);
|
||||
assert_eq!(
|
||||
out_field
|
||||
.metadata()
|
||||
.get(ARROW_EXT_NAME_KEY)
|
||||
.map(|s| s.as_str()),
|
||||
Some(ARROW_JSON_EXT_NAME),
|
||||
"output field must still carry arrow.json metadata so lance-core can handle it"
|
||||
);
|
||||
|
||||
// The data must flow through correctly (3 rows, no panic).
|
||||
let result = collect(projected).await;
|
||||
assert_eq!(result.num_rows(), 3);
|
||||
let (v0, v2) = match input_type {
|
||||
DataType::Utf8 => {
|
||||
let col: &StringArray = result.column(0).as_any().downcast_ref().unwrap();
|
||||
(col.value(0).to_string(), col.value(2).to_string())
|
||||
}
|
||||
DataType::LargeUtf8 => {
|
||||
let col: &arrow_array::LargeStringArray =
|
||||
result.column(0).as_any().downcast_ref().unwrap();
|
||||
(col.value(0).to_string(), col.value(2).to_string())
|
||||
}
|
||||
_ => unreachable!(),
|
||||
};
|
||||
assert_eq!(v0, r#"{"x": 1}"#);
|
||||
assert!(result.column(0).is_null(1));
|
||||
assert_eq!(v2, r#"{"y": 2}"#);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,12 +1,9 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::FutureExt;
|
||||
use lance::dataset::DeleteBuilder;
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::{NativeTable, Predicate};
|
||||
use super::NativeTable;
|
||||
use crate::Result;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
@@ -24,39 +21,17 @@ pub struct DeleteResult {
|
||||
/// Internal implementation of the delete logic
|
||||
///
|
||||
/// This logic was moved from NativeTable::delete to keep table.rs clean.
|
||||
pub(crate) async fn execute_delete(
|
||||
table: &NativeTable,
|
||||
predicate: Predicate<'_>,
|
||||
) -> Result<DeleteResult> {
|
||||
pub(crate) async fn execute_delete(table: &NativeTable, predicate: &str) -> Result<DeleteResult> {
|
||||
table.dataset.ensure_mutable()?;
|
||||
match predicate {
|
||||
Predicate::String(s) => {
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
let delete_result = dataset.delete(s).boxed().await?;
|
||||
let num_deleted_rows = delete_result.num_deleted_rows;
|
||||
let version = dataset.version().version;
|
||||
table.dataset.update(dataset);
|
||||
Ok(DeleteResult {
|
||||
num_deleted_rows,
|
||||
version,
|
||||
})
|
||||
}
|
||||
Predicate::Expr(expr) => {
|
||||
let dataset = table.dataset.get().await?;
|
||||
let delete_result = DeleteBuilder::from_expr(Arc::clone(&dataset), expr.clone())
|
||||
.execute()
|
||||
.await?;
|
||||
let num_deleted_rows = delete_result.num_deleted_rows;
|
||||
let version = delete_result.new_dataset.version().version;
|
||||
table.dataset.update(
|
||||
Arc::try_unwrap(delete_result.new_dataset).unwrap_or_else(|arc| (*arc).clone()),
|
||||
);
|
||||
Ok(DeleteResult {
|
||||
num_deleted_rows,
|
||||
version,
|
||||
})
|
||||
}
|
||||
}
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
let delete_result = dataset.delete(predicate).boxed().await?;
|
||||
let num_deleted_rows = delete_result.num_deleted_rows;
|
||||
let version = dataset.version().version;
|
||||
table.dataset.update(dataset);
|
||||
Ok(DeleteResult {
|
||||
num_deleted_rows,
|
||||
version,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -201,100 +176,4 @@ mod tests {
|
||||
"Table version must increment after delete operation"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_expr() {
|
||||
use datafusion_expr::{col, lit};
|
||||
|
||||
let conn = connect("memory://").execute().await.unwrap();
|
||||
|
||||
// 1. Create a table with values 0 to 9
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
|
||||
let batch = RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![Arc::new(Int32Array::from_iter_values(0..10))],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let table = conn
|
||||
.create_table("test_delete_expr", batch)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// 2. Verify initial state
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), 10);
|
||||
let initial_version = table.version().await.unwrap();
|
||||
|
||||
// 3. Execute Delete with Expr (removes values > 5)
|
||||
let expr = col("i").gt(lit(5));
|
||||
table.delete(&expr).await.unwrap();
|
||||
|
||||
// 4. Verify results
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), 6); // 0, 1, 2, 3, 4, 5 remain
|
||||
let current_version = table.version().await.unwrap();
|
||||
assert!(
|
||||
current_version > initial_version,
|
||||
"Table version must increment after delete_expr operation"
|
||||
);
|
||||
|
||||
// 5. Verify specific data consistency
|
||||
let batches = table
|
||||
.query()
|
||||
.execute()
|
||||
.await
|
||||
.unwrap()
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.unwrap();
|
||||
let batch = &batches[0];
|
||||
let array = batch
|
||||
.column(0)
|
||||
.as_any()
|
||||
.downcast_ref::<Int32Array>()
|
||||
.unwrap();
|
||||
|
||||
// Ensure no value > 5 exists
|
||||
for val in array.iter() {
|
||||
assert!(val.unwrap() <= 5);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_expr_increments_version() {
|
||||
use datafusion_expr::lit;
|
||||
|
||||
let conn = connect("memory://").execute().await.unwrap();
|
||||
|
||||
// Create a table with 5 rows
|
||||
let batch = record_batch!(("id", Int32, [1, 2, 3, 4, 5])).unwrap();
|
||||
|
||||
let table = conn
|
||||
.create_table("test_delete_expr_noop", batch)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Capture the initial state (Rows = 5, Version = 1)
|
||||
let initial_rows = table.count_rows(None).await.unwrap();
|
||||
let initial_version = table.version().await.unwrap();
|
||||
|
||||
assert_eq!(initial_rows, 5);
|
||||
let expr = lit(false);
|
||||
table.delete(&expr).await.unwrap();
|
||||
|
||||
// Rows should still be 5
|
||||
let current_rows = table.count_rows(None).await.unwrap();
|
||||
assert_eq!(
|
||||
current_rows, initial_rows,
|
||||
"Data should not change when predicate is false"
|
||||
);
|
||||
|
||||
// version check
|
||||
let current_version = table.version().await.unwrap();
|
||||
assert!(
|
||||
current_version > initial_version,
|
||||
"Table version must increment after delete_expr operation"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user