mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-05 05:10:41 +00:00
Compare commits
21 Commits
codex/upda
...
feat/check
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
331f03cd98 | ||
|
|
5696df2791 | ||
|
|
09518a3c1b | ||
|
|
59824ab438 | ||
|
|
88c48a1bf0 | ||
|
|
735a7ce6fe | ||
|
|
1ee490d125 | ||
|
|
08745dc1e1 | ||
|
|
2660f96475 | ||
|
|
d96ae4b986 | ||
|
|
38454969cd | ||
|
|
c13c3184cf | ||
|
|
a7a7350eb3 | ||
|
|
c3c2887c02 | ||
|
|
2ca6d41f17 | ||
|
|
341cb04c2f | ||
|
|
0d4cb346f9 | ||
|
|
379684391e | ||
|
|
d065be0474 | ||
|
|
7b874905fd | ||
|
|
a327044e2f |
7
.agents/skills/README.md
Normal file
7
.agents/skills/README.md
Normal file
@@ -0,0 +1,7 @@
|
||||
# Agent Skills
|
||||
|
||||
This directory contains repo-scoped code agent skills for the LanceDB project.
|
||||
|
||||
Each skill is a folder that contains a required `SKILL.md` and optional bundled resources.
|
||||
|
||||
Codex discovers skills from `.agents/skills` in the current working directory and parent directories.
|
||||
98
.agents/skills/lancedb-update-lance-dependency/SKILL.md
Normal file
98
.agents/skills/lancedb-update-lance-dependency/SKILL.md
Normal file
@@ -0,0 +1,98 @@
|
||||
---
|
||||
name: lancedb-update-lance-dependency
|
||||
description: Update LanceDB to a specific Lance release or tag. Use when bumping Lance dependencies in the lancedb repository, including Rust workspace Lance crates, Java lance-core, validation, branch creation, commit, push, and PR creation when requested.
|
||||
---
|
||||
|
||||
# LanceDB Update Lance Dependency
|
||||
|
||||
## Scope
|
||||
|
||||
Use this skill in the `lancedb/lancedb` repository when updating the Lance dependency to a specific Lance version or tag.
|
||||
|
||||
Inputs can be a version (`7.2.0-beta.1`), a tag (`v7.2.0-beta.1`), a tag ref (`refs/tags/v7.2.0-beta.1`), or `latest`.
|
||||
|
||||
## Workflow
|
||||
|
||||
1. Confirm the worktree status with `git status --short`.
|
||||
2. Resolve the target Lance version:
|
||||
|
||||
- If the input is `latest`, empty, or omitted, run:
|
||||
|
||||
```bash
|
||||
python3 ci/check_lance_release.py
|
||||
```
|
||||
|
||||
Parse the JSON output. If `needs_update` is not `true`, stop without creating a PR. Otherwise use `latest_tag`.
|
||||
|
||||
- If the input is explicit, use it directly.
|
||||
|
||||
3. Compute update metadata without changing files:
|
||||
|
||||
```bash
|
||||
python3 ci/update_lance_dependency.py "$TAG_OR_VERSION" --metadata-only
|
||||
```
|
||||
|
||||
Before making changes, check for an existing open PR with the emitted `pr_title`:
|
||||
|
||||
```bash
|
||||
gh pr list --search "\"$PR_TITLE\" in:title" --state open --limit 1 --json number,url,title
|
||||
```
|
||||
|
||||
If a matching open PR exists, stop and report it instead of creating a duplicate.
|
||||
|
||||
4. Run the deterministic update entrypoint:
|
||||
|
||||
```bash
|
||||
python3 ci/update_lance_dependency.py "$TAG_OR_VERSION"
|
||||
```
|
||||
|
||||
This updates the Rust workspace Lance dependencies through `ci/set_lance_version.py`, updates `java/pom.xml`, refreshes Cargo metadata, and prints JSON metadata containing `branch_name`, `commit_message`, and `pr_title`.
|
||||
|
||||
5. Run validation:
|
||||
|
||||
```bash
|
||||
cargo clippy --quiet --workspace --tests --all-features -- -D warnings
|
||||
cargo fmt --all --quiet
|
||||
```
|
||||
|
||||
Fix real diagnostics and rerun clippy until it succeeds. Do not skip warnings.
|
||||
|
||||
6. Inspect `git status --short` and `git diff` to ensure only the Lance dependency update and required compatibility fixes are present.
|
||||
|
||||
7. If the task only asks to prepare local changes, stop here and report the changed files and validation result.
|
||||
|
||||
8. If the task asks to publish the update, create a branch using the printed `branch_name`, stage all relevant files, and commit using the printed `commit_message`. Do not amend or rewrite existing commits.
|
||||
|
||||
9. Push to `origin`. Before creating the PR, check that the current token has push permission:
|
||||
|
||||
```bash
|
||||
gh api repos/lancedb/lancedb --jq .permissions.push
|
||||
```
|
||||
|
||||
If the remote branch already exists for the same generated branch name, delete the remote ref with `gh api -X DELETE repos/lancedb/lancedb/git/refs/heads/$BRANCH_NAME`, then push. Do not force-push.
|
||||
|
||||
10. Create a PR targeting `main` with the printed `pr_title`. If there is no PR template, keep the body to two or three concise sentences: state the Lance dependency bump, note any required compatibility fixes, and link the triggering Lance tag or release.
|
||||
|
||||
11. Read back the remote PR title after creation. If it is not a Conventional Commit title, fix it immediately.
|
||||
|
||||
12. When running in GitHub Actions after creating the LanceDB PR, trigger the Sophon dependency update:
|
||||
|
||||
```bash
|
||||
gh workflow run codex-bump-lancedb-lance.yml \
|
||||
--repo lancedb/sophon \
|
||||
-f lance_ref="$LANCE_TAG" \
|
||||
-f lancedb_ref="$BRANCH_NAME"
|
||||
gh run list --repo lancedb/sophon --workflow codex-bump-lancedb-lance.yml --limit 1 --json databaseId,url,displayTitle
|
||||
```
|
||||
|
||||
Use the emitted metadata `tag` value as `LANCE_TAG`. Do this only after a new LanceDB PR has been created. If the update was skipped because no update is needed or an open PR already exists, do not trigger Sophon.
|
||||
|
||||
## GitHub Actions
|
||||
|
||||
When this skill is used from GitHub Actions, `TAG`, `GH_TOKEN`, and `GITHUB_TOKEN` may already be set. Resolve `latest` first when `TAG` is empty. Once an explicit tag or version is known, use:
|
||||
|
||||
```bash
|
||||
python3 ci/update_lance_dependency.py "$TAG" --github-output "$GITHUB_OUTPUT"
|
||||
```
|
||||
|
||||
Then use the emitted `branch_name`, `commit_message`, and `pr_title` values for branch, commit, and PR creation.
|
||||
@@ -4,14 +4,16 @@ on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
tag:
|
||||
description: "Tag name from Lance"
|
||||
required: true
|
||||
description: "Tag name from Lance. If omitted, the skill will use the latest Lance release that needs an update."
|
||||
required: false
|
||||
default: ""
|
||||
type: string
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
tag:
|
||||
description: "Tag name from Lance"
|
||||
required: true
|
||||
description: "Tag name from Lance. Leave empty to use the latest Lance release that needs an update."
|
||||
required: false
|
||||
default: ""
|
||||
type: string
|
||||
|
||||
permissions:
|
||||
@@ -25,7 +27,7 @@ jobs:
|
||||
steps:
|
||||
- name: Show inputs
|
||||
run: |
|
||||
echo "tag = ${{ inputs.tag }}"
|
||||
echo "tag = ${{ inputs.tag || 'latest' }}"
|
||||
|
||||
- name: Checkout Repo LanceDB
|
||||
uses: actions/checkout@v4
|
||||
@@ -71,65 +73,21 @@ jobs:
|
||||
OPENAI_API_KEY: ${{ secrets.CODEX_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
VERSION="${TAG#refs/tags/}"
|
||||
VERSION="${VERSION#v}"
|
||||
BRANCH_NAME="codex/update-lance-${VERSION//[^a-zA-Z0-9]/-}"
|
||||
|
||||
# Use "chore" for beta/rc versions, "feat" for stable releases
|
||||
if [[ "${VERSION}" == *beta* ]] || [[ "${VERSION}" == *rc* ]]; then
|
||||
COMMIT_TYPE="chore"
|
||||
else
|
||||
COMMIT_TYPE="feat"
|
||||
fi
|
||||
TARGET_TAG="${TAG:-latest}"
|
||||
|
||||
cat <<EOF >/tmp/codex-prompt.txt
|
||||
You are running inside the lancedb repository on a GitHub Actions runner. Update the Lance dependency to version ${VERSION} and prepare a pull request for maintainers to review.
|
||||
You are running inside the lancedb repository on a GitHub Actions runner.
|
||||
|
||||
Follow these steps exactly:
|
||||
1. Use script "ci/set_lance_version.py" to update Lance Rust dependencies. The script already refreshes Cargo metadata, so allow it to finish even if it takes time.
|
||||
2. Update the Java lance-core dependency version in "java/pom.xml": change the "<lance-core.version>...</lance-core.version>" property to "${VERSION}".
|
||||
3. Run "cargo clippy --workspace --tests --all-features -- -D warnings". If diagnostics appear, fix them yourself and rerun clippy until it exits cleanly. Do not skip any warnings.
|
||||
4. After clippy succeeds, run "cargo fmt --all" to format the workspace.
|
||||
5. Ensure the repository is clean except for intentional changes. Inspect "git status --short" and "git diff" to confirm the dependency update and any required fixes.
|
||||
6. Create and switch to a new branch named "${BRANCH_NAME}" (replace any duplicated hyphens if necessary).
|
||||
7. Stage all relevant files with "git add -A". Commit using the message "${COMMIT_TYPE}: update lance dependency to v${VERSION}".
|
||||
8. Push the branch to origin. If the remote branch already exists, delete it first with "gh api -X DELETE repos/lancedb/lancedb/git/refs/heads/${BRANCH_NAME}" then push with "git push origin ${BRANCH_NAME}". Do NOT use "git push --force" or "git push -f".
|
||||
9. env "GH_TOKEN" is available, use "gh" tools for github related operations like creating pull request.
|
||||
10. Create a pull request targeting "main" with title "${COMMIT_TYPE}: update lance dependency to v${VERSION}". First, write the PR body to /tmp/pr-body.md using a heredoc (cat <<'EOF' > /tmp/pr-body.md). The body should summarize the dependency bump, clippy/fmt verification, and link the triggering tag (${TAG}). Then run "gh pr create --body-file /tmp/pr-body.md".
|
||||
11. After creating the PR, display the PR URL, "git status --short", and a concise summary of the commands run and their results.
|
||||
Use \$lancedb-update-lance-dependency with target "${TARGET_TAG}".
|
||||
|
||||
Constraints:
|
||||
- Use bash commands; avoid modifying GitHub workflow files other than through the scripted task above.
|
||||
- Do not merge the PR.
|
||||
- If any command fails, diagnose and fix the issue instead of aborting.
|
||||
- Use env "GH_TOKEN" for GitHub operations.
|
||||
- Do not merge the pull request.
|
||||
- Do not force-push.
|
||||
- Do not create a duplicate pull request if an open PR already exists for the target Lance version.
|
||||
- If any command fails, diagnose and fix the root cause instead of aborting.
|
||||
- After creating the PR, display the PR URL, "git status --short", and a concise summary of the commands run and their results.
|
||||
EOF
|
||||
|
||||
printenv OPENAI_API_KEY | codex login --with-api-key
|
||||
codex --config shell_environment_policy.ignore_default_excludes=true exec --dangerously-bypass-approvals-and-sandbox "$(cat /tmp/codex-prompt.txt)"
|
||||
|
||||
- name: Trigger sophon dependency update
|
||||
env:
|
||||
TAG: ${{ inputs.tag }}
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
VERSION="${TAG#refs/tags/}"
|
||||
VERSION="${VERSION#v}"
|
||||
LANCEDB_BRANCH="codex/update-lance-${VERSION//[^a-zA-Z0-9]/-}"
|
||||
|
||||
echo "Triggering sophon workflow with:"
|
||||
echo " lance_ref: ${TAG#refs/tags/}"
|
||||
echo " lancedb_ref: ${LANCEDB_BRANCH}"
|
||||
|
||||
gh workflow run codex-bump-lancedb-lance.yml \
|
||||
--repo lancedb/sophon \
|
||||
-f lance_ref="${TAG#refs/tags/}" \
|
||||
-f lancedb_ref="${LANCEDB_BRANCH}"
|
||||
|
||||
- name: Show latest sophon workflow run
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
echo "Latest sophon workflow run:"
|
||||
gh run list --repo lancedb/sophon --workflow codex-bump-lancedb-lance.yml --limit 1 --json databaseId,url,displayTitle
|
||||
|
||||
62
.github/workflows/lance-release-timer.yml
vendored
62
.github/workflows/lance-release-timer.yml
vendored
@@ -1,62 +0,0 @@
|
||||
name: Lance Release Timer
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "*/10 * * * *"
|
||||
workflow_dispatch:
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
actions: write
|
||||
|
||||
concurrency:
|
||||
group: lance-release-timer
|
||||
cancel-in-progress: false
|
||||
|
||||
jobs:
|
||||
trigger-update:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Check for new Lance tag
|
||||
id: check
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
python3 ci/check_lance_release.py --github-output "$GITHUB_OUTPUT"
|
||||
|
||||
- name: Look for existing PR
|
||||
if: steps.check.outputs.needs_update == 'true'
|
||||
id: pr
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
TITLE="chore: update lance dependency to v${{ steps.check.outputs.latest_version }}"
|
||||
COUNT=$(gh pr list --search "\"$TITLE\" in:title" --state open --limit 1 --json number --jq 'length')
|
||||
if [ "$COUNT" -gt 0 ]; then
|
||||
echo "Open PR already exists for $TITLE"
|
||||
echo "pr_exists=true" >> "$GITHUB_OUTPUT"
|
||||
else
|
||||
echo "No existing PR for $TITLE"
|
||||
echo "pr_exists=false" >> "$GITHUB_OUTPUT"
|
||||
fi
|
||||
|
||||
- name: Trigger codex update workflow
|
||||
if: steps.check.outputs.needs_update == 'true' && steps.pr.outputs.pr_exists != 'true'
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
TAG=${{ steps.check.outputs.latest_tag }}
|
||||
gh workflow run codex-update-lance-dependency.yml -f tag=refs/tags/$TAG
|
||||
|
||||
- name: Show latest codex workflow run
|
||||
if: steps.check.outputs.needs_update == 'true' && steps.pr.outputs.pr_exists != 'true'
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
gh run list --workflow codex-update-lance-dependency.yml --limit 1 --json databaseId,url,displayTitle
|
||||
78
Cargo.lock
generated
78
Cargo.lock
generated
@@ -3306,8 +3306,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
|
||||
|
||||
[[package]]
|
||||
name = "fsst"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"rand 0.9.4",
|
||||
@@ -4574,8 +4574,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a"
|
||||
|
||||
[[package]]
|
||||
name = "lance"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"arrow",
|
||||
@@ -4648,8 +4648,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-arrow"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4670,7 +4670,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "lance-arrow-scalar"
|
||||
version = "58.0.0"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4684,7 +4684,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "lance-arrow-stats"
|
||||
version = "58.0.0"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-schema",
|
||||
@@ -4693,8 +4693,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-bitpacking"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arrayref",
|
||||
"paste",
|
||||
@@ -4703,8 +4703,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-core"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4739,8 +4739,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-datafusion"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4770,8 +4770,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-datagen"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4789,8 +4789,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-encoding"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arrow-arith",
|
||||
"arrow-array",
|
||||
@@ -4825,8 +4825,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-file"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arrow-arith",
|
||||
"arrow-array",
|
||||
@@ -4857,8 +4857,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-index"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"arrow",
|
||||
@@ -4924,8 +4924,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-io"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-arith",
|
||||
@@ -4967,8 +4967,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-linalg"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4984,8 +4984,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"async-trait",
|
||||
@@ -4997,8 +4997,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace-impls"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-ipc",
|
||||
@@ -5047,25 +5047,23 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-select"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
"arrow-schema",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"deepsize",
|
||||
"itertools 0.13.0",
|
||||
"lance-core",
|
||||
"roaring",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lance-table"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -5105,8 +5103,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-testing"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-schema",
|
||||
@@ -5117,8 +5115,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-tokenizer"
|
||||
version = "7.2.0-beta.4"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.4#c3126cd11121158927102afc764875fc1dedc2c5"
|
||||
version = "7.2.0-beta.3"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.2.0-beta.3#7c070f760fa8e24c8015cb2afbd22c5e6b7898e8"
|
||||
dependencies = [
|
||||
"icu_segmenter",
|
||||
"jieba-rs",
|
||||
|
||||
28
Cargo.toml
28
Cargo.toml
@@ -13,20 +13,20 @@ categories = ["database-implementations"]
|
||||
rust-version = "1.91.0"
|
||||
|
||||
[workspace.dependencies]
|
||||
lance = { "version" = "=7.2.0-beta.4", default-features = false, "tag" = "v7.2.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.2.0-beta.4", "tag" = "v7.2.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.2.0-beta.4", "tag" = "v7.2.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.2.0-beta.4", "tag" = "v7.2.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.2.0-beta.4", default-features = false, "tag" = "v7.2.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.2.0-beta.4", "tag" = "v7.2.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.2.0-beta.4", "tag" = "v7.2.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.2.0-beta.4", "tag" = "v7.2.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.2.0-beta.4", default-features = false, "tag" = "v7.2.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.2.0-beta.4", "tag" = "v7.2.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.2.0-beta.4", "tag" = "v7.2.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.2.0-beta.4", "tag" = "v7.2.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.2.0-beta.4", "tag" = "v7.2.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.2.0-beta.4", "tag" = "v7.2.0-beta.4", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance = { "version" = "=7.2.0-beta.3", default-features = false, "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.2.0-beta.3", default-features = false, "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.2.0-beta.3", default-features = false, "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
ahash = "0.8"
|
||||
# Note that this one does not include pyarrow
|
||||
arrow = { version = "58.0.0", optional = false }
|
||||
|
||||
126
ci/update_lance_dependency.py
Normal file
126
ci/update_lance_dependency.py
Normal file
@@ -0,0 +1,126 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Prepare a Lance dependency update for LanceDB."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Sequence
|
||||
|
||||
try:
|
||||
from check_lance_release import parse_semver
|
||||
except ModuleNotFoundError:
|
||||
# Supports importing as ci.update_lance_dependency from tests or ad hoc checks.
|
||||
from ci.check_lance_release import parse_semver # type: ignore
|
||||
|
||||
|
||||
def normalize_version(raw: str) -> str:
|
||||
value = raw.strip()
|
||||
value = value.removeprefix("refs/tags/")
|
||||
value = value.removeprefix("v")
|
||||
try:
|
||||
parse_semver(value)
|
||||
except ValueError:
|
||||
raise ValueError(f"Unsupported Lance version or tag: {raw}")
|
||||
return value
|
||||
|
||||
|
||||
def normalized_tag(version: str) -> str:
|
||||
return f"v{version}"
|
||||
|
||||
|
||||
def branch_name(version: str) -> str:
|
||||
suffix = re.sub(r"[^a-zA-Z0-9]+", "-", version).strip("-")
|
||||
suffix = re.sub(r"-+", "-", suffix)
|
||||
return f"codex/update-lance-{suffix}"
|
||||
|
||||
|
||||
def commit_type(version: str) -> str:
|
||||
prerelease = version.split("-", maxsplit=1)[1] if "-" in version else ""
|
||||
return "chore" if "beta" in prerelease or "rc" in prerelease else "feat"
|
||||
|
||||
|
||||
def metadata_for(version: str) -> dict[str, str]:
|
||||
kind = commit_type(version)
|
||||
message = f"{kind}: update lance dependency to v{version}"
|
||||
return {
|
||||
"version": version,
|
||||
"tag": normalized_tag(version),
|
||||
"branch_name": branch_name(version),
|
||||
"commit_type": kind,
|
||||
"commit_message": message,
|
||||
"pr_title": message,
|
||||
}
|
||||
|
||||
|
||||
def run_command(cmd: Sequence[str], *, cwd: Path) -> None:
|
||||
subprocess.run(cmd, cwd=cwd, check=True)
|
||||
|
||||
|
||||
def update_java_lance_core_version(repo_root: Path, version: str) -> None:
|
||||
pom_path = repo_root / "java" / "pom.xml"
|
||||
contents = pom_path.read_text(encoding="utf-8")
|
||||
updated, count = re.subn(
|
||||
r"(<lance-core\.version>)[^<]+(</lance-core\.version>)",
|
||||
rf"\g<1>{version}\g<2>",
|
||||
contents,
|
||||
count=1,
|
||||
)
|
||||
if count != 1:
|
||||
raise RuntimeError(
|
||||
"Expected exactly one <lance-core.version> entry in java/pom.xml"
|
||||
)
|
||||
pom_path.write_text(updated, encoding="utf-8")
|
||||
|
||||
|
||||
def write_github_outputs(path: str | None, payload: dict[str, str]) -> None:
|
||||
if not path:
|
||||
return
|
||||
with open(path, "a", encoding="utf-8") as output:
|
||||
for key, value in payload.items():
|
||||
output.write(f"{key}={value}\n")
|
||||
|
||||
|
||||
def main(argv: Sequence[str] | None = None) -> int:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument(
|
||||
"tag_or_version",
|
||||
help="Lance tag or version, for example refs/tags/v7.2.0-beta.1 or 7.2.0",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--repo-root",
|
||||
type=Path,
|
||||
default=Path(__file__).resolve().parents[1],
|
||||
help="Path to the lancedb repository root",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--github-output",
|
||||
default=None,
|
||||
help="Optional GitHub Actions output file to receive metadata fields",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--metadata-only",
|
||||
action="store_true",
|
||||
help="Only print derived metadata; do not modify dependency files",
|
||||
)
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
repo_root = args.repo_root.resolve()
|
||||
version = normalize_version(args.tag_or_version)
|
||||
payload = metadata_for(version)
|
||||
|
||||
if not args.metadata_only:
|
||||
run_command([sys.executable, "ci/set_lance_version.py", version], cwd=repo_root)
|
||||
update_java_lance_core_version(repo_root, version)
|
||||
|
||||
write_github_outputs(args.github_output, payload)
|
||||
print(json.dumps(payload, sort_keys=True))
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
43
docs/src/js/classes/BranchContents.md
Normal file
43
docs/src/js/classes/BranchContents.md
Normal file
@@ -0,0 +1,43 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / BranchContents
|
||||
|
||||
# Class: BranchContents
|
||||
|
||||
## Constructors
|
||||
|
||||
### new BranchContents()
|
||||
|
||||
```ts
|
||||
new BranchContents(): BranchContents
|
||||
```
|
||||
|
||||
#### Returns
|
||||
|
||||
[`BranchContents`](BranchContents.md)
|
||||
|
||||
## Properties
|
||||
|
||||
### manifestSize
|
||||
|
||||
```ts
|
||||
manifestSize: number;
|
||||
```
|
||||
|
||||
***
|
||||
|
||||
### parentBranch?
|
||||
|
||||
```ts
|
||||
optional parentBranch: string;
|
||||
```
|
||||
|
||||
***
|
||||
|
||||
### parentVersion
|
||||
|
||||
```ts
|
||||
parentVersion: number;
|
||||
```
|
||||
96
docs/src/js/classes/Branches.md
Normal file
96
docs/src/js/classes/Branches.md
Normal file
@@ -0,0 +1,96 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / Branches
|
||||
|
||||
# Class: Branches
|
||||
|
||||
Branch manager for a [Table](Table.md).
|
||||
|
||||
Unlike tags, `create` and `checkout` return a new [Table](Table.md) handle scoped
|
||||
to the branch; writes on it do not affect `main`.
|
||||
|
||||
## Methods
|
||||
|
||||
### checkout()
|
||||
|
||||
```ts
|
||||
checkout(name, version?): Promise<Table>
|
||||
```
|
||||
|
||||
Check out an existing branch and return a handle scoped to it.
|
||||
|
||||
With `version` set, the returned handle is pinned to that version of the
|
||||
branch (a read-only, detached view); otherwise it tracks the branch's
|
||||
latest and stays writable.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **name**: `string`
|
||||
|
||||
* **version?**: `number`
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<[`Table`](Table.md)>
|
||||
|
||||
***
|
||||
|
||||
### create()
|
||||
|
||||
```ts
|
||||
create(
|
||||
name,
|
||||
fromRef?,
|
||||
fromVersion?): Promise<Table>
|
||||
```
|
||||
|
||||
Create a branch and return a handle scoped to it.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **name**: `string`
|
||||
Name of the new branch.
|
||||
|
||||
* **fromRef?**: `string`
|
||||
Source branch to fork from. Defaults to `main`.
|
||||
|
||||
* **fromVersion?**: `number`
|
||||
A specific version on `fromRef`. Defaults to latest.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<[`Table`](Table.md)>
|
||||
|
||||
***
|
||||
|
||||
### delete()
|
||||
|
||||
```ts
|
||||
delete(name): Promise<void>
|
||||
```
|
||||
|
||||
Delete a branch.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **name**: `string`
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`void`>
|
||||
|
||||
***
|
||||
|
||||
### list()
|
||||
|
||||
```ts
|
||||
list(): Promise<Record<string, BranchContents>>
|
||||
```
|
||||
|
||||
List all branches, mapping name to branch metadata.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`Record`<`string`, [`BranchContents`](BranchContents.md)>>
|
||||
@@ -110,6 +110,23 @@ containing the new version number of the table after altering the columns.
|
||||
|
||||
***
|
||||
|
||||
### branches()
|
||||
|
||||
```ts
|
||||
abstract branches(): Promise<Branches>
|
||||
```
|
||||
|
||||
Get the branch manager for this table.
|
||||
|
||||
Branches are isolated, writable lines of history forked from another
|
||||
branch (or version). Writes on a branch do not affect `main`.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<[`Branches`](Branches.md)>
|
||||
|
||||
***
|
||||
|
||||
### checkout()
|
||||
|
||||
```ts
|
||||
@@ -994,6 +1011,29 @@ based on the row being updated (e.g. "my_col + 1")
|
||||
|
||||
***
|
||||
|
||||
### updateFieldMetadata()
|
||||
|
||||
```ts
|
||||
abstract updateFieldMetadata(updates): Promise<UpdateFieldMetadataResult>
|
||||
```
|
||||
|
||||
Update per-field (column) metadata.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **updates**: [`FieldMetadataUpdate`](../interfaces/FieldMetadataUpdate.md)[]
|
||||
One or more per-field updates. Each
|
||||
update's metadata is merged into the field's existing metadata by default;
|
||||
a value of `null` deletes that key, and `replace: true` swaps the whole map.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<[`UpdateFieldMetadataResult`](../interfaces/UpdateFieldMetadataResult.md)>
|
||||
|
||||
resolves to the new table version.
|
||||
|
||||
***
|
||||
|
||||
### vectorSearch()
|
||||
|
||||
```ts
|
||||
|
||||
@@ -19,6 +19,8 @@
|
||||
|
||||
- [BooleanQuery](classes/BooleanQuery.md)
|
||||
- [BoostQuery](classes/BoostQuery.md)
|
||||
- [BranchContents](classes/BranchContents.md)
|
||||
- [Branches](classes/Branches.md)
|
||||
- [Connection](classes/Connection.md)
|
||||
- [HeaderProvider](classes/HeaderProvider.md)
|
||||
- [Index](classes/Index.md)
|
||||
@@ -65,6 +67,7 @@
|
||||
- [DropNamespaceOptions](interfaces/DropNamespaceOptions.md)
|
||||
- [DropNamespaceResponse](interfaces/DropNamespaceResponse.md)
|
||||
- [ExecutableQuery](interfaces/ExecutableQuery.md)
|
||||
- [FieldMetadataUpdate](interfaces/FieldMetadataUpdate.md)
|
||||
- [FragmentStatistics](interfaces/FragmentStatistics.md)
|
||||
- [FragmentSummaryStats](interfaces/FragmentSummaryStats.md)
|
||||
- [FtsOptions](interfaces/FtsOptions.md)
|
||||
@@ -101,6 +104,7 @@
|
||||
- [TimeoutConfig](interfaces/TimeoutConfig.md)
|
||||
- [TlsConfig](interfaces/TlsConfig.md)
|
||||
- [TokenResponse](interfaces/TokenResponse.md)
|
||||
- [UpdateFieldMetadataResult](interfaces/UpdateFieldMetadataResult.md)
|
||||
- [UpdateOptions](interfaces/UpdateOptions.md)
|
||||
- [UpdateResult](interfaces/UpdateResult.md)
|
||||
- [Version](interfaces/Version.md)
|
||||
|
||||
41
docs/src/js/interfaces/FieldMetadataUpdate.md
Normal file
41
docs/src/js/interfaces/FieldMetadataUpdate.md
Normal file
@@ -0,0 +1,41 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / FieldMetadataUpdate
|
||||
|
||||
# Interface: FieldMetadataUpdate
|
||||
|
||||
A per-field metadata update, addressed by dot-path.
|
||||
|
||||
## Properties
|
||||
|
||||
### metadata
|
||||
|
||||
```ts
|
||||
metadata: Record<string, null | string>;
|
||||
```
|
||||
|
||||
Metadata key/value pairs. Merged into the field's existing metadata by
|
||||
default; a value of `null` deletes that key.
|
||||
|
||||
***
|
||||
|
||||
### path
|
||||
|
||||
```ts
|
||||
path: string;
|
||||
```
|
||||
|
||||
Dot-separated path to the field. For a top-level column this is just its
|
||||
name; for a nested field it's the path, e.g. "a.b.c".
|
||||
|
||||
***
|
||||
|
||||
### replace?
|
||||
|
||||
```ts
|
||||
optional replace: boolean;
|
||||
```
|
||||
|
||||
If true, replace the field's entire metadata map instead of merging.
|
||||
@@ -8,6 +8,18 @@
|
||||
|
||||
## Properties
|
||||
|
||||
### branch?
|
||||
|
||||
```ts
|
||||
optional branch: string;
|
||||
```
|
||||
|
||||
Open the table scoped to this branch instead of the default branch.
|
||||
|
||||
Reads and writes on the returned table operate in the branch's context.
|
||||
|
||||
***
|
||||
|
||||
### ~~indexCacheSize?~~
|
||||
|
||||
```ts
|
||||
@@ -43,3 +55,17 @@ Options already set on the connection will be inherited by the table,
|
||||
but can be overridden here.
|
||||
|
||||
The available options are described at https://docs.lancedb.com/storage/
|
||||
|
||||
***
|
||||
|
||||
### version?
|
||||
|
||||
```ts
|
||||
optional version: number;
|
||||
```
|
||||
|
||||
Open the table pinned to this version, producing a read-only view.
|
||||
|
||||
Composes with [OpenTableOptions.branch](OpenTableOptions.md#branch): when both are set, opens
|
||||
that branch at the version; otherwise opens `main` at the version. Call
|
||||
`checkoutLatest` to return to a writable state.
|
||||
|
||||
15
docs/src/js/interfaces/UpdateFieldMetadataResult.md
Normal file
15
docs/src/js/interfaces/UpdateFieldMetadataResult.md
Normal file
@@ -0,0 +1,15 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / UpdateFieldMetadataResult
|
||||
|
||||
# Interface: UpdateFieldMetadataResult
|
||||
|
||||
## Properties
|
||||
|
||||
### version
|
||||
|
||||
```ts
|
||||
version: number;
|
||||
```
|
||||
@@ -28,7 +28,7 @@
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<arrow.version>15.0.0</arrow.version>
|
||||
<lance-core.version>7.2.0-beta.4</lance-core.version>
|
||||
<lance-core.version>7.2.0-beta.1</lance-core.version>
|
||||
<spotless.skip>false</spotless.skip>
|
||||
<spotless.version>2.30.0</spotless.version>
|
||||
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>
|
||||
|
||||
@@ -191,6 +191,34 @@ describe("remote connection", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("allows version on remote but rejects a non-main branch", async () => {
|
||||
await withMockDatabase(
|
||||
(_req, res) => {
|
||||
// describe (table open + version validation) always succeeds
|
||||
const body = JSON.stringify({
|
||||
name: "t",
|
||||
version: 2,
|
||||
schema: { fields: [] },
|
||||
});
|
||||
res.writeHead(200, { "Content-Type": "application/json" }).end(body);
|
||||
},
|
||||
async (db) => {
|
||||
// version-only (and "main" + version) is allowed: remote supports
|
||||
// version time-travel even though it has no branches
|
||||
await db.openTable("t", undefined, { version: 2 });
|
||||
await db.openTable("t", undefined, { branch: "main", version: 2 });
|
||||
|
||||
// a non-main branch is rejected, with or without a version
|
||||
await expect(
|
||||
db.openTable("t", undefined, { branch: "exp" }),
|
||||
).rejects.toThrow(/branching/);
|
||||
await expect(
|
||||
db.openTable("t", undefined, { branch: "exp", version: 2 }),
|
||||
).rejects.toThrow(/branching/);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
describe("TlsConfig", () => {
|
||||
it("should create TlsConfig with all fields", () => {
|
||||
const tlsConfig: TlsConfig = {
|
||||
|
||||
@@ -85,6 +85,136 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
|
||||
await expect(table.countRows()).resolves.toBe(3);
|
||||
});
|
||||
|
||||
it("should support branches", async () => {
|
||||
await table.add([{ id: 1 }]);
|
||||
expect(await table.countRows()).toBe(1);
|
||||
|
||||
// fork an isolated, writable branch from main
|
||||
const branch = await (await table.branches()).create("exp");
|
||||
expect(await branch.countRows()).toBe(1);
|
||||
await branch.add([{ id: 2 }]);
|
||||
expect(await branch.countRows()).toBe(2);
|
||||
// main is untouched by branch writes
|
||||
expect(await table.countRows()).toBe(1);
|
||||
|
||||
// listed, with main (null) as the parent
|
||||
const list = await (await table.branches()).list();
|
||||
expect(Object.keys(list)).toContain("exp");
|
||||
expect(list["exp"].parentBranch).toBeNull();
|
||||
|
||||
// fromRef="main" is equivalent to the default
|
||||
await (await table.branches()).create("exp2", "main");
|
||||
const list2 = await (await table.branches()).list();
|
||||
expect(list2["exp2"].parentBranch).toBeNull();
|
||||
|
||||
// checkout returns a handle scoped to the branch's latest
|
||||
const checkedOut = await (await table.branches()).checkout("exp");
|
||||
expect(await checkedOut.countRows()).toBe(2);
|
||||
|
||||
// delete removes it
|
||||
await (await table.branches()).delete("exp");
|
||||
await (await table.branches()).delete("exp2");
|
||||
const after = await (await table.branches()).list();
|
||||
expect(Object.keys(after)).not.toContain("exp");
|
||||
});
|
||||
|
||||
it("should open a branch via open_table", async () => {
|
||||
const db = await connect(tmpDir.name);
|
||||
await table.add([{ id: 1 }]);
|
||||
const branch = await (await table.branches()).create("exp");
|
||||
await branch.add([{ id: 2 }]);
|
||||
|
||||
// open_table(..., { branch }) returns a handle scoped to the branch
|
||||
const opened = await db.openTable("some_table", undefined, {
|
||||
branch: "exp",
|
||||
});
|
||||
expect(await opened.countRows()).toBe(2);
|
||||
// opening without branch still tracks main
|
||||
expect(await (await db.openTable("some_table")).countRows()).toBe(1);
|
||||
});
|
||||
|
||||
it("should open a branch at a version isolated from main and HEAD", async () => {
|
||||
const db = await connect(tmpDir.name);
|
||||
// main: a single fork-point row
|
||||
const t = await db.createTable("bv_table", [{ id: 0 }]);
|
||||
const mainV1 = await t.version();
|
||||
|
||||
// fork "exp", then advance exp AND main independently past the fork so
|
||||
// they diverge while sharing version numbers
|
||||
const exp = await (await t.branches()).create("exp");
|
||||
await exp.add([{ id: 1 }]); // exp: {0, 1}
|
||||
const expV2 = await exp.version();
|
||||
await exp.add([{ id: 2 }]); // exp HEAD: {0, 1, 2}
|
||||
await t.add([{ id: 100 }, { id: 101 }, { id: 102 }]); // main HEAD: {0,100,101,102}
|
||||
expect(await t.version()).toBe(expV2);
|
||||
|
||||
// open exp at the shared version: the data must be exp's, not main's.
|
||||
// count alone cannot prove this (main@v2 also exists), so assert
|
||||
// provenance by content.
|
||||
const pinned = await db.openTable("bv_table", undefined, {
|
||||
branch: "exp",
|
||||
version: expV2,
|
||||
});
|
||||
expect(await pinned.countRows()).toBe(2); // not exp HEAD (3), not main@v2 (4)
|
||||
expect(await pinned.countRows("id = 1")).toBe(1); // exp's post-fork row
|
||||
expect(await pinned.countRows("id = 100")).toBe(0); // main's rows invisible
|
||||
|
||||
// the same coordinate is reachable directly via branches().checkout(name, version)
|
||||
const pinnedDirect = await (await t.branches()).checkout("exp", expV2);
|
||||
expect(await pinnedDirect.countRows()).toBe(2);
|
||||
|
||||
// the HEADs are unaffected
|
||||
expect(
|
||||
await (
|
||||
await db.openTable("bv_table", undefined, { branch: "exp" })
|
||||
).countRows(),
|
||||
).toBe(3);
|
||||
expect(await (await db.openTable("bv_table")).countRows()).toBe(4);
|
||||
|
||||
// version-only (no branch) time-travels main itself: its fork-point
|
||||
// version holds only main's first row, and the shared version number
|
||||
// resolves to main's data, not the branch's ("opens main at the version")
|
||||
const oldMain = await db.openTable("bv_table", undefined, {
|
||||
version: mainV1,
|
||||
});
|
||||
expect(await oldMain.countRows()).toBe(1);
|
||||
const sharedOnMain = await db.openTable("bv_table", undefined, {
|
||||
version: expV2,
|
||||
});
|
||||
expect(await sharedOnMain.countRows()).toBe(4); // main@v2, not exp@v2 (2)
|
||||
|
||||
// detached head: writing to a pinned version is rejected
|
||||
await expect(pinned.add([{ id: 9 }])).rejects.toThrow(
|
||||
/cannot be modified/,
|
||||
);
|
||||
|
||||
// a nonexistent version is rejected -- on main, and on a branch (a
|
||||
// distinct resolution path, on the branch's manifests)
|
||||
await expect(
|
||||
db.openTable("bv_table", undefined, { version: 9999 }),
|
||||
).rejects.toThrow();
|
||||
await expect(
|
||||
db.openTable("bv_table", undefined, { branch: "exp", version: 9999 }),
|
||||
).rejects.toThrow();
|
||||
|
||||
// checkoutLatest re-attaches the pinned handle to the BRANCH's HEAD
|
||||
// (writable again), not main's HEAD (4), and not staying pinned (2)
|
||||
await pinned.checkoutLatest();
|
||||
expect(await pinned.countRows()).toBe(3); // exp HEAD
|
||||
await pinned.add([{ id: 3 }]);
|
||||
expect(await pinned.countRows()).toBe(4); // writable again
|
||||
});
|
||||
|
||||
it("rejects invalid branch inputs", async () => {
|
||||
const branches = await table.branches();
|
||||
await expect(branches.create("")).rejects.toThrow("non-empty");
|
||||
await expect(branches.checkout("")).rejects.toThrow("non-empty");
|
||||
await expect(branches.delete("")).rejects.toThrow("non-empty");
|
||||
await expect(branches.create("bad", "main", -1)).rejects.toThrow(
|
||||
"non-negative",
|
||||
);
|
||||
});
|
||||
|
||||
it("should show table stats", async () => {
|
||||
await table.add([{ id: 1 }, { id: 2 }]);
|
||||
await table.add([{ id: 1 }]);
|
||||
@@ -1571,6 +1701,33 @@ describe("schema evolution", function () {
|
||||
expect(await table.schema()).toEqual(expectedSchema3);
|
||||
});
|
||||
|
||||
it("can update field metadata", async function () {
|
||||
const con = await connect(tmpDir.name);
|
||||
const table = await con.createTable("fm", [
|
||||
{ id: 1, category: "a" },
|
||||
{ id: 2, category: "b" },
|
||||
]);
|
||||
|
||||
const res = await table.updateFieldMetadata([
|
||||
{ path: "category", metadata: { unit: "label", pii: "false" } },
|
||||
]);
|
||||
expect(res).toHaveProperty("version");
|
||||
expect(res.version).toBe(2);
|
||||
|
||||
let cat = (await table.schema()).fields.find((f) => f.name === "category");
|
||||
expect(cat?.metadata.get("unit")).toBe("label");
|
||||
expect(cat?.metadata.get("pii")).toBe("false");
|
||||
|
||||
// merge: add a key, delete one via null, keep the rest
|
||||
await table.updateFieldMetadata([
|
||||
{ path: "category", metadata: { source: "import", pii: null } },
|
||||
]);
|
||||
cat = (await table.schema()).fields.find((f) => f.name === "category");
|
||||
expect(cat?.metadata.get("unit")).toBe("label"); // preserved
|
||||
expect(cat?.metadata.get("source")).toBe("import"); // added
|
||||
expect(cat?.metadata.has("pii")).toBe(false); // deleted
|
||||
});
|
||||
|
||||
it("can cast to various types", async function () {
|
||||
const con = await connect(tmpDir.name);
|
||||
|
||||
|
||||
@@ -84,6 +84,20 @@ export interface CreateTableOptions {
|
||||
}
|
||||
|
||||
export interface OpenTableOptions {
|
||||
/**
|
||||
* Open the table scoped to this branch instead of the default branch.
|
||||
*
|
||||
* Reads and writes on the returned table operate in the branch's context.
|
||||
*/
|
||||
branch?: string;
|
||||
/**
|
||||
* Open the table pinned to this version, producing a read-only view.
|
||||
*
|
||||
* Composes with {@link OpenTableOptions.branch}: when both are set, opens
|
||||
* that branch at the version; otherwise opens `main` at the version. Call
|
||||
* `checkoutLatest` to return to a writable state.
|
||||
*/
|
||||
version?: number;
|
||||
/**
|
||||
* Configuration for object storage.
|
||||
*
|
||||
@@ -483,7 +497,20 @@ export class LocalConnection extends Connection {
|
||||
options?.indexCacheSize,
|
||||
);
|
||||
|
||||
return new LocalTable(innerTable);
|
||||
let table: Table = new LocalTable(innerTable);
|
||||
// "main" is the default branch, so treat it as no branch. On a real branch,
|
||||
// scope and pin in one step (yielding "version V of branch B"); otherwise
|
||||
// pin the version, if any, against main.
|
||||
const branch =
|
||||
options?.branch != null && options.branch !== "main"
|
||||
? options.branch
|
||||
: undefined;
|
||||
if (branch != null) {
|
||||
table = await (await table.branches()).checkout(branch, options?.version);
|
||||
} else if (options?.version != null) {
|
||||
await table.checkout(options.version);
|
||||
}
|
||||
return table;
|
||||
}
|
||||
|
||||
async cloneTable(
|
||||
|
||||
@@ -38,10 +38,12 @@ export {
|
||||
FragmentSummaryStats,
|
||||
Tags,
|
||||
TagContents,
|
||||
BranchContents,
|
||||
MergeResult,
|
||||
AddResult,
|
||||
AddColumnsResult,
|
||||
AlterColumnsResult,
|
||||
UpdateFieldMetadataResult,
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
UpdateResult,
|
||||
@@ -110,6 +112,7 @@ export {
|
||||
|
||||
export {
|
||||
Table,
|
||||
Branches,
|
||||
AddDataOptions,
|
||||
UpdateOptions,
|
||||
OptimizeOptions,
|
||||
@@ -117,6 +120,7 @@ export {
|
||||
WriteProgress,
|
||||
LsmWriteSpec,
|
||||
ColumnAlteration,
|
||||
FieldMetadataUpdate,
|
||||
} from "./table";
|
||||
|
||||
export {
|
||||
|
||||
@@ -25,13 +25,16 @@ import {
|
||||
AddColumnsSql,
|
||||
AddResult,
|
||||
AlterColumnsResult,
|
||||
BranchContents,
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
IndexConfig,
|
||||
IndexStatistics,
|
||||
Branches as NativeBranches,
|
||||
OptimizeStats,
|
||||
TableStatistics,
|
||||
Tags,
|
||||
UpdateFieldMetadataResult,
|
||||
UpdateResult,
|
||||
Table as _NativeTable,
|
||||
} from "./native";
|
||||
@@ -508,6 +511,18 @@ export abstract class Table {
|
||||
abstract alterColumns(
|
||||
columnAlterations: ColumnAlteration[],
|
||||
): Promise<AlterColumnsResult>;
|
||||
|
||||
/**
|
||||
* Update per-field (column) metadata.
|
||||
* @param {FieldMetadataUpdate[]} updates One or more per-field updates. Each
|
||||
* update's metadata is merged into the field's existing metadata by default;
|
||||
* a value of `null` deletes that key, and `replace: true` swaps the whole map.
|
||||
* @returns {Promise<UpdateFieldMetadataResult>} resolves to the new table version.
|
||||
*/
|
||||
abstract updateFieldMetadata(
|
||||
updates: FieldMetadataUpdate[],
|
||||
): Promise<UpdateFieldMetadataResult>;
|
||||
|
||||
/**
|
||||
* Drop one or more columns from the dataset
|
||||
*
|
||||
@@ -640,6 +655,14 @@ export abstract class Table {
|
||||
*/
|
||||
abstract tags(): Promise<Tags>;
|
||||
|
||||
/**
|
||||
* Get the branch manager for this table.
|
||||
*
|
||||
* Branches are isolated, writable lines of history forked from another
|
||||
* branch (or version). Writes on a branch do not affect `main`.
|
||||
*/
|
||||
abstract branches(): Promise<Branches>;
|
||||
|
||||
/**
|
||||
* Restore the table to the currently checked out version
|
||||
*
|
||||
@@ -1037,6 +1060,12 @@ export class LocalTable extends Table {
|
||||
return await this.inner.alterColumns(processedAlterations);
|
||||
}
|
||||
|
||||
async updateFieldMetadata(
|
||||
updates: FieldMetadataUpdate[],
|
||||
): Promise<UpdateFieldMetadataResult> {
|
||||
return await this.inner.updateFieldMetadata(updates);
|
||||
}
|
||||
|
||||
async dropColumns(columnNames: string[]): Promise<DropColumnsResult> {
|
||||
return await this.inner.dropColumns(columnNames);
|
||||
}
|
||||
@@ -1089,6 +1118,10 @@ export class LocalTable extends Table {
|
||||
return await this.inner.tags();
|
||||
}
|
||||
|
||||
async branches(): Promise<Branches> {
|
||||
return new Branches(await this.inner.branches());
|
||||
}
|
||||
|
||||
async optimize(options?: Partial<OptimizeOptions>): Promise<OptimizeStats> {
|
||||
let cleanupOlderThanMs;
|
||||
if (
|
||||
@@ -1203,3 +1236,73 @@ export interface ColumnAlteration {
|
||||
/** Set the new nullability. Note that a nullable column cannot be made non-nullable. */
|
||||
nullable?: boolean;
|
||||
}
|
||||
|
||||
/** A per-field metadata update, addressed by dot-path. */
|
||||
export interface FieldMetadataUpdate {
|
||||
/**
|
||||
* Dot-separated path to the field. For a top-level column this is just its
|
||||
* name; for a nested field it's the path, e.g. "a.b.c".
|
||||
*/
|
||||
path: string;
|
||||
/**
|
||||
* Metadata key/value pairs. Merged into the field's existing metadata by
|
||||
* default; a value of `null` deletes that key.
|
||||
*/
|
||||
metadata: Record<string, string | null>;
|
||||
/** If true, replace the field's entire metadata map instead of merging. */
|
||||
replace?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Branch manager for a {@link Table}.
|
||||
*
|
||||
* Unlike tags, `create` and `checkout` return a new {@link Table} handle scoped
|
||||
* to the branch; writes on it do not affect `main`.
|
||||
*/
|
||||
export class Branches {
|
||||
#inner: NativeBranches;
|
||||
|
||||
/**
|
||||
* Construct a Branches manager. Internal use only.
|
||||
* @hidden
|
||||
*/
|
||||
constructor(inner: NativeBranches) {
|
||||
this.#inner = inner;
|
||||
}
|
||||
|
||||
/** List all branches, mapping name to branch metadata. */
|
||||
async list(): Promise<Record<string, BranchContents>> {
|
||||
return await this.#inner.list();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a branch and return a handle scoped to it.
|
||||
*
|
||||
* @param name Name of the new branch.
|
||||
* @param fromRef Source branch to fork from. Defaults to `main`.
|
||||
* @param fromVersion A specific version on `fromRef`. Defaults to latest.
|
||||
*/
|
||||
async create(
|
||||
name: string,
|
||||
fromRef?: string,
|
||||
fromVersion?: number,
|
||||
): Promise<Table> {
|
||||
return new LocalTable(await this.#inner.create(name, fromRef, fromVersion));
|
||||
}
|
||||
|
||||
/**
|
||||
* Check out an existing branch and return a handle scoped to it.
|
||||
*
|
||||
* With `version` set, the returned handle is pinned to that version of the
|
||||
* branch (a read-only, detached view); otherwise it tracks the branch's
|
||||
* latest and stays writable.
|
||||
*/
|
||||
async checkout(name: string, version?: number): Promise<Table> {
|
||||
return new LocalTable(await this.#inner.checkout(name, version));
|
||||
}
|
||||
|
||||
/** Delete a branch. */
|
||||
async delete(name: string): Promise<void> {
|
||||
return await this.#inner.delete(name);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,8 +5,9 @@ use std::collections::HashMap;
|
||||
|
||||
use lancedb::ipc::{ipc_file_to_batches, ipc_file_to_schema};
|
||||
use lancedb::table::{
|
||||
AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration, NewColumnTransform,
|
||||
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
|
||||
AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration,
|
||||
FieldMetadataUpdate as LanceFieldMetadataUpdate, NewColumnTransform, OptimizeAction,
|
||||
OptimizeOptions, Ref, Table as LanceDbTable,
|
||||
};
|
||||
use napi::bindgen_prelude::*;
|
||||
use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
|
||||
@@ -355,6 +356,23 @@ impl Table {
|
||||
Ok(res.into())
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn update_field_metadata(
|
||||
&self,
|
||||
updates: Vec<FieldMetadataUpdate>,
|
||||
) -> napi::Result<UpdateFieldMetadataResult> {
|
||||
let updates = updates
|
||||
.into_iter()
|
||||
.map(LanceFieldMetadataUpdate::from)
|
||||
.collect::<Vec<_>>();
|
||||
let res = self
|
||||
.inner_ref()?
|
||||
.update_field_metadata(&updates)
|
||||
.await
|
||||
.default_error()?;
|
||||
Ok(res.into())
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn drop_columns(&self, columns: Vec<String>) -> napi::Result<DropColumnsResult> {
|
||||
let col_refs = columns.iter().map(String::as_str).collect::<Vec<_>>();
|
||||
@@ -460,6 +478,13 @@ impl Table {
|
||||
})
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn branches(&self) -> napi::Result<Branches> {
|
||||
Ok(Branches {
|
||||
inner: self.inner_ref()?.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn optimize(
|
||||
&self,
|
||||
@@ -747,6 +772,29 @@ pub struct ColumnAlteration {
|
||||
pub nullable: Option<bool>,
|
||||
}
|
||||
|
||||
/// A per-field metadata update, addressed by dot-path. Merges into the field's
|
||||
/// existing metadata by default; a `null` value deletes a key, and `replace`
|
||||
/// swaps the field's entire metadata map.
|
||||
#[napi(object)]
|
||||
pub struct FieldMetadataUpdate {
|
||||
/// Dot-separated path to the field (e.g. "embedding" or "a.b.c").
|
||||
pub path: String,
|
||||
/// Metadata keys to set; a `null` value deletes that key.
|
||||
pub metadata: HashMap<String, Option<String>>,
|
||||
/// If true, replace the field's entire metadata map instead of merging.
|
||||
pub replace: Option<bool>,
|
||||
}
|
||||
|
||||
impl From<FieldMetadataUpdate> for LanceFieldMetadataUpdate {
|
||||
fn from(js: FieldMetadataUpdate) -> Self {
|
||||
Self {
|
||||
path: js.path,
|
||||
metadata: js.metadata,
|
||||
replace: js.replace.unwrap_or(false),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<ColumnAlteration> for LanceColumnAlteration {
|
||||
type Error = String;
|
||||
fn try_from(js: ColumnAlteration) -> std::result::Result<Self, Self::Error> {
|
||||
@@ -987,6 +1035,19 @@ impl From<lancedb::table::AlterColumnsResult> for AlterColumnsResult {
|
||||
}
|
||||
}
|
||||
|
||||
#[napi(object)]
|
||||
pub struct UpdateFieldMetadataResult {
|
||||
pub version: i64,
|
||||
}
|
||||
|
||||
impl From<lancedb::table::UpdateFieldMetadataResult> for UpdateFieldMetadataResult {
|
||||
fn from(value: lancedb::table::UpdateFieldMetadataResult) -> Self {
|
||||
Self {
|
||||
version: value.version as i64,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[napi(object)]
|
||||
pub struct DropColumnsResult {
|
||||
pub version: i64,
|
||||
@@ -1006,6 +1067,13 @@ pub struct TagContents {
|
||||
pub manifest_size: i64,
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub struct BranchContents {
|
||||
pub parent_branch: Option<String>,
|
||||
pub parent_version: i64,
|
||||
pub manifest_size: i64,
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub struct Tags {
|
||||
inner: LanceDbTable,
|
||||
@@ -1074,3 +1142,75 @@ impl Tags {
|
||||
.default_error()
|
||||
}
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub struct Branches {
|
||||
inner: LanceDbTable,
|
||||
}
|
||||
|
||||
#[napi]
|
||||
impl Branches {
|
||||
#[napi]
|
||||
pub async fn list(&self) -> napi::Result<HashMap<String, BranchContents>> {
|
||||
let branches = self.inner.list_branches().await.default_error()?;
|
||||
let result = branches
|
||||
.into_iter()
|
||||
.map(|(k, v)| {
|
||||
(
|
||||
k,
|
||||
BranchContents {
|
||||
parent_branch: v.parent_branch,
|
||||
parent_version: v.parent_version as i64,
|
||||
manifest_size: v.manifest_size as i64,
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub async fn create(
|
||||
&self,
|
||||
name: String,
|
||||
from_ref: Option<String>,
|
||||
from_version: Option<i64>,
|
||||
) -> napi::Result<Table> {
|
||||
let from_ref = from_ref.filter(|b| b != "main");
|
||||
let from_version = from_version
|
||||
.map(|v| {
|
||||
u64::try_from(v).map_err(|_| {
|
||||
napi::Error::from_reason("from_version must be a non-negative integer")
|
||||
})
|
||||
})
|
||||
.transpose()?;
|
||||
let from = Ref::Version(from_ref, from_version);
|
||||
let table = self
|
||||
.inner
|
||||
.create_branch(&name, from)
|
||||
.await
|
||||
.default_error()?;
|
||||
Ok(Table::new(table))
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub async fn checkout(&self, name: String, version: Option<i64>) -> napi::Result<Table> {
|
||||
let version = version
|
||||
.map(|v| {
|
||||
u64::try_from(v)
|
||||
.map_err(|_| napi::Error::from_reason("version must be a non-negative integer"))
|
||||
})
|
||||
.transpose()?;
|
||||
let table = self
|
||||
.inner
|
||||
.checkout_branch(&name, version)
|
||||
.await
|
||||
.default_error()?;
|
||||
Ok(Table::new(table))
|
||||
}
|
||||
|
||||
#[napi]
|
||||
pub async fn delete(&self, name: String) -> napi::Result<()> {
|
||||
self.inner.delete_branch(&name).await.default_error()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -315,6 +315,15 @@ def deserialize_conn(
|
||||
manifest_enabled=parsed.get("manifest_enabled", False),
|
||||
namespace_client_properties=parsed.get("namespace_client_properties"),
|
||||
)
|
||||
elif connection_type == "remote":
|
||||
return RemoteDBConnection(
|
||||
parsed["db_url"],
|
||||
parsed["api_key"],
|
||||
parsed.get("region", "us-east-1"),
|
||||
host_override=parsed.get("host_override"),
|
||||
client_config=parsed.get("client_config"),
|
||||
storage_options=storage_options,
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unknown connection_type: {connection_type}")
|
||||
|
||||
|
||||
@@ -208,6 +208,9 @@ class Table:
|
||||
async def alter_columns(
|
||||
self, columns: list[dict[str, Any]]
|
||||
) -> AlterColumnsResult: ...
|
||||
async def update_field_metadata(
|
||||
self, updates: list[dict[str, Any]]
|
||||
) -> UpdateFieldMetadataResult: ...
|
||||
async def optimize(
|
||||
self,
|
||||
*,
|
||||
@@ -223,6 +226,9 @@ class Table:
|
||||
async def close_lsm_writers(self) -> None: ...
|
||||
@property
|
||||
def tags(self) -> Tags: ...
|
||||
@property
|
||||
def branches(self) -> Branches: ...
|
||||
def current_branch(self) -> Optional[str]: ...
|
||||
def query(self) -> Query: ...
|
||||
def take_offsets(self, offsets: list[int]) -> TakeQuery: ...
|
||||
def take_row_ids(self, row_ids: list[int]) -> TakeQuery: ...
|
||||
@@ -235,6 +241,17 @@ class Tags:
|
||||
async def delete(self, tag: str): ...
|
||||
async def update(self, tag: str, version: int): ...
|
||||
|
||||
class Branches:
|
||||
async def list(self) -> Dict[str, Any]: ...
|
||||
async def create(
|
||||
self,
|
||||
name: str,
|
||||
from_ref: Optional[str] = None,
|
||||
from_version: Optional[int] = None,
|
||||
) -> Table: ...
|
||||
async def checkout(self, name: str, version: Optional[int] = None) -> Table: ...
|
||||
async def delete(self, name: str) -> None: ...
|
||||
|
||||
class IndexConfig:
|
||||
name: str
|
||||
index_type: str
|
||||
@@ -460,6 +477,9 @@ class AddColumnsResult:
|
||||
class AlterColumnsResult:
|
||||
version: int
|
||||
|
||||
class UpdateFieldMetadataResult:
|
||||
version: int
|
||||
|
||||
class DropColumnsResult:
|
||||
version: int
|
||||
|
||||
|
||||
@@ -416,6 +416,8 @@ class DBConnection(EnforceOverrides):
|
||||
namespace_path: Optional[List[str]] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
index_cache_size: Optional[int] = None,
|
||||
branch: Optional[str] = None,
|
||||
version: Optional[int] = None,
|
||||
) -> Table:
|
||||
"""Open a Lance Table in the database.
|
||||
|
||||
@@ -444,6 +446,14 @@ class DBConnection(EnforceOverrides):
|
||||
connection will be inherited by the table, but can be overridden here.
|
||||
See available options at
|
||||
<https://docs.lancedb.com/storage/>
|
||||
branch: str, optional
|
||||
If provided, open a handle scoped to this branch instead of the
|
||||
default branch. Reads and writes operate in the branch's context.
|
||||
version: int, optional
|
||||
If provided, open the table pinned to this version, producing a
|
||||
read-only handle. Composes with ``branch``: when both are given,
|
||||
opens that branch at the version; otherwise opens ``main`` at the
|
||||
version. Call ``checkout_latest`` to return to a writable state.
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -958,6 +968,8 @@ class LanceDBConnection(DBConnection):
|
||||
namespace_path: Optional[List[str]] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
index_cache_size: Optional[int] = None,
|
||||
branch: Optional[str] = None,
|
||||
version: Optional[int] = None,
|
||||
) -> LanceTable:
|
||||
"""Open a table in the database.
|
||||
|
||||
@@ -968,6 +980,14 @@ class LanceDBConnection(DBConnection):
|
||||
namespace_path: List[str], optional
|
||||
The namespace to open the table from. When non-empty, the
|
||||
table is resolved through the directory namespace client.
|
||||
branch: str, optional
|
||||
If provided, open a handle scoped to this branch instead of the
|
||||
default branch. Reads and writes operate in the branch's context.
|
||||
version: int, optional
|
||||
If provided, open the table pinned to this version, producing a
|
||||
read-only handle. Composes with ``branch``: when both are given,
|
||||
opens that branch at the version; otherwise opens ``main`` at the
|
||||
version. Call ``checkout_latest`` to return to a writable state.
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -987,20 +1007,26 @@ class LanceDBConnection(DBConnection):
|
||||
)
|
||||
|
||||
if namespace_path:
|
||||
return self._namespace_conn().open_table(
|
||||
tbl = self._namespace_conn().open_table(
|
||||
name,
|
||||
namespace_path=namespace_path,
|
||||
storage_options=storage_options,
|
||||
index_cache_size=index_cache_size,
|
||||
)
|
||||
else:
|
||||
tbl = LanceTable.open(
|
||||
self,
|
||||
name,
|
||||
namespace_path=namespace_path,
|
||||
storage_options=storage_options,
|
||||
index_cache_size=index_cache_size,
|
||||
)
|
||||
|
||||
return LanceTable.open(
|
||||
self,
|
||||
name,
|
||||
namespace_path=namespace_path,
|
||||
storage_options=storage_options,
|
||||
index_cache_size=index_cache_size,
|
||||
)
|
||||
if branch is not None:
|
||||
tbl = tbl.branches.checkout(branch, version)
|
||||
elif version is not None:
|
||||
tbl.checkout(version)
|
||||
return tbl
|
||||
|
||||
def clone_table(
|
||||
self,
|
||||
@@ -1641,6 +1667,8 @@ class AsyncConnection(object):
|
||||
location: Optional[str] = None,
|
||||
namespace_client: Optional[Any] = None,
|
||||
managed_versioning: Optional[bool] = None,
|
||||
branch: Optional[str] = None,
|
||||
version: Optional[int] = None,
|
||||
) -> AsyncTable:
|
||||
"""Open a Lance Table in the database.
|
||||
|
||||
@@ -1676,6 +1704,14 @@ class AsyncConnection(object):
|
||||
managed_versioning: bool, optional
|
||||
Whether managed versioning is enabled for this table. If provided,
|
||||
avoids a redundant describe_table call when namespace_client is set.
|
||||
branch: str, optional
|
||||
If provided, open a handle scoped to this branch instead of the
|
||||
default branch. Reads and writes operate in the branch's context.
|
||||
version: int, optional
|
||||
If provided, open the table pinned to this version, producing a
|
||||
read-only handle. Composes with ``branch``: when both are given,
|
||||
opens that branch at the version; otherwise opens ``main`` at the
|
||||
version. Call ``checkout_latest`` to return to a writable state.
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -1692,7 +1728,14 @@ class AsyncConnection(object):
|
||||
namespace_client=namespace_client,
|
||||
managed_versioning=managed_versioning,
|
||||
)
|
||||
return AsyncTable(table)
|
||||
tbl = AsyncTable(table)
|
||||
# "main" is the default branch, so treat it as no branch: remote rejects
|
||||
# every branch checkout (even "main"), and the version still applies.
|
||||
if branch is not None and branch != "main":
|
||||
tbl = await tbl.branches.checkout(branch, version)
|
||||
elif version is not None:
|
||||
await tbl.checkout(version)
|
||||
return tbl
|
||||
|
||||
async def clone_table(
|
||||
self,
|
||||
|
||||
@@ -544,6 +544,8 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
namespace_path: Optional[List[str]] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
index_cache_size: Optional[int] = None,
|
||||
branch: Optional[str] = None,
|
||||
version: Optional[int] = None,
|
||||
) -> Table:
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
@@ -562,7 +564,7 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}")
|
||||
raise
|
||||
|
||||
return LanceTable(
|
||||
tbl = LanceTable(
|
||||
self,
|
||||
name,
|
||||
namespace_path=namespace_path,
|
||||
@@ -570,6 +572,11 @@ class LanceNamespaceDBConnection(DBConnection):
|
||||
pushdown_operations=self._namespace_client_pushdown_operations,
|
||||
_async=async_table,
|
||||
)
|
||||
if branch is not None:
|
||||
tbl = tbl.branches.checkout(branch, version)
|
||||
elif version is not None:
|
||||
tbl.checkout(version)
|
||||
return tbl
|
||||
|
||||
@override
|
||||
def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):
|
||||
@@ -974,12 +981,14 @@ class AsyncLanceNamespaceDBConnection:
|
||||
namespace_path: Optional[List[str]] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
index_cache_size: Optional[int] = None,
|
||||
branch: Optional[str] = None,
|
||||
version: Optional[int] = None,
|
||||
) -> AsyncTable:
|
||||
"""Open an existing table from the namespace."""
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
try:
|
||||
return await self._inner.open_table(
|
||||
tbl = await self._inner.open_table(
|
||||
name,
|
||||
namespace_path=namespace_path,
|
||||
storage_options=storage_options,
|
||||
@@ -990,6 +999,13 @@ class AsyncLanceNamespaceDBConnection:
|
||||
table_id = namespace_path + [name]
|
||||
raise TableNotFoundError(f"Table not found: {'$'.join(table_id)}")
|
||||
raise
|
||||
# "main" is the default branch, so treat it as no branch (mirrors the
|
||||
# sync remote path); the version still applies.
|
||||
if branch is not None and branch != "main":
|
||||
tbl = await tbl.branches.checkout(branch, version)
|
||||
elif version is not None:
|
||||
await tbl.checkout(version)
|
||||
return tbl
|
||||
|
||||
async def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):
|
||||
"""Drop a table from the namespace."""
|
||||
|
||||
@@ -3,12 +3,13 @@
|
||||
|
||||
import copy
|
||||
import json
|
||||
import os
|
||||
|
||||
from deprecation import deprecated
|
||||
import pyarrow as pa
|
||||
|
||||
from ._lancedb import async_permutation_builder, PermutationReader
|
||||
from .table import LanceTable
|
||||
from .table import LanceTable, Table
|
||||
from .background_loop import LOOP
|
||||
from .util import batch_to_tensor, batch_to_tensor_rows
|
||||
from typing import Any, Callable, Iterator, Literal, Optional, TYPE_CHECKING, Union
|
||||
@@ -354,6 +355,49 @@ class Transforms:
|
||||
DEFAULT_BATCH_SIZE = 100
|
||||
|
||||
|
||||
def _table_to_pickle_state(table: Table) -> dict[str, Any]:
|
||||
from .remote.table import RemoteTable
|
||||
|
||||
if isinstance(table, RemoteTable):
|
||||
return {
|
||||
"kind": "remote",
|
||||
"table": table,
|
||||
}
|
||||
|
||||
if not isinstance(table, LanceTable):
|
||||
raise ValueError(f"Cannot pickle table of type {type(table)!r}")
|
||||
|
||||
base_uri = table._conn.uri
|
||||
if base_uri.startswith("memory://"):
|
||||
return {
|
||||
"kind": "memory",
|
||||
"name": table.name,
|
||||
"data": table.to_arrow(),
|
||||
}
|
||||
|
||||
return {
|
||||
"kind": "local",
|
||||
"name": table.name,
|
||||
"uri": base_uri,
|
||||
"namespace": table._namespace_path,
|
||||
"storage_options": table._conn.storage_options,
|
||||
}
|
||||
|
||||
|
||||
def _table_from_pickle_state(state: dict[str, Any]) -> Table:
|
||||
from . import connect
|
||||
|
||||
kind = state["kind"]
|
||||
if kind == "remote":
|
||||
return state["table"]
|
||||
if kind == "memory":
|
||||
return connect("memory://").create_table(state["name"], state["data"])
|
||||
if kind == "local":
|
||||
db = connect(state["uri"], storage_options=state["storage_options"])
|
||||
return db.open_table(state["name"], namespace_path=state["namespace"] or None)
|
||||
raise ValueError(f"Unknown table pickle state kind: {kind}")
|
||||
|
||||
|
||||
class Permutation:
|
||||
"""
|
||||
A Permutation is a view of a dataset that can be used as input to model training
|
||||
@@ -369,15 +413,15 @@ class Permutation:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_table: LanceTable,
|
||||
permutation_table: Optional[LanceTable],
|
||||
base_table: Table,
|
||||
permutation_table: Optional[Table],
|
||||
split: int,
|
||||
selection: dict[str, str],
|
||||
batch_size: int,
|
||||
transform_fn: Callable[pa.RecordBatch, Any],
|
||||
offset: Optional[int] = None,
|
||||
limit: Optional[int] = None,
|
||||
connection_factory: Optional[Callable[[str], LanceTable]] = None,
|
||||
connection_factory: Optional[Callable[[str], Table]] = None,
|
||||
_reader: Optional[PermutationReader] = None,
|
||||
):
|
||||
"""
|
||||
@@ -397,6 +441,7 @@ class Permutation:
|
||||
if _reader is None:
|
||||
_reader = LOOP.run(self._build_reader())
|
||||
self.reader: PermutationReader = _reader
|
||||
self._pid = os.getpid()
|
||||
|
||||
async def _build_reader(self) -> PermutationReader:
|
||||
reader = await PermutationReader.from_tables(
|
||||
@@ -428,29 +473,25 @@ class Permutation:
|
||||
return new
|
||||
|
||||
def with_connection_factory(
|
||||
self, connection_factory: Callable[[str], LanceTable]
|
||||
self, connection_factory: Callable[[str], Table]
|
||||
) -> "Permutation":
|
||||
"""
|
||||
Creates a new permutation that will use ``connection_factory`` to reopen
|
||||
the base table when this permutation is unpickled in a worker process.
|
||||
|
||||
The factory is a callable that takes a single argument — the base table
|
||||
name — and returns a [LanceTable]. It must be picklable; the worker
|
||||
The factory is a callable that takes a single argument, the base table
|
||||
name, and returns a LanceDB table. It must be picklable; the worker
|
||||
will pickle it via standard ``pickle`` and call it to recover the base
|
||||
table. Picklable callables in practice means top-level (module-level)
|
||||
functions, ``functools.partial`` of such functions, or instances of
|
||||
picklable classes implementing ``__call__``. Lambdas and closures over
|
||||
local variables don't pickle with the default protocol.
|
||||
|
||||
Setting a factory is necessary when the URI alone is not enough to
|
||||
re-open the connection — most importantly for LanceDB Cloud (``db://``)
|
||||
connections, where ``api_key`` and ``region`` aren't recoverable from
|
||||
the connection object after construction.
|
||||
|
||||
For local file or cloud-storage paths the factory is optional: if not
|
||||
set, ``__getstate__`` falls back to capturing
|
||||
``(uri, storage_options, namespace_path)`` and re-opening via
|
||||
``lancedb.connect(uri, storage_options=...)``.
|
||||
A factory is optional for normal local and remote LanceDB connections:
|
||||
if not set, ``__getstate__`` captures the table's own picklable reopen
|
||||
state. Use a factory when that default state is not enough, for example
|
||||
when credentials should be loaded from the worker environment instead
|
||||
of being embedded in the pickle.
|
||||
|
||||
Examples
|
||||
--------
|
||||
@@ -508,7 +549,7 @@ class Permutation:
|
||||
return new
|
||||
|
||||
@classmethod
|
||||
def identity(cls, table: LanceTable) -> "Permutation":
|
||||
def identity(cls, table: Table) -> "Permutation":
|
||||
"""
|
||||
Creates an identity permutation for the given table.
|
||||
"""
|
||||
@@ -517,8 +558,8 @@ class Permutation:
|
||||
@classmethod
|
||||
def from_tables(
|
||||
cls,
|
||||
base_table: LanceTable,
|
||||
permutation_table: Optional[LanceTable] = None,
|
||||
base_table: Table,
|
||||
permutation_table: Optional[Table] = None,
|
||||
split: Optional[Union[str, int]] = None,
|
||||
) -> "Permutation":
|
||||
"""
|
||||
@@ -594,11 +635,10 @@ class Permutation:
|
||||
|
||||
The base table is captured either via a user-supplied
|
||||
``connection_factory`` (see [with_connection_factory]) or, as a
|
||||
fallback, by introspecting ``(uri, storage_options, namespace_path)``
|
||||
on the connection. The permutation table — always an in-memory
|
||||
LanceDB table — is captured as a pyarrow Table (which pickles via
|
||||
Arrow IPC natively). The reader is dropped from the wire format;
|
||||
``__setstate__`` rebuilds it from the restored tables.
|
||||
fallback, by the table's own picklable reopen state. The permutation
|
||||
table is captured as a pyarrow Table (which pickles via Arrow IPC
|
||||
natively). The reader is dropped from the wire format and rebuilt
|
||||
lazily on first use.
|
||||
"""
|
||||
permutation_data: Optional[pa.Table] = None
|
||||
if self.permutation_table is not None:
|
||||
@@ -622,39 +662,9 @@ class Permutation:
|
||||
# namespace from the existing connection.
|
||||
return common
|
||||
|
||||
# URI-introspection fallback: only viable for native (OSS) connections
|
||||
# where (uri, storage_options) is enough to reopen. Remote / cloud
|
||||
# connections don't expose recoverable api_key / region — those users
|
||||
# must call with_connection_factory().
|
||||
try:
|
||||
base_uri = self.base_table._conn.uri
|
||||
storage_options = self.base_table._conn.storage_options
|
||||
except AttributeError as e:
|
||||
raise ValueError(
|
||||
"Cannot pickle this Permutation: the base table's connection "
|
||||
"does not expose a uri/storage_options, which usually means it "
|
||||
"is a remote (LanceDB Cloud) connection. Call "
|
||||
"Permutation.with_connection_factory(...) first to provide a "
|
||||
"picklable callable that re-opens the base table from a worker "
|
||||
"process."
|
||||
) from e
|
||||
|
||||
if base_uri.startswith("memory://"):
|
||||
# In-memory base tables don't exist in any worker process by
|
||||
# default, so dump the entire base table into the pickle. This
|
||||
# can be expensive for large datasets — users with large
|
||||
# in-memory base tables should either persist them or set a
|
||||
# connection_factory.
|
||||
return {
|
||||
**common,
|
||||
"base_table_data": self.base_table.to_arrow(),
|
||||
}
|
||||
|
||||
return {
|
||||
**common,
|
||||
"base_table_uri": base_uri,
|
||||
"base_table_namespace": self.base_table._namespace_path,
|
||||
"base_table_storage_options": storage_options,
|
||||
"base_table_state": _table_to_pickle_state(self.base_table),
|
||||
}
|
||||
|
||||
def __setstate__(self, state: dict[str, Any]) -> None:
|
||||
@@ -663,6 +673,8 @@ class Permutation:
|
||||
connection_factory = state["connection_factory"]
|
||||
if connection_factory is not None:
|
||||
base_table = connection_factory(state["base_table_name"])
|
||||
elif "base_table_state" in state:
|
||||
base_table = _table_from_pickle_state(state["base_table_state"])
|
||||
elif "base_table_data" in state:
|
||||
# In-memory base table inlined into the pickle; rebuild the same
|
||||
# way we rebuild the in-memory permutation table.
|
||||
@@ -680,7 +692,7 @@ class Permutation:
|
||||
namespace_path=state["base_table_namespace"] or None,
|
||||
)
|
||||
|
||||
permutation_table: Optional[LanceTable] = None
|
||||
permutation_table: Optional[Table] = None
|
||||
if state["permutation_data"] is not None:
|
||||
mem_db = connect("memory://")
|
||||
permutation_table = mem_db.create_table(
|
||||
@@ -696,10 +708,28 @@ class Permutation:
|
||||
self.offset = state["offset"]
|
||||
self.limit = state["limit"]
|
||||
self.connection_factory = connection_factory
|
||||
self.reader = None
|
||||
self._pid = None
|
||||
|
||||
def _ensure_open(self) -> None:
|
||||
pid = os.getpid()
|
||||
if self.reader is not None and getattr(self, "_pid", None) == pid:
|
||||
return
|
||||
# The reader owns Rust-side table handles. Rebuild it after unpickle or
|
||||
# fork even though the Python table wrappers reopen themselves.
|
||||
if hasattr(self.base_table, "_ensure_open"):
|
||||
self.base_table._ensure_open()
|
||||
if self.permutation_table is not None and hasattr(
|
||||
self.permutation_table, "_ensure_open"
|
||||
):
|
||||
self.permutation_table._ensure_open()
|
||||
self.reader = LOOP.run(self._build_reader())
|
||||
self._pid = pid
|
||||
|
||||
@property
|
||||
def schema(self) -> pa.Schema:
|
||||
self._ensure_open()
|
||||
|
||||
async def do_output_schema():
|
||||
return await self.reader.output_schema(self.selection)
|
||||
|
||||
@@ -717,6 +747,7 @@ class Permutation:
|
||||
"""
|
||||
The number of rows in the permutation
|
||||
"""
|
||||
self._ensure_open()
|
||||
return self.reader.count_rows()
|
||||
|
||||
@property
|
||||
@@ -875,6 +906,7 @@ class Permutation:
|
||||
If skip_last_batch is True, the last batch will be skipped if it is not a
|
||||
multiple of batch_size.
|
||||
"""
|
||||
self._ensure_open()
|
||||
|
||||
async def get_iter():
|
||||
return await self.reader.read(self.selection, batch_size=batch_size)
|
||||
@@ -976,6 +1008,7 @@ class Permutation:
|
||||
so `with_format` and `with_transform` affect this method in the same way
|
||||
they affect iteration.
|
||||
"""
|
||||
self._ensure_open()
|
||||
|
||||
async def do_take_offsets():
|
||||
return await self.reader.take_offsets(offsets, selection=self.selection)
|
||||
@@ -1011,9 +1044,11 @@ class Permutation:
|
||||
"""
|
||||
Skip the first `skip` rows of the permutation
|
||||
"""
|
||||
self._ensure_open()
|
||||
new = copy.copy(self)
|
||||
new.offset = skip
|
||||
new.reader = LOOP.run(new._build_reader())
|
||||
new._pid = os.getpid()
|
||||
return new
|
||||
|
||||
@deprecated(details="Use with_take instead")
|
||||
@@ -1032,9 +1067,11 @@ class Permutation:
|
||||
"""
|
||||
Limit the permutation to `limit` rows (following any `skip`)
|
||||
"""
|
||||
self._ensure_open()
|
||||
new = copy.copy(self)
|
||||
new.limit = limit
|
||||
new.reader = LOOP.run(new._build_reader())
|
||||
new._pid = os.getpid()
|
||||
return new
|
||||
|
||||
@deprecated(details="Use with_repeat instead")
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
|
||||
from datetime import timedelta
|
||||
import json
|
||||
import logging
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import sys
|
||||
@@ -17,7 +18,7 @@ else:
|
||||
|
||||
# Remove this import to fix circular dependency
|
||||
# from lancedb import connect_async
|
||||
from lancedb.remote import ClientConfig
|
||||
from lancedb.remote import ClientConfig, RetryConfig, TimeoutConfig, TlsConfig
|
||||
import pyarrow as pa
|
||||
|
||||
from ..common import DATA
|
||||
@@ -36,6 +37,64 @@ from ..table import Table
|
||||
from ..util import validate_table_name
|
||||
|
||||
|
||||
def _duration_seconds(value: Optional[timedelta]) -> Optional[float]:
|
||||
return value.total_seconds() if value is not None else None
|
||||
|
||||
|
||||
def _timeout_config_to_dict(
|
||||
config: Optional[TimeoutConfig],
|
||||
) -> Optional[dict[str, Any]]:
|
||||
if config is None:
|
||||
return None
|
||||
return {
|
||||
"timeout": _duration_seconds(config.timeout),
|
||||
"connect_timeout": _duration_seconds(config.connect_timeout),
|
||||
"read_timeout": _duration_seconds(config.read_timeout),
|
||||
"pool_idle_timeout": _duration_seconds(config.pool_idle_timeout),
|
||||
}
|
||||
|
||||
|
||||
def _retry_config_to_dict(config: RetryConfig) -> dict[str, Any]:
|
||||
return {
|
||||
"retries": config.retries,
|
||||
"connect_retries": config.connect_retries,
|
||||
"read_retries": config.read_retries,
|
||||
"backoff_factor": config.backoff_factor,
|
||||
"backoff_jitter": config.backoff_jitter,
|
||||
"statuses": config.statuses,
|
||||
}
|
||||
|
||||
|
||||
def _tls_config_to_dict(config: Optional[TlsConfig]) -> Optional[dict[str, Any]]:
|
||||
if config is None:
|
||||
return None
|
||||
return {
|
||||
"cert_file": config.cert_file,
|
||||
"key_file": config.key_file,
|
||||
"ssl_ca_cert": config.ssl_ca_cert,
|
||||
"assert_hostname": config.assert_hostname,
|
||||
}
|
||||
|
||||
|
||||
def _client_config_to_dict(config: ClientConfig) -> dict[str, Any]:
|
||||
if config.header_provider is not None:
|
||||
raise ValueError(
|
||||
"Cannot serialize a remote connection with a header_provider. "
|
||||
"Use static api_key/extra_headers or provide a worker-side "
|
||||
"connection factory instead."
|
||||
)
|
||||
return {
|
||||
"user_agent": config.user_agent,
|
||||
"retry_config": _retry_config_to_dict(config.retry_config),
|
||||
"timeout_config": _timeout_config_to_dict(config.timeout_config),
|
||||
"extra_headers": config.extra_headers,
|
||||
"id_delimiter": config.id_delimiter,
|
||||
"tls_config": _tls_config_to_dict(config.tls_config),
|
||||
"header_provider": None,
|
||||
"user_id": config.user_id,
|
||||
}
|
||||
|
||||
|
||||
class RemoteDBConnection(DBConnection):
|
||||
"""A connection to a remote LanceDB database."""
|
||||
|
||||
@@ -89,6 +148,11 @@ class RemoteDBConnection(DBConnection):
|
||||
parsed = urlparse(db_url)
|
||||
if parsed.scheme != "db":
|
||||
raise ValueError(f"Invalid scheme: {parsed.scheme}, only accepts db://")
|
||||
self.db_url = db_url
|
||||
self.api_key = api_key
|
||||
self.region = region
|
||||
self.host_override = host_override
|
||||
self.storage_options = storage_options
|
||||
self.db_name = parsed.netloc
|
||||
|
||||
self.client_config = client_config
|
||||
@@ -111,6 +175,20 @@ class RemoteDBConnection(DBConnection):
|
||||
def __repr__(self) -> str:
|
||||
return f"RemoteConnect(name={self.db_name})"
|
||||
|
||||
@override
|
||||
def serialize(self) -> str:
|
||||
return json.dumps(
|
||||
{
|
||||
"connection_type": "remote",
|
||||
"db_url": self.db_url,
|
||||
"api_key": self.api_key,
|
||||
"region": self.region,
|
||||
"host_override": self.host_override,
|
||||
"client_config": _client_config_to_dict(self.client_config),
|
||||
"storage_options": self.storage_options,
|
||||
}
|
||||
)
|
||||
|
||||
@override
|
||||
def list_namespaces(
|
||||
self,
|
||||
@@ -305,6 +383,8 @@ class RemoteDBConnection(DBConnection):
|
||||
namespace_path: Optional[List[str]] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
index_cache_size: Optional[int] = None,
|
||||
branch: Optional[str] = None,
|
||||
version: Optional[int] = None,
|
||||
) -> Table:
|
||||
"""Open a Lance Table in the database.
|
||||
|
||||
@@ -315,6 +395,14 @@ class RemoteDBConnection(DBConnection):
|
||||
namespace_path: List[str], optional
|
||||
The namespace to open the table from.
|
||||
None or empty list represents root namespace.
|
||||
branch: str, optional
|
||||
Branching is not yet supported on remote tables, so only the
|
||||
default branch is accepted (``None`` or ``"main"``); any other
|
||||
value raises ``NotImplementedError``.
|
||||
version: int, optional
|
||||
If provided, open the table pinned to this version, producing a
|
||||
read-only handle. Call ``checkout_latest`` to return to a writable
|
||||
state.
|
||||
|
||||
Returns
|
||||
-------
|
||||
@@ -322,6 +410,11 @@ class RemoteDBConnection(DBConnection):
|
||||
"""
|
||||
from .table import RemoteTable
|
||||
|
||||
# Remote supports version time-travel but not branches: reject a non-main
|
||||
# branch, but allow a version-only open (or "main").
|
||||
if branch is not None and branch != "main":
|
||||
raise NotImplementedError("branching is not yet supported on remote tables")
|
||||
|
||||
if namespace_path is None:
|
||||
namespace_path = []
|
||||
if index_cache_size is not None:
|
||||
@@ -331,7 +424,15 @@ class RemoteDBConnection(DBConnection):
|
||||
)
|
||||
|
||||
table = LOOP.run(self._conn.open_table(name, namespace_path=namespace_path))
|
||||
return RemoteTable(table, self.db_name)
|
||||
tbl = RemoteTable(
|
||||
table,
|
||||
self.db_name,
|
||||
connection_state=self.serialize,
|
||||
namespace_path=namespace_path,
|
||||
)
|
||||
if version is not None:
|
||||
tbl.checkout(version)
|
||||
return tbl
|
||||
|
||||
def clone_table(
|
||||
self,
|
||||
@@ -380,7 +481,12 @@ class RemoteDBConnection(DBConnection):
|
||||
is_shallow=is_shallow,
|
||||
)
|
||||
)
|
||||
return RemoteTable(table, self.db_name)
|
||||
return RemoteTable(
|
||||
table,
|
||||
self.db_name,
|
||||
connection_state=self.serialize,
|
||||
namespace_path=target_namespace_path,
|
||||
)
|
||||
|
||||
@override
|
||||
def create_table(
|
||||
@@ -525,7 +631,12 @@ class RemoteDBConnection(DBConnection):
|
||||
fill_value=fill_value,
|
||||
)
|
||||
)
|
||||
return RemoteTable(table, self.db_name)
|
||||
return RemoteTable(
|
||||
table,
|
||||
self.db_name,
|
||||
connection_state=self.serialize,
|
||||
namespace_path=namespace_path,
|
||||
)
|
||||
|
||||
@override
|
||||
def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):
|
||||
|
||||
@@ -5,6 +5,7 @@ from datetime import timedelta
|
||||
import deprecation
|
||||
import logging
|
||||
from functools import cached_property
|
||||
import os
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
@@ -24,6 +25,7 @@ from lancedb._lancedb import (
|
||||
AddColumnsResult,
|
||||
AddResult,
|
||||
AlterColumnsResult,
|
||||
UpdateFieldMetadataResult,
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
IndexConfig,
|
||||
@@ -63,14 +65,80 @@ class RemoteTable(Table):
|
||||
self,
|
||||
table: AsyncTable,
|
||||
db_name: str,
|
||||
*,
|
||||
connection_state: Optional[Union[str, Callable[[], str]]] = None,
|
||||
namespace_path: Optional[List[str]] = None,
|
||||
):
|
||||
self._table = table
|
||||
self._table_handle = table
|
||||
self._name = table.name
|
||||
self.db_name = db_name
|
||||
self._connection_state = connection_state
|
||||
self._namespace_path = list(namespace_path or [])
|
||||
self._checkout_version: Optional[int] = None
|
||||
self._pid = os.getpid()
|
||||
|
||||
def _serialized_connection_state(self) -> str:
|
||||
if self._connection_state is None:
|
||||
raise RuntimeError(
|
||||
"Cannot reopen this remote table because it does not carry "
|
||||
"serialized connection state"
|
||||
)
|
||||
if callable(self._connection_state):
|
||||
self._connection_state = self._connection_state()
|
||||
return self._connection_state
|
||||
|
||||
@property
|
||||
def _table(self) -> AsyncTable:
|
||||
self._ensure_open()
|
||||
assert self._table_handle is not None
|
||||
return self._table_handle
|
||||
|
||||
@_table.setter
|
||||
def _table(self, table: AsyncTable) -> None:
|
||||
self._table_handle = table
|
||||
self._name = table.name
|
||||
self._pid = os.getpid()
|
||||
|
||||
def _ensure_open(self) -> None:
|
||||
pid = os.getpid()
|
||||
if self._table_handle is not None and self._pid == pid:
|
||||
return
|
||||
|
||||
# Pickle clears the handle; fork inherits a handle created in the
|
||||
# parent process. In both cases reopen before touching the Rust client.
|
||||
from lancedb import deserialize_conn
|
||||
|
||||
db = deserialize_conn(self._serialized_connection_state(), for_worker=True)
|
||||
table = db.open_table(self._name, namespace_path=self._namespace_path)
|
||||
if self._checkout_version is not None:
|
||||
table.checkout(self._checkout_version)
|
||||
|
||||
self._table_handle = table._table
|
||||
self.db_name = table.db_name
|
||||
self._pid = pid
|
||||
|
||||
def __getstate__(self) -> dict:
|
||||
return {
|
||||
"connection_state": self._serialized_connection_state(),
|
||||
"db_name": self.db_name,
|
||||
"name": self.name,
|
||||
"namespace_path": self._namespace_path,
|
||||
"checkout_version": self._checkout_version,
|
||||
}
|
||||
|
||||
def __setstate__(self, state: dict) -> None:
|
||||
self._table_handle = None
|
||||
self._name = state["name"]
|
||||
self.db_name = state["db_name"]
|
||||
self._connection_state = state["connection_state"]
|
||||
self._namespace_path = state["namespace_path"]
|
||||
self._checkout_version = state["checkout_version"]
|
||||
self._pid = None
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""The name of the table"""
|
||||
return self._table.name
|
||||
return self._name
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"RemoteTable({self.db_name}.{self.name})"
|
||||
@@ -120,13 +188,19 @@ class RemoteTable(Table):
|
||||
raise NotImplementedError("to_pandas() is not yet supported on LanceDB cloud.")
|
||||
|
||||
def checkout(self, version: Union[int, str]):
|
||||
return LOOP.run(self._table.checkout(version))
|
||||
result = LOOP.run(self._table.checkout(version))
|
||||
self._checkout_version = self.version
|
||||
return result
|
||||
|
||||
def checkout_latest(self):
|
||||
return LOOP.run(self._table.checkout_latest())
|
||||
result = LOOP.run(self._table.checkout_latest())
|
||||
self._checkout_version = None
|
||||
return result
|
||||
|
||||
def restore(self, version: Optional[Union[int, str]] = None):
|
||||
return LOOP.run(self._table.restore(version))
|
||||
result = LOOP.run(self._table.restore(version))
|
||||
self._checkout_version = None
|
||||
return result
|
||||
|
||||
def list_indices(self) -> Iterable[IndexConfig]:
|
||||
"""List all the indices on the table"""
|
||||
@@ -777,6 +851,11 @@ class RemoteTable(Table):
|
||||
) -> AlterColumnsResult:
|
||||
return LOOP.run(self._table.alter_columns(*alterations))
|
||||
|
||||
def update_field_metadata(
|
||||
self, *updates: dict[str, Any]
|
||||
) -> UpdateFieldMetadataResult:
|
||||
return LOOP.run(self._table.update_field_metadata(*updates))
|
||||
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
|
||||
return LOOP.run(self._table.drop_columns(columns))
|
||||
|
||||
|
||||
@@ -154,6 +154,7 @@ if TYPE_CHECKING:
|
||||
AddColumnsResult,
|
||||
AddResult,
|
||||
AlterColumnsResult,
|
||||
UpdateFieldMetadataResult,
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
LsmWriteSpec,
|
||||
@@ -757,6 +758,15 @@ class Table(ABC):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@property
|
||||
def branches(self) -> "Branches":
|
||||
"""Branch management for the table.
|
||||
|
||||
Branches are isolated, writable lines of history forked from another
|
||||
branch (or version). Writes on a branch do not affect ``main``.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def __len__(self) -> int:
|
||||
"""The number of rows in this Table"""
|
||||
return self.count_rows(None)
|
||||
@@ -1799,6 +1809,29 @@ class Table(ABC):
|
||||
version: the new version number of the table after the alteration.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def update_field_metadata(
|
||||
self, *updates: dict[str, Any]
|
||||
) -> UpdateFieldMetadataResult:
|
||||
"""
|
||||
Update per-field (column) metadata.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
updates : dict
|
||||
One or more dicts, each with:
|
||||
- "path": str — dot-path to the field (e.g. "embedding" or "a.b.c").
|
||||
- "metadata": dict[str, str | None] — keys to set; a value of ``None``
|
||||
deletes that key.
|
||||
- "replace": bool, optional — replace the field's whole metadata map
|
||||
instead of merging (default False).
|
||||
|
||||
Returns
|
||||
-------
|
||||
UpdateFieldMetadataResult
|
||||
version: the new table version after the update.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
|
||||
"""
|
||||
@@ -2062,22 +2095,27 @@ class LanceTable(Table):
|
||||
"Please install with `pip install pylance`."
|
||||
)
|
||||
|
||||
branch = self.current_branch()
|
||||
version = None if branch is not None else self.version
|
||||
if self._namespace_client is not None:
|
||||
table_id = self._namespace_path + [self.name]
|
||||
return lance.dataset(
|
||||
version=self.version,
|
||||
ds = lance.dataset(
|
||||
version=version,
|
||||
storage_options=self._conn.storage_options,
|
||||
namespace_client=self._namespace_client,
|
||||
table_id=table_id,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
return lance.dataset(
|
||||
self._dataset_path,
|
||||
version=self.version,
|
||||
storage_options=self._conn.storage_options,
|
||||
**kwargs,
|
||||
)
|
||||
else:
|
||||
ds = lance.dataset(
|
||||
self._dataset_path,
|
||||
version=version,
|
||||
storage_options=self._conn.storage_options,
|
||||
**kwargs,
|
||||
)
|
||||
if branch is not None:
|
||||
ds = ds.checkout_version((branch, self.version))
|
||||
return ds
|
||||
|
||||
@property
|
||||
def schema(self) -> pa.Schema:
|
||||
@@ -2143,6 +2181,19 @@ class LanceTable(Table):
|
||||
"""
|
||||
return Tags(self._table)
|
||||
|
||||
@property
|
||||
def branches(self) -> "Branches":
|
||||
"""Branch management for the table.
|
||||
|
||||
``create``/``checkout`` return a new table handle scoped to the branch;
|
||||
writes on it do not affect ``main``.
|
||||
"""
|
||||
return Branches(self)
|
||||
|
||||
def current_branch(self) -> Optional[str]:
|
||||
"""The branch this table handle is scoped to, or ``None`` for ``main``."""
|
||||
return self._table.current_branch()
|
||||
|
||||
def checkout(self, version: Union[int, str]):
|
||||
"""Checkout a version of the table. This is an in-place operation.
|
||||
|
||||
@@ -3397,9 +3448,13 @@ class LanceTable(Table):
|
||||
batch_size: Optional[int] = None,
|
||||
timeout: Optional[timedelta] = None,
|
||||
) -> pa.RecordBatchReader:
|
||||
# Branch queries run locally: the server-side query protocol can't
|
||||
# carry a branch yet.
|
||||
# TODO: push down server-side once it can (with remote table support).
|
||||
if (
|
||||
"QueryTable" in self._pushdown_operations
|
||||
and self._namespace_client is not None
|
||||
and self.current_branch() is None
|
||||
):
|
||||
from lancedb.namespace import _execute_server_side_query
|
||||
|
||||
@@ -3583,6 +3638,11 @@ class LanceTable(Table):
|
||||
) -> AlterColumnsResult:
|
||||
return LOOP.run(self._table.alter_columns(*alterations))
|
||||
|
||||
def update_field_metadata(
|
||||
self, *updates: dict[str, Any]
|
||||
) -> UpdateFieldMetadataResult:
|
||||
return LOOP.run(self._table.update_field_metadata(*updates))
|
||||
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
|
||||
return LOOP.run(self._table.drop_columns(columns))
|
||||
|
||||
@@ -3637,10 +3697,18 @@ class LanceTable(Table):
|
||||
"""
|
||||
LOOP.run(self._table.migrate_v2_manifest_paths())
|
||||
|
||||
@deprecation.deprecated(
|
||||
deprecated_in="0.33.1",
|
||||
current_version=__version__,
|
||||
details="Use update_field_metadata() instead.",
|
||||
)
|
||||
def replace_field_metadata(self, field_name: str, new_metadata: Dict[str, str]):
|
||||
"""
|
||||
Replace the metadata of a field in the schema
|
||||
|
||||
.. deprecated:: 0.33.1
|
||||
Use :func:`update_field_metadata` instead.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
field_name: str
|
||||
@@ -4291,12 +4359,20 @@ class AsyncTable:
|
||||
"Please install with `pip install pylance`."
|
||||
)
|
||||
|
||||
return lance.dataset(
|
||||
# lance.dataset() can't open a branch directly, so open the base table
|
||||
# and check out the branch ref (a None branch resolves to main).
|
||||
branch = self.current_branch()
|
||||
table_version = await self.version()
|
||||
version = None if branch is not None else table_version
|
||||
ds = lance.dataset(
|
||||
await self.uri(),
|
||||
version=await self.version(),
|
||||
version=version,
|
||||
storage_options=await self.latest_storage_options(),
|
||||
**kwargs,
|
||||
)
|
||||
if branch is not None:
|
||||
ds = ds.checkout_version((branch, table_version))
|
||||
return ds
|
||||
|
||||
async def to_pandas(self, blob_mode: BlobMode = "lazy", **kwargs) -> "pd.DataFrame":
|
||||
"""Return the table as a pandas DataFrame.
|
||||
@@ -5234,6 +5310,13 @@ class AsyncTable:
|
||||
"""
|
||||
return await self._inner.alter_columns(alterations)
|
||||
|
||||
async def update_field_metadata(
|
||||
self, *updates: dict[str, Any]
|
||||
) -> UpdateFieldMetadataResult:
|
||||
"""Update per-field metadata. See
|
||||
[`Table.update_field_metadata`][lancedb.table.Table.update_field_metadata]."""
|
||||
return await self._inner.update_field_metadata(updates)
|
||||
|
||||
async def drop_columns(self, columns: Iterable[str]):
|
||||
"""
|
||||
Drop columns from the table.
|
||||
@@ -5398,6 +5481,19 @@ class AsyncTable:
|
||||
"""
|
||||
return AsyncTags(self._inner)
|
||||
|
||||
@property
|
||||
def branches(self) -> AsyncBranches:
|
||||
"""Branch management for the table.
|
||||
|
||||
Branches are isolated, writable lines of history forked from another
|
||||
branch (or version). Writes on a branch do not affect ``main``.
|
||||
"""
|
||||
return AsyncBranches(self._inner)
|
||||
|
||||
def current_branch(self) -> Optional[str]:
|
||||
"""The branch this table handle is scoped to, or ``None`` for ``main``."""
|
||||
return self._inner.current_branch()
|
||||
|
||||
async def optimize(
|
||||
self,
|
||||
*,
|
||||
@@ -5518,12 +5614,20 @@ class AsyncTable:
|
||||
"""
|
||||
await self._inner.migrate_manifest_paths_v2()
|
||||
|
||||
@deprecation.deprecated(
|
||||
deprecated_in="0.33.1",
|
||||
current_version=__version__,
|
||||
details="Use update_field_metadata() instead.",
|
||||
)
|
||||
async def replace_field_metadata(
|
||||
self, field_name: str, new_metadata: dict[str, str]
|
||||
):
|
||||
"""
|
||||
Replace the metadata of a field in the schema
|
||||
|
||||
.. deprecated:: 0.33.1
|
||||
Use :func:`update_field_metadata` instead.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
field_name: str
|
||||
@@ -5725,6 +5829,75 @@ class Tags:
|
||||
LOOP.run(self._table.tags.update(tag, version))
|
||||
|
||||
|
||||
class Branches:
|
||||
"""
|
||||
Table branch manager.
|
||||
"""
|
||||
|
||||
def __init__(self, parent: "LanceTable"):
|
||||
self._parent = parent
|
||||
self._table = parent._table
|
||||
|
||||
def list(self) -> Dict[str, Any]:
|
||||
"""List all branches, mapping name to branch metadata."""
|
||||
return LOOP.run(self._table.branches.list())
|
||||
|
||||
def create(
|
||||
self,
|
||||
name: str,
|
||||
from_ref: Optional[str] = None,
|
||||
from_version: Optional[int] = None,
|
||||
) -> "LanceTable":
|
||||
"""Create a branch and return a handle scoped to it.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
name: str
|
||||
Name of the new branch.
|
||||
from_ref: str, optional
|
||||
Source branch to fork from. Defaults to ``main``.
|
||||
from_version: int, optional
|
||||
A specific version on ``from_ref`` to fork from. Defaults to latest.
|
||||
"""
|
||||
async_table = LOOP.run(
|
||||
self._table.branches.create(name, from_ref, from_version)
|
||||
)
|
||||
return self._wrap(async_table)
|
||||
|
||||
def checkout(self, name: str, version: Optional[int] = None) -> "LanceTable":
|
||||
"""Check out an existing branch and return a handle scoped to it.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
name: str
|
||||
Name of the branch to check out.
|
||||
version: int, optional
|
||||
A specific version on the branch to pin. When set, the returned
|
||||
handle is a read-only view of that version; when omitted it tracks
|
||||
the branch's latest and stays writable.
|
||||
"""
|
||||
async_table = LOOP.run(self._table.branches.checkout(name, version))
|
||||
return self._wrap(async_table)
|
||||
|
||||
def delete(self, name: str) -> None:
|
||||
"""Delete a branch."""
|
||||
LOOP.run(self._table.branches.delete(name))
|
||||
|
||||
def _wrap(self, async_table: "AsyncTable") -> "LanceTable":
|
||||
# Reuse the parent's connection + namespace context; from_inner would drop
|
||||
# it and break identity/query routing for namespace-backed tables.
|
||||
parent = self._parent
|
||||
return LanceTable(
|
||||
parent._conn,
|
||||
async_table.name,
|
||||
namespace_path=parent._namespace_path,
|
||||
namespace_client=parent._namespace_client,
|
||||
pushdown_operations=parent._pushdown_operations,
|
||||
location=parent._location,
|
||||
_async=async_table,
|
||||
)
|
||||
|
||||
|
||||
class AsyncTags:
|
||||
"""
|
||||
Async table tag manager.
|
||||
@@ -5792,3 +5965,56 @@ class AsyncTags:
|
||||
The new table version to tag.
|
||||
"""
|
||||
await self._table.tags.update(tag, version)
|
||||
|
||||
|
||||
class AsyncBranches:
|
||||
"""Async table branch manager."""
|
||||
|
||||
def __init__(self, table):
|
||||
self._table = table
|
||||
|
||||
async def list(self) -> Dict[str, Any]:
|
||||
"""List all branches, mapping name to branch metadata."""
|
||||
return await self._table.branches.list()
|
||||
|
||||
async def create(
|
||||
self,
|
||||
name: str,
|
||||
from_ref: Optional[str] = None,
|
||||
from_version: Optional[int] = None,
|
||||
) -> "AsyncTable":
|
||||
"""Create a branch and return a handle scoped to it.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
name: str
|
||||
Name of the new branch.
|
||||
from_ref: str, optional
|
||||
Source branch to fork from. Defaults to ``main``.
|
||||
from_version: int, optional
|
||||
A specific version on ``from_ref`` to fork from. Defaults to latest.
|
||||
"""
|
||||
# "main" and None are two spellings of the root branch in lance; normalize
|
||||
# so from_ref="main" behaves identically to the default.
|
||||
if from_ref == "main":
|
||||
from_ref = None
|
||||
inner = await self._table.branches.create(name, from_ref, from_version)
|
||||
return AsyncTable(inner)
|
||||
|
||||
async def checkout(self, name: str, version: Optional[int] = None) -> "AsyncTable":
|
||||
"""Check out an existing branch and return a handle scoped to it.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
name: str
|
||||
Name of the branch to check out.
|
||||
version: int, optional
|
||||
A specific version on the branch to pin. When set, the returned
|
||||
handle is a read-only view of that version; when omitted it tracks
|
||||
the branch's latest and stays writable.
|
||||
"""
|
||||
return AsyncTable(await self._table.branches.checkout(name, version))
|
||||
|
||||
async def delete(self, name: str) -> None:
|
||||
"""Delete a branch."""
|
||||
await self._table.branches.delete(name)
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
import re
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import contextlib
|
||||
from datetime import timedelta
|
||||
import http.server
|
||||
import json
|
||||
import multiprocessing as mp
|
||||
import pickle
|
||||
import re
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
@@ -153,6 +154,52 @@ async def test_async_checkout():
|
||||
assert await table.count_rows() == 300
|
||||
|
||||
|
||||
def test_remote_open_table_branch_and_version():
|
||||
def handler(request):
|
||||
# describe (table open + version validation) always succeeds
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(
|
||||
json.dumps({"version": 2, "schema": {"fields": []}}).encode()
|
||||
)
|
||||
|
||||
with mock_lancedb_connection(handler) as db:
|
||||
# version-only (and "main" + version) is allowed: remote supports
|
||||
# version time-travel even though it has no branches
|
||||
assert db.open_table("test", version=2) is not None
|
||||
assert db.open_table("test", branch="main", version=2) is not None
|
||||
|
||||
# a non-main branch is rejected, with or without a version
|
||||
with pytest.raises(NotImplementedError, match="branching"):
|
||||
db.open_table("test", branch="exp")
|
||||
with pytest.raises(NotImplementedError, match="branching"):
|
||||
db.open_table("test", branch="exp", version=2)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_remote_open_table_branch_and_version():
|
||||
def handler(request):
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(
|
||||
json.dumps({"version": 2, "schema": {"fields": []}}).encode()
|
||||
)
|
||||
|
||||
async with mock_lancedb_connection_async(handler) as db:
|
||||
# version-only (and "main" + version) is allowed: "main" is the default
|
||||
# branch, so it must not hit the unsupported remote branch path
|
||||
assert await db.open_table("test", version=2) is not None
|
||||
assert await db.open_table("test", branch="main", version=2) is not None
|
||||
|
||||
# a non-main branch is rejected, with or without a version
|
||||
with pytest.raises(NotImplementedError, match="branching"):
|
||||
await db.open_table("test", branch="exp")
|
||||
with pytest.raises(NotImplementedError, match="branching"):
|
||||
await db.open_table("test", branch="exp", version=2)
|
||||
|
||||
|
||||
def test_table_len_sync():
|
||||
def handler(request):
|
||||
if request.path == "/v1/table/test/create/?mode=create":
|
||||
@@ -171,6 +218,155 @@ def test_table_len_sync():
|
||||
assert len(table) == 1
|
||||
|
||||
|
||||
def test_remote_connection_serializes():
|
||||
def handler(request):
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b'{"tables": []}')
|
||||
|
||||
with mock_lancedb_connection(handler) as db:
|
||||
serialized = json.loads(db.serialize())
|
||||
assert isinstance(serialized["client_config"], dict)
|
||||
restored = lancedb.deserialize_conn(db.serialize())
|
||||
assert restored.table_names() == []
|
||||
|
||||
|
||||
def test_remote_table_is_picklable():
|
||||
def handler(request):
|
||||
request.close_connection = True
|
||||
if request.path == "/v1/table/test/describe/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
payload = json.dumps(
|
||||
{
|
||||
"version": 1,
|
||||
"schema": {
|
||||
"fields": [
|
||||
{"name": "id", "type": {"type": "int64"}, "nullable": False}
|
||||
]
|
||||
},
|
||||
}
|
||||
)
|
||||
request.wfile.write(payload.encode())
|
||||
elif request.path == "/v1/table/test/count_rows/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b"3")
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
with mock_lancedb_connection(handler) as db:
|
||||
table = db.open_table("test")
|
||||
restored = pickle.loads(pickle.dumps(table))
|
||||
assert restored.count_rows() == 3
|
||||
|
||||
|
||||
def test_remote_table_open_does_not_require_picklable_client_config():
|
||||
from lancedb.remote import HeaderProvider
|
||||
|
||||
class LocalHeaderProvider(HeaderProvider):
|
||||
def get_headers(self):
|
||||
return {"X-Test-Header": "present"}
|
||||
|
||||
def handler(request):
|
||||
request.close_connection = True
|
||||
assert request.headers.get("X-Test-Header") == "present"
|
||||
if request.path == "/v1/table/test/describe/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b'{"version": 1, "schema": {"fields": []}}')
|
||||
elif request.path == "/v1/table/test/count_rows/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b"3")
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
with http.server.HTTPServer(
|
||||
("localhost", 0), make_mock_http_handler(handler)
|
||||
) as server:
|
||||
port = server.server_address[1]
|
||||
handle = threading.Thread(target=server.serve_forever)
|
||||
handle.start()
|
||||
try:
|
||||
db = lancedb.connect(
|
||||
"db://dev",
|
||||
api_key="fake",
|
||||
host_override=f"http://localhost:{port}",
|
||||
client_config={
|
||||
"retry_config": {"retries": 0},
|
||||
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
|
||||
"header_provider": LocalHeaderProvider(),
|
||||
},
|
||||
)
|
||||
table = db.open_table("test")
|
||||
assert table.count_rows() == 3
|
||||
with pytest.raises(ValueError, match="header_provider"):
|
||||
pickle.dumps(table)
|
||||
finally:
|
||||
server.shutdown()
|
||||
handle.join()
|
||||
|
||||
|
||||
def test_remote_permutation_is_picklable():
|
||||
from lancedb.permutation import Permutation
|
||||
|
||||
rows = list(range(10))
|
||||
|
||||
def handler(request):
|
||||
request.close_connection = True
|
||||
if request.path == "/v1/table/test/describe/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
payload = json.dumps(
|
||||
{
|
||||
"version": 1,
|
||||
"schema": {
|
||||
"fields": [
|
||||
{"name": "a", "type": {"type": "int64"}, "nullable": False}
|
||||
]
|
||||
},
|
||||
}
|
||||
)
|
||||
request.wfile.write(payload.encode())
|
||||
elif request.path == "/v1/table/test/count_rows/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(str(len(rows)).encode())
|
||||
elif request.path == "/v1/table/test/query/":
|
||||
content_len = int(request.headers.get("Content-Length"))
|
||||
body = json.loads(request.rfile.read(content_len))
|
||||
if "filter" in body:
|
||||
match = re.search(r"_rowoffset in \((.*?)\)", body["filter"])
|
||||
offsets = [int(offset.strip()) for offset in match.group(1).split(",")]
|
||||
else:
|
||||
offsets = rows
|
||||
table = pa.table({"a": [rows[offset] for offset in offsets]})
|
||||
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/vnd.apache.arrow.file")
|
||||
request.end_headers()
|
||||
with pa.ipc.new_file(request.wfile, schema=table.schema) as writer:
|
||||
writer.write_table(table)
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
with mock_lancedb_connection(handler) as db:
|
||||
permutation = Permutation.identity(db.open_table("test"))
|
||||
restored = pickle.loads(pickle.dumps(permutation))
|
||||
assert restored.__getitems__([0, 2, 4]) == [{"a": 0}, {"a": 2}, {"a": 4}]
|
||||
|
||||
|
||||
def test_create_table_exist_ok():
|
||||
def handler(request):
|
||||
if request.path == "/v1/table/test/create/?mode=exist_ok":
|
||||
@@ -1400,6 +1596,10 @@ def _remote_fork_child(port: int, queue) -> None:
|
||||
queue.put(db.table_names())
|
||||
|
||||
|
||||
def _remote_table_fork_child(table, queue) -> None:
|
||||
queue.put(table.count_rows())
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
sys.platform != "linux",
|
||||
reason=(
|
||||
@@ -1462,3 +1662,65 @@ def test_remote_connection_after_fork():
|
||||
finally:
|
||||
server.shutdown()
|
||||
server_thread.join()
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
sys.platform != "linux",
|
||||
reason=(
|
||||
"fork() is unavailable on Windows and unsafe on macOS "
|
||||
"(Apple frameworks/TLS are not fork-safe)"
|
||||
),
|
||||
)
|
||||
def test_inherited_remote_table_reopens_after_fork():
|
||||
def handler(request):
|
||||
if request.path == "/v1/table/test/describe/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b'{"version": 1, "schema": {"fields": []}}')
|
||||
elif request.path == "/v1/table/test/count_rows/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b"7")
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
server = http.server.HTTPServer(("localhost", 0), make_mock_http_handler(handler))
|
||||
port = server.server_address[1]
|
||||
server_thread = threading.Thread(target=server.serve_forever)
|
||||
server_thread.start()
|
||||
try:
|
||||
db = lancedb.connect(
|
||||
"db://dev",
|
||||
api_key="fake",
|
||||
host_override=f"http://localhost:{port}",
|
||||
client_config={
|
||||
"retry_config": {"retries": 0},
|
||||
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
|
||||
},
|
||||
)
|
||||
table = db.open_table("test")
|
||||
assert table.count_rows() == 7
|
||||
|
||||
ctx = mp.get_context("fork")
|
||||
queue = ctx.Queue()
|
||||
proc = ctx.Process(target=_remote_table_fork_child, args=(table, queue))
|
||||
proc.start()
|
||||
proc.join(timeout=15)
|
||||
|
||||
if proc.is_alive():
|
||||
proc.terminate()
|
||||
proc.join(timeout=5)
|
||||
if proc.is_alive():
|
||||
proc.kill()
|
||||
proc.join()
|
||||
pytest.fail("Remote table hung after fork")
|
||||
|
||||
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
|
||||
assert not queue.empty(), "child produced no result"
|
||||
assert queue.get() == 7
|
||||
finally:
|
||||
server.shutdown()
|
||||
server_thread.join()
|
||||
|
||||
@@ -903,6 +903,346 @@ async def test_async_tags(mem_db_async: AsyncConnection):
|
||||
)
|
||||
|
||||
|
||||
def test_branches(tmp_path):
|
||||
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(0))
|
||||
table = db.create_table(
|
||||
"test",
|
||||
data=[
|
||||
{"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
|
||||
{"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
|
||||
],
|
||||
)
|
||||
assert table.count_rows() == 2
|
||||
|
||||
# fork an isolated, writable branch from main
|
||||
branch = table.branches.create("exp")
|
||||
assert branch.count_rows() == 2
|
||||
branch.add(data=[{"vector": [10.0, 11.0], "item": "baz", "price": 30.0}])
|
||||
|
||||
# writes on the branch do not touch main
|
||||
assert branch.count_rows() == 3
|
||||
assert table.count_rows() == 2
|
||||
|
||||
# the branch is listed, with main (None) as its parent
|
||||
branches = table.branches.list()
|
||||
assert "exp" in branches
|
||||
assert branches["exp"]["parent_branch"] is None
|
||||
|
||||
# from_ref="main" is equivalent to the default
|
||||
table.branches.create("exp2", from_ref="main")
|
||||
assert table.branches.list()["exp2"]["parent_branch"] is None
|
||||
|
||||
# checkout returns a handle scoped to the branch's latest
|
||||
checked_out = table.branches.checkout("exp")
|
||||
assert checked_out.count_rows() == 3
|
||||
|
||||
# delete removes it
|
||||
table.branches.delete("exp")
|
||||
table.branches.delete("exp2")
|
||||
assert "exp" not in table.branches.list()
|
||||
|
||||
|
||||
def test_branch_handle_tracks_concurrent_writes(tmp_path):
|
||||
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(0))
|
||||
table = db.create_table("t", [{"id": 1}])
|
||||
|
||||
# two independent handles on the same branch
|
||||
writer = table.branches.create("exp")
|
||||
reader = db.open_table("t", branch="exp")
|
||||
assert reader.count_rows() == 1
|
||||
|
||||
# a concurrent write on the branch is visible to the other handle
|
||||
writer.add([{"id": 2}])
|
||||
assert reader.count_rows() == 2
|
||||
# main is unaffected
|
||||
assert table.count_rows() == 1
|
||||
|
||||
|
||||
def test_branch_name_validation(tmp_path):
|
||||
db = lancedb.connect(tmp_path)
|
||||
table = db.create_table("t", [{"id": 1}])
|
||||
|
||||
with pytest.raises(ValueError, match="non-empty"):
|
||||
table.branches.create("")
|
||||
with pytest.raises(ValueError, match="non-empty"):
|
||||
table.branches.checkout("")
|
||||
with pytest.raises(ValueError, match="non-empty"):
|
||||
table.branches.delete("")
|
||||
|
||||
|
||||
def test_branches_preserve_namespace(tmp_path):
|
||||
pytest.importorskip(
|
||||
"lance"
|
||||
) # namespace_path routes through lance's DirectoryNamespace
|
||||
db = lancedb.connect(tmp_path)
|
||||
table = db.create_table("t", [{"id": 1}], namespace_path=["ns1"])
|
||||
assert table.namespace == ["ns1"]
|
||||
|
||||
branch = table.branches.create("exp")
|
||||
assert branch.namespace == ["ns1"]
|
||||
assert branch.id == table.id
|
||||
|
||||
# opening the branch directly also preserves namespace identity
|
||||
opened = db.open_table("t", namespace_path=["ns1"], branch="exp")
|
||||
assert opened.namespace == ["ns1"]
|
||||
|
||||
|
||||
def test_open_table_with_branch(tmp_path):
|
||||
db = lancedb.connect(tmp_path)
|
||||
table = db.create_table("t", [{"i": 1}])
|
||||
table.branches.create("exp").add([{"i": 2}])
|
||||
|
||||
# open_table(branch=...) returns a handle scoped to the branch
|
||||
assert db.open_table("t", branch="exp").count_rows() == 2
|
||||
# opening without branch still tracks main
|
||||
assert db.open_table("t").count_rows() == 1
|
||||
|
||||
|
||||
def test_open_table_with_branch_version(tmp_path):
|
||||
db = lancedb.connect(tmp_path, read_consistency_interval=timedelta(0))
|
||||
|
||||
# main: a single fork-point row
|
||||
t = db.create_table("t", [{"i": 0}])
|
||||
main_v1 = t.version
|
||||
|
||||
# fork "exp", then advance exp AND main independently past the fork so they
|
||||
# diverge while sharing version numbers
|
||||
exp = t.branches.create("exp")
|
||||
exp.add([{"i": 1}]) # exp: {0, 1}
|
||||
exp_v2 = exp.version
|
||||
exp.add([{"i": 2}]) # exp HEAD: {0, 1, 2}
|
||||
t.add([{"i": 100}, {"i": 101}, {"i": 102}]) # main HEAD: {0, 100, 101, 102}
|
||||
assert exp_v2 == t.version, "branch and main must share the version number"
|
||||
|
||||
# open exp at the shared version: the data must be exp's, not main's. count
|
||||
# alone cannot prove this (main@v2 also exists), so assert provenance by
|
||||
# content.
|
||||
pinned = db.open_table("t", branch="exp", version=exp_v2)
|
||||
assert pinned.current_branch() == "exp"
|
||||
assert pinned.count_rows() == 2 # not exp HEAD (3), not main@v2 (4)
|
||||
assert pinned.count_rows("i = 1") == 1 # exp's post-fork row is visible
|
||||
assert pinned.count_rows("i = 100") == 0 # main's divergent rows are invisible
|
||||
|
||||
# the same coordinate is reachable directly via branches.checkout(name, version)
|
||||
pinned_direct = t.branches.checkout("exp", exp_v2)
|
||||
assert pinned_direct.current_branch() == "exp"
|
||||
assert pinned_direct.count_rows() == 2
|
||||
|
||||
# the HEADs are unaffected
|
||||
assert db.open_table("t", branch="exp").count_rows() == 3
|
||||
assert db.open_table("t").count_rows() == 4
|
||||
|
||||
# version-only (no branch) time-travels main itself: its fork-point version
|
||||
# holds only main's first row, and the shared version number resolves to
|
||||
# main's data, not the branch's ("opens main at the version")
|
||||
old_main = db.open_table("t", version=main_v1)
|
||||
assert old_main.current_branch() is None
|
||||
assert old_main.count_rows() == 1
|
||||
shared_on_main = db.open_table("t", version=exp_v2)
|
||||
assert shared_on_main.current_branch() is None
|
||||
assert shared_on_main.count_rows() == 4
|
||||
|
||||
# detached head: writing to a pinned version is rejected
|
||||
with pytest.raises((ValueError, RuntimeError), match="cannot be modified"):
|
||||
pinned.add([{"i": 9}])
|
||||
|
||||
# a nonexistent version is rejected -- on main, and on a branch (a distinct
|
||||
# resolution path, on the branch's manifests)
|
||||
with pytest.raises((ValueError, RuntimeError)):
|
||||
db.open_table("t", version=9999)
|
||||
with pytest.raises((ValueError, RuntimeError)):
|
||||
db.open_table("t", branch="exp", version=9999)
|
||||
|
||||
# checkout_latest re-attaches the pinned handle to the BRANCH's HEAD
|
||||
# (writable again), not main's HEAD, and not staying pinned
|
||||
pinned.checkout_latest()
|
||||
assert pinned.current_branch() == "exp"
|
||||
assert pinned.count_rows() == 3 # exp HEAD, not main's 4
|
||||
pinned.add([{"i": 3}])
|
||||
assert pinned.count_rows() == 4 # writable again
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_namespace_open_table_with_branch(tmp_path):
|
||||
pytest.importorskip("lance") # "dir" impl is lance.namespace.DirectoryNamespace
|
||||
db = lancedb.connect_namespace_async("dir", {"root": str(tmp_path)})
|
||||
await db.create_namespace(["ns1"])
|
||||
table = await db.create_table("t", [{"id": 1}], namespace_path=["ns1"])
|
||||
branch = await table.branches.create("exp")
|
||||
await branch.add([{"id": 2}])
|
||||
|
||||
# open_table(branch=...) on the async namespace connection must work
|
||||
opened = await db.open_table("t", namespace_path=["ns1"], branch="exp")
|
||||
assert await opened.count_rows() == 2
|
||||
|
||||
|
||||
def test_namespace_open_table_with_branch_version(tmp_path):
|
||||
pytest.importorskip("lance") # "dir" impl is lance.namespace.DirectoryNamespace
|
||||
db = lancedb.connect_namespace("dir", {"root": str(tmp_path)})
|
||||
db.create_namespace(["ns1"])
|
||||
t = db.create_table("t", [{"i": 0}], namespace_path=["ns1"])
|
||||
|
||||
# fork "exp", then advance exp AND main past the fork so they diverge while
|
||||
# sharing version numbers
|
||||
exp = t.branches.create("exp")
|
||||
exp.add([{"i": 1}])
|
||||
exp_v2 = exp.version
|
||||
exp.add([{"i": 2}])
|
||||
t.add([{"i": 100}, {"i": 101}, {"i": 102}])
|
||||
assert exp_v2 == t.version, "branch and main must share the version number"
|
||||
|
||||
# open_table(branch=, version=) on the namespace connection reads the
|
||||
# branch's data at that version, not main's
|
||||
pinned = db.open_table("t", namespace_path=["ns1"], branch="exp", version=exp_v2)
|
||||
assert pinned.current_branch() == "exp"
|
||||
assert pinned.count_rows() == 2 # not exp HEAD (3), not main@v2 (4)
|
||||
assert pinned.count_rows("i = 1") == 1 # exp's post-fork row is visible
|
||||
assert pinned.count_rows("i = 100") == 0 # main's divergent rows are invisible
|
||||
assert db.open_table("t", namespace_path=["ns1"], branch="exp").count_rows() == 3
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_namespace_open_table_with_branch_version(tmp_path):
|
||||
pytest.importorskip("lance") # "dir" impl is lance.namespace.DirectoryNamespace
|
||||
db = lancedb.connect_namespace_async("dir", {"root": str(tmp_path)})
|
||||
await db.create_namespace(["ns1"])
|
||||
t = await db.create_table("t", [{"i": 0}], namespace_path=["ns1"])
|
||||
|
||||
# fork "exp", then advance exp AND main past the fork so they diverge while
|
||||
# sharing version numbers
|
||||
exp = await t.branches.create("exp")
|
||||
await exp.add([{"i": 1}])
|
||||
exp_v2 = await exp.version()
|
||||
await exp.add([{"i": 2}])
|
||||
await t.add([{"i": 100}, {"i": 101}, {"i": 102}])
|
||||
assert exp_v2 == await t.version(), "branch and main must share the version number"
|
||||
|
||||
# open_table(branch=, version=) on the async namespace connection reads the
|
||||
# branch's data at that version, not main's
|
||||
pinned = await db.open_table(
|
||||
"t", namespace_path=["ns1"], branch="exp", version=exp_v2
|
||||
)
|
||||
assert pinned.current_branch() == "exp"
|
||||
assert await pinned.count_rows() == 2 # not exp HEAD (3), not main@v2 (4)
|
||||
assert await pinned.count_rows("i = 1") == 1 # exp's post-fork row is visible
|
||||
assert await pinned.count_rows("i = 100") == 0 # main's rows are invisible
|
||||
assert (
|
||||
await (
|
||||
await db.open_table("t", namespace_path=["ns1"], branch="exp")
|
||||
).count_rows()
|
||||
== 3
|
||||
)
|
||||
|
||||
|
||||
def test_branch_to_lance_targets_branch(tmp_path):
|
||||
pytest.importorskip("lance")
|
||||
db = lancedb.connect(tmp_path)
|
||||
table = db.create_table("t", [{"i": 1}])
|
||||
branch = table.branches.create("exp")
|
||||
branch.add([{"i": 2}]) # branch: 2 rows, main: 1 row
|
||||
|
||||
assert branch.to_lance().count_rows() == 2
|
||||
assert table.to_lance().count_rows() == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_branches(tmp_path):
|
||||
db = await lancedb.connect_async(tmp_path)
|
||||
table = await db.create_table(
|
||||
"test",
|
||||
data=[
|
||||
{"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
|
||||
{"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
|
||||
],
|
||||
)
|
||||
assert await table.count_rows() == 2
|
||||
|
||||
branch = await table.branches.create("exp")
|
||||
assert await branch.count_rows() == 2
|
||||
await branch.add(data=[{"vector": [10.0, 11.0], "item": "baz", "price": 30.0}])
|
||||
|
||||
assert await branch.count_rows() == 3
|
||||
assert await table.count_rows() == 2
|
||||
|
||||
branches = await table.branches.list()
|
||||
assert "exp" in branches
|
||||
assert branches["exp"]["parent_branch"] is None
|
||||
|
||||
await table.branches.create("exp2", from_ref="main")
|
||||
assert (await table.branches.list())["exp2"]["parent_branch"] is None
|
||||
|
||||
checked_out = await table.branches.checkout("exp")
|
||||
assert await checked_out.count_rows() == 3
|
||||
|
||||
await table.branches.delete("exp")
|
||||
await table.branches.delete("exp2")
|
||||
assert "exp" not in await table.branches.list()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_open_table_with_branch_version(tmp_path):
|
||||
db = await lancedb.connect_async(tmp_path, read_consistency_interval=timedelta(0))
|
||||
|
||||
# main: a single fork-point row
|
||||
t = await db.create_table("t", [{"i": 0}])
|
||||
main_v1 = await t.version()
|
||||
|
||||
# fork "exp", then advance exp AND main independently past the fork so they
|
||||
# diverge while sharing version numbers
|
||||
exp = await t.branches.create("exp")
|
||||
await exp.add([{"i": 1}]) # exp: {0, 1}
|
||||
exp_v2 = await exp.version()
|
||||
await exp.add([{"i": 2}]) # exp HEAD: {0, 1, 2}
|
||||
await t.add([{"i": 100}, {"i": 101}, {"i": 102}]) # main HEAD: {0, 100, 101, 102}
|
||||
assert exp_v2 == await t.version(), "branch and main must share the version number"
|
||||
|
||||
# open exp at the shared version: the data must be exp's, not main's. count
|
||||
# alone cannot prove this (main@v2 also exists), so assert provenance by
|
||||
# content.
|
||||
pinned = await db.open_table("t", branch="exp", version=exp_v2)
|
||||
assert pinned.current_branch() == "exp"
|
||||
assert await pinned.count_rows() == 2 # not exp HEAD (3), not main@v2 (4)
|
||||
assert await pinned.count_rows("i = 1") == 1 # exp's post-fork row is visible
|
||||
assert await pinned.count_rows("i = 100") == 0 # main's rows are invisible
|
||||
|
||||
# the same coordinate is reachable directly via branches.checkout(name, version)
|
||||
pinned_direct = await t.branches.checkout("exp", exp_v2)
|
||||
assert pinned_direct.current_branch() == "exp"
|
||||
assert await pinned_direct.count_rows() == 2
|
||||
|
||||
# the HEADs are unaffected
|
||||
assert await (await db.open_table("t", branch="exp")).count_rows() == 3
|
||||
assert await (await db.open_table("t")).count_rows() == 4
|
||||
|
||||
# version-only (no branch) time-travels main itself: its fork-point version
|
||||
# holds only main's first row, and the shared version number resolves to
|
||||
# main's data, not the branch's ("opens main at the version")
|
||||
old_main = await db.open_table("t", version=main_v1)
|
||||
assert old_main.current_branch() is None
|
||||
assert await old_main.count_rows() == 1
|
||||
shared_on_main = await db.open_table("t", version=exp_v2)
|
||||
assert shared_on_main.current_branch() is None
|
||||
assert await shared_on_main.count_rows() == 4
|
||||
|
||||
# detached head: writing to a pinned version is rejected
|
||||
with pytest.raises((ValueError, RuntimeError), match="cannot be modified"):
|
||||
await pinned.add([{"i": 9}])
|
||||
|
||||
# a nonexistent version is rejected -- on main, and on a branch
|
||||
with pytest.raises((ValueError, RuntimeError)):
|
||||
await db.open_table("t", version=9999)
|
||||
with pytest.raises((ValueError, RuntimeError)):
|
||||
await db.open_table("t", branch="exp", version=9999)
|
||||
|
||||
# checkout_latest re-attaches the pinned handle to the BRANCH's HEAD
|
||||
# (writable again), not main's HEAD, and not staying pinned
|
||||
await pinned.checkout_latest()
|
||||
assert pinned.current_branch() == "exp"
|
||||
assert await pinned.count_rows() == 3 # exp HEAD, not main's 4
|
||||
await pinned.add([{"i": 3}])
|
||||
assert await pinned.count_rows() == 4 # writable again
|
||||
|
||||
|
||||
@patch("lancedb.table.AsyncTable.create_index")
|
||||
def test_create_index_method(mock_create_index, mem_db: DBConnection):
|
||||
table = mem_db.create_table(
|
||||
@@ -2472,6 +2812,30 @@ def test_alter_columns(mem_db: DBConnection):
|
||||
assert table.to_arrow().column_names == ["new_id"]
|
||||
|
||||
|
||||
def test_update_field_metadata(mem_db: DBConnection):
|
||||
data = pa.table({"id": [0, 1], "category": ["a", "b"]})
|
||||
table = mem_db.create_table("my_table", data=data)
|
||||
|
||||
res = table.update_field_metadata(
|
||||
{"path": "category", "metadata": {"unit": "label", "pii": "false"}}
|
||||
)
|
||||
assert res.version == 2
|
||||
# Arrow field metadata is bytes-keyed
|
||||
assert table.schema.field("category").metadata == {
|
||||
b"unit": b"label",
|
||||
b"pii": b"false",
|
||||
}
|
||||
|
||||
# merge: add a key, delete one via None, keep the rest
|
||||
table.update_field_metadata(
|
||||
{"path": "category", "metadata": {"source": "import", "pii": None}}
|
||||
)
|
||||
assert table.schema.field("category").metadata == {
|
||||
b"unit": b"label",
|
||||
b"source": b"import",
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_alter_columns_async(mem_db_async: AsyncConnection):
|
||||
data = pa.table({"id": [0, 1]})
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
import contextlib
|
||||
import functools
|
||||
import http.server
|
||||
import json
|
||||
import multiprocessing as mp
|
||||
import pickle
|
||||
import re
|
||||
import sys
|
||||
import threading
|
||||
|
||||
import lancedb
|
||||
import pyarrow as pa
|
||||
@@ -15,6 +20,107 @@ from lancedb.util import tbl_to_tensor
|
||||
torch = pytest.importorskip("torch")
|
||||
|
||||
|
||||
REMOTE_ROWS = list(range(100))
|
||||
|
||||
|
||||
def _make_mock_http_handler(handler):
|
||||
class MockLanceDBHandler(http.server.BaseHTTPRequestHandler):
|
||||
def do_GET(self):
|
||||
handler(self)
|
||||
|
||||
def do_POST(self):
|
||||
handler(self)
|
||||
|
||||
return MockLanceDBHandler
|
||||
|
||||
|
||||
def _remote_schema_payload():
|
||||
return {
|
||||
"version": 1,
|
||||
"schema": {
|
||||
"fields": [
|
||||
{"name": "a", "type": {"type": "int64"}, "nullable": False},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _offsets_from_filter(filter_sql: str | None) -> list[int]:
|
||||
if filter_sql is None:
|
||||
return REMOTE_ROWS
|
||||
match = re.search(r"_rowoffset in \((.*?)\)", filter_sql)
|
||||
if match is None:
|
||||
return REMOTE_ROWS
|
||||
raw_offsets = match.group(1).strip()
|
||||
if raw_offsets == "":
|
||||
return []
|
||||
return [int(offset.strip()) for offset in raw_offsets.split(",")]
|
||||
|
||||
|
||||
def _remote_dataset_handler(request):
|
||||
request.close_connection = True
|
||||
if request.path == "/v1/table/test/describe/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(json.dumps(_remote_schema_payload()).encode())
|
||||
elif request.path == "/v1/table/test/count_rows/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(str(len(REMOTE_ROWS)).encode())
|
||||
elif request.path == "/v1/table/test/query/":
|
||||
content_len = int(request.headers.get("Content-Length"))
|
||||
body = json.loads(request.rfile.read(content_len))
|
||||
offsets = _offsets_from_filter(body.get("filter"))
|
||||
requested_columns = body.get("columns") or ["a"]
|
||||
if isinstance(requested_columns, dict):
|
||||
requested_columns = list(requested_columns)
|
||||
|
||||
data = {}
|
||||
for column in requested_columns:
|
||||
if column == "a":
|
||||
data[column] = [REMOTE_ROWS[offset] for offset in offsets]
|
||||
elif column == "_rowoffset":
|
||||
data[column] = offsets
|
||||
elif column == "_rowid":
|
||||
data[column] = offsets
|
||||
|
||||
table = pa.table(data)
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/vnd.apache.arrow.file")
|
||||
request.end_headers()
|
||||
with pa.ipc.new_file(request.wfile, schema=table.schema) as writer:
|
||||
writer.write_table(table)
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _remote_dataset_table():
|
||||
with http.server.ThreadingHTTPServer(
|
||||
("localhost", 0), _make_mock_http_handler(_remote_dataset_handler)
|
||||
) as server:
|
||||
port = server.server_address[1]
|
||||
handle = threading.Thread(target=server.serve_forever)
|
||||
handle.start()
|
||||
try:
|
||||
db = lancedb.connect(
|
||||
"db://dev",
|
||||
api_key="fake",
|
||||
host_override=f"http://localhost:{port}",
|
||||
client_config={
|
||||
"retry_config": {"retries": 0},
|
||||
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
|
||||
},
|
||||
)
|
||||
yield db.open_table("test")
|
||||
finally:
|
||||
server.shutdown()
|
||||
handle.join()
|
||||
|
||||
|
||||
def _open_native_table(uri: str, table_name: str):
|
||||
"""Top-level connection factory used by the explicit-factory pickle test.
|
||||
|
||||
@@ -107,6 +213,39 @@ def test_permutation_dataloader_multiprocessing(tmp_db):
|
||||
assert seen == 1000
|
||||
|
||||
|
||||
def test_remote_table_dataloader_multiprocessing():
|
||||
with _remote_dataset_table() as table:
|
||||
dataloader = torch.utils.data.DataLoader(
|
||||
table,
|
||||
collate_fn=tbl_to_tensor,
|
||||
batch_size=10,
|
||||
num_workers=2,
|
||||
multiprocessing_context="spawn",
|
||||
)
|
||||
seen = 0
|
||||
for batch in dataloader:
|
||||
assert batch.size(0) == 1
|
||||
assert batch.size(1) == 10
|
||||
seen += batch.size(1)
|
||||
assert seen == len(REMOTE_ROWS)
|
||||
|
||||
|
||||
def test_remote_permutation_dataloader_multiprocessing():
|
||||
with _remote_dataset_table() as table:
|
||||
permutation = Permutation.identity(table)
|
||||
dataloader = torch.utils.data.DataLoader(
|
||||
permutation,
|
||||
batch_size=10,
|
||||
num_workers=2,
|
||||
multiprocessing_context="spawn",
|
||||
)
|
||||
seen = 0
|
||||
for batch in dataloader:
|
||||
assert batch["a"].size(0) == 10
|
||||
seen += batch["a"].size(0)
|
||||
assert seen == len(REMOTE_ROWS)
|
||||
|
||||
|
||||
def test_permutation_pickle_with_connection_factory(tmp_path):
|
||||
"""When the user provides a connection_factory, pickling should round-trip
|
||||
through that factory rather than introspecting the connection URI. Useful
|
||||
@@ -171,6 +310,35 @@ def _multiworker_dataloader_target(db_uri: str, result_queue):
|
||||
result_queue.put(count)
|
||||
|
||||
|
||||
def _remote_multiworker_dataloader_target(port: int, result_queue):
|
||||
import lancedb
|
||||
from lancedb.permutation import Permutation
|
||||
|
||||
db = lancedb.connect(
|
||||
"db://dev",
|
||||
api_key="fake",
|
||||
host_override=f"http://localhost:{port}",
|
||||
client_config={
|
||||
"retry_config": {"retries": 0},
|
||||
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
|
||||
},
|
||||
)
|
||||
table = db.open_table("test")
|
||||
permutation = Permutation.identity(table)
|
||||
|
||||
dataloader = torch.utils.data.DataLoader(
|
||||
permutation,
|
||||
batch_size=10,
|
||||
num_workers=2,
|
||||
multiprocessing_context="fork",
|
||||
)
|
||||
count = 0
|
||||
for batch in dataloader:
|
||||
assert batch["a"].size(0) == 10
|
||||
count += 1
|
||||
result_queue.put(count)
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
sys.platform != "linux",
|
||||
reason=(
|
||||
@@ -208,3 +376,46 @@ def test_permutation_dataloader_fork_workers(tmp_path):
|
||||
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
|
||||
assert not queue.empty(), "child produced no batches"
|
||||
assert queue.get() == 100
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
sys.platform != "linux",
|
||||
reason=(
|
||||
"fork() is unavailable on Windows and unsafe on macOS "
|
||||
"(Apple frameworks/TLS are not fork-safe)"
|
||||
),
|
||||
)
|
||||
def test_remote_permutation_dataloader_fork_workers():
|
||||
with http.server.ThreadingHTTPServer(
|
||||
("localhost", 0), _make_mock_http_handler(_remote_dataset_handler)
|
||||
) as server:
|
||||
port = server.server_address[1]
|
||||
handle = threading.Thread(target=server.serve_forever)
|
||||
handle.start()
|
||||
try:
|
||||
ctx = mp.get_context("spawn")
|
||||
queue = ctx.Queue()
|
||||
proc = ctx.Process(
|
||||
target=_remote_multiworker_dataloader_target,
|
||||
args=(port, queue),
|
||||
)
|
||||
proc.start()
|
||||
proc.join(timeout=30)
|
||||
|
||||
if proc.is_alive():
|
||||
proc.terminate()
|
||||
proc.join(timeout=5)
|
||||
if proc.is_alive():
|
||||
proc.kill()
|
||||
proc.join()
|
||||
pytest.fail(
|
||||
"Remote permutation hung when iterated in a fork-based "
|
||||
"DataLoader worker"
|
||||
)
|
||||
|
||||
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
|
||||
assert not queue.empty(), "child produced no batches"
|
||||
assert queue.get() == 10
|
||||
finally:
|
||||
server.shutdown()
|
||||
handle.join()
|
||||
|
||||
@@ -16,7 +16,7 @@ use query::{FTSQuery, HybridQuery, Query, VectorQuery};
|
||||
use session::Session;
|
||||
use table::{
|
||||
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, LsmWriteSpec,
|
||||
MergeResult, Table, UpdateResult,
|
||||
MergeResult, Table, UpdateFieldMetadataResult, UpdateResult,
|
||||
};
|
||||
|
||||
pub mod arrow;
|
||||
@@ -50,6 +50,7 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
|
||||
m.add_class::<RecordBatchStream>()?;
|
||||
m.add_class::<AddColumnsResult>()?;
|
||||
m.add_class::<AlterColumnsResult>()?;
|
||||
m.add_class::<UpdateFieldMetadataResult>()?;
|
||||
m.add_class::<AddResult>()?;
|
||||
m.add_class::<MergeResult>()?;
|
||||
m.add_class::<LsmWriteSpec>()?;
|
||||
|
||||
@@ -16,12 +16,12 @@ use arrow::{
|
||||
pyarrow::{FromPyArrow, PyArrowType, ToPyArrow},
|
||||
};
|
||||
use lancedb::table::{
|
||||
AddDataMode, ColumnAlteration, Duration, NewColumnTransform, OptimizeAction, OptimizeOptions,
|
||||
Table as LanceDbTable,
|
||||
AddDataMode, ColumnAlteration, Duration, FieldMetadataUpdate, NewColumnTransform,
|
||||
OptimizeAction, OptimizeOptions, Ref, Table as LanceDbTable,
|
||||
};
|
||||
use pyo3::{
|
||||
Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python,
|
||||
exceptions::{PyKeyError, PyRuntimeError, PyValueError},
|
||||
exceptions::{PyRuntimeError, PyValueError},
|
||||
pyclass, pymethods,
|
||||
types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods},
|
||||
};
|
||||
@@ -357,6 +357,27 @@ impl From<lancedb::table::AlterColumnsResult> for AlterColumnsResult {
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct UpdateFieldMetadataResult {
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
impl UpdateFieldMetadataResult {
|
||||
pub fn __repr__(&self) -> String {
|
||||
format!("UpdateFieldMetadataResult(version={})", self.version)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<lancedb::table::UpdateFieldMetadataResult> for UpdateFieldMetadataResult {
|
||||
fn from(result: lancedb::table::UpdateFieldMetadataResult) -> Self {
|
||||
Self {
|
||||
version: result.version,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DropColumnsResult {
|
||||
@@ -843,6 +864,15 @@ impl Table {
|
||||
Ok(Tags::new(self.inner_ref()?.clone()))
|
||||
}
|
||||
|
||||
pub fn current_branch(&self) -> PyResult<Option<String>> {
|
||||
Ok(self.inner_ref()?.current_branch())
|
||||
}
|
||||
|
||||
#[getter]
|
||||
pub fn branches(&self) -> PyResult<Branches> {
|
||||
Ok(Branches::new(self.inner_ref()?.clone()))
|
||||
}
|
||||
|
||||
#[pyo3(signature = (offsets))]
|
||||
pub fn take_offsets(self_: PyRef<'_, Self>, offsets: Vec<u64>) -> PyResult<TakeQuery> {
|
||||
Ok(TakeQuery::new(
|
||||
@@ -1102,31 +1132,57 @@ impl Table {
|
||||
field_name: String,
|
||||
metadata: &Bound<'_, PyDict>,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
let mut new_metadata = HashMap::<String, String>::new();
|
||||
for (column_name, value) in metadata.into_iter() {
|
||||
let key: String = column_name.extract()?;
|
||||
let value: String = value.extract()?;
|
||||
new_metadata.insert(key, value);
|
||||
// Deprecated: forwards to the update_field_metadata path (replace mode).
|
||||
let mut update = FieldMetadataUpdate::new(field_name).replace();
|
||||
for (key, value) in metadata.into_iter() {
|
||||
update = update.set(key.extract::<String>()?, value.extract::<String>()?);
|
||||
}
|
||||
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let native_tbl = inner
|
||||
.as_native()
|
||||
.ok_or_else(|| PyValueError::new_err("This cannot be run on a remote table"))?;
|
||||
let schema = native_tbl.manifest().await.infer_error()?.schema;
|
||||
let field = schema
|
||||
.field(&field_name)
|
||||
.ok_or_else(|| PyKeyError::new_err(format!("Field {} not found", field_name)))?;
|
||||
|
||||
native_tbl
|
||||
.replace_field_metadata(vec![(field.id as u32, new_metadata)])
|
||||
.await
|
||||
.infer_error()?;
|
||||
|
||||
inner.update_field_metadata(&[update]).await.infer_error()?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
pub fn update_field_metadata<'a>(
|
||||
self_: PyRef<'a, Self>,
|
||||
updates: Vec<Bound<PyDict>>,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
let updates = updates
|
||||
.iter()
|
||||
.map(|update| {
|
||||
let path: String = update
|
||||
.get_item("path")?
|
||||
.ok_or_else(|| PyValueError::new_err("Missing path"))?
|
||||
.extract()?;
|
||||
let mut field_update = FieldMetadataUpdate::new(path);
|
||||
if let Some(metadata) = update.get_item("metadata")? {
|
||||
let metadata_dict = metadata.cast::<PyDict>()?;
|
||||
for (key, value) in metadata_dict.iter() {
|
||||
let key: String = key.extract()?;
|
||||
if value.is_none() {
|
||||
field_update = field_update.remove(key);
|
||||
} else {
|
||||
field_update = field_update.set(key, value.extract::<String>()?);
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(replace) = update.get_item("replace")?
|
||||
&& replace.extract::<bool>()?
|
||||
{
|
||||
field_update = field_update.replace();
|
||||
}
|
||||
Ok(field_update)
|
||||
})
|
||||
.collect::<PyResult<Vec<_>>>()?;
|
||||
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let result = inner.update_field_metadata(&updates).await.infer_error()?;
|
||||
Ok(UpdateFieldMetadataResult::from(result))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(FromPyObject)]
|
||||
@@ -1218,3 +1274,71 @@ impl Tags {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass]
|
||||
pub struct Branches {
|
||||
inner: LanceDbTable,
|
||||
}
|
||||
|
||||
impl Branches {
|
||||
pub fn new(table: LanceDbTable) -> Self {
|
||||
Self { inner: table }
|
||||
}
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
impl Branches {
|
||||
pub fn list(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let res = inner.list_branches().await.infer_error()?;
|
||||
Python::attach(|py| {
|
||||
let py_dict = PyDict::new(py);
|
||||
for (name, contents) in res {
|
||||
let value = PyDict::new(py);
|
||||
value.set_item("parent_branch", contents.parent_branch)?;
|
||||
value.set_item("parent_version", contents.parent_version)?;
|
||||
value.set_item("manifest_size", contents.manifest_size)?;
|
||||
py_dict.set_item(name, value)?;
|
||||
}
|
||||
Ok(py_dict.unbind())
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
#[pyo3(signature = (name, from_ref=None, from_version=None))]
|
||||
pub fn create(
|
||||
self_: PyRef<'_, Self>,
|
||||
name: String,
|
||||
from_ref: Option<String>,
|
||||
from_version: Option<u64>,
|
||||
) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let from = Ref::Version(from_ref, from_version);
|
||||
let table = inner.create_branch(&name, from).await.infer_error()?;
|
||||
Ok(Table::new(table))
|
||||
})
|
||||
}
|
||||
|
||||
#[pyo3(signature = (name, version=None))]
|
||||
pub fn checkout(
|
||||
self_: PyRef<'_, Self>,
|
||||
name: String,
|
||||
version: Option<u64>,
|
||||
) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let table = inner.checkout_branch(&name, version).await.infer_error()?;
|
||||
Ok(Table::new(table))
|
||||
})
|
||||
}
|
||||
|
||||
pub fn delete(self_: PyRef<'_, Self>, name: String) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
inner.delete_branch(&name).await.infer_error()?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ use std::sync::Arc;
|
||||
use arrow_array::RecordBatch;
|
||||
use arrow_schema::SchemaRef;
|
||||
use lance::dataset::ReadParams;
|
||||
use lance::dataset::refs::MAIN_BRANCH;
|
||||
use lance_namespace::models::{
|
||||
CreateNamespaceRequest, CreateNamespaceResponse, DescribeNamespaceRequest,
|
||||
DescribeNamespaceResponse, DropNamespaceRequest, DropNamespaceResponse, ListNamespacesRequest,
|
||||
@@ -119,6 +120,8 @@ pub struct OpenTableBuilder {
|
||||
parent: Arc<dyn Database>,
|
||||
request: OpenTableRequest,
|
||||
embedding_registry: Arc<dyn EmbeddingRegistry>,
|
||||
branch: Option<String>,
|
||||
version: Option<u64>,
|
||||
}
|
||||
|
||||
impl OpenTableBuilder {
|
||||
@@ -139,6 +142,8 @@ impl OpenTableBuilder {
|
||||
managed_versioning: None,
|
||||
},
|
||||
embedding_registry,
|
||||
branch: None,
|
||||
version: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -259,14 +264,48 @@ impl OpenTableBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Open the table scoped to the given branch instead of the default branch.
|
||||
///
|
||||
/// Reads and writes on the returned table operate in the branch's context.
|
||||
pub fn branch(mut self, branch: impl Into<String>) -> Self {
|
||||
self.branch = Some(branch.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// Open the table pinned to a specific version, producing a read-only "view".
|
||||
///
|
||||
/// Composes with [`Self::branch`]: when a branch is also set, this opens that
|
||||
/// branch at the given version; otherwise it opens `main` at that version.
|
||||
/// The returned table is a detached head, so operations that modify the table
|
||||
/// will fail until [`Table::checkout_latest`] is called.
|
||||
///
|
||||
/// ```
|
||||
/// # use lancedb::Connection;
|
||||
/// # async fn f(conn: &Connection) -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// let table = conn.open_table("t").branch("exp").version(3).execute().await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn version(mut self, version: u64) -> Self {
|
||||
self.version = Some(version);
|
||||
self
|
||||
}
|
||||
|
||||
/// Open the table
|
||||
pub async fn execute(self) -> Result<Table> {
|
||||
let table = self.parent.open_table(self.request).await?;
|
||||
Ok(Table::new_with_embedding_registry(
|
||||
table,
|
||||
self.parent,
|
||||
self.embedding_registry,
|
||||
))
|
||||
let table = Table::new_with_embedding_registry(table, self.parent, self.embedding_registry);
|
||||
// "main" is the default branch, so treat it as no branch.
|
||||
let branch = self.branch.filter(|b| b.as_str() != MAIN_BRANCH);
|
||||
match branch {
|
||||
Some(branch) => table.checkout_branch(&branch, self.version).await,
|
||||
None => {
|
||||
if let Some(version) = self.version {
|
||||
table.checkout(version).await?;
|
||||
}
|
||||
Ok(table)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -740,6 +740,64 @@ mod tests {
|
||||
assert!(table_names.contains(&"test_table".to_string()));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_branch_query_under_pushdown_stays_local() {
|
||||
// With QueryTable pushdown enabled, a query on the main branch routes to
|
||||
// the namespace server, but a branch handle must run locally: the
|
||||
// server-side request carries no branch and would return main's rows.
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let root_path = tmp_dir.path().to_str().unwrap().to_string();
|
||||
|
||||
let mut properties = HashMap::new();
|
||||
properties.insert("root".to_string(), root_path);
|
||||
|
||||
let conn = connect_namespace("dir", properties)
|
||||
.pushdown_operation(NamespaceClientPushdownOperation::QueryTable)
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to connect to namespace");
|
||||
|
||||
conn.create_namespace(CreateNamespaceRequest {
|
||||
id: Some(vec!["test_ns".into()]),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("Failed to create namespace");
|
||||
|
||||
// main has 5 rows
|
||||
let table = conn
|
||||
.create_table("ref_test", create_test_data())
|
||||
.namespace(vec!["test_ns".into()])
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to create table");
|
||||
let main_version = table.version().await.unwrap();
|
||||
|
||||
// fork a branch off main, then add 5 more rows so it differs from main
|
||||
let branch = table
|
||||
.create_branch("exp", main_version)
|
||||
.await
|
||||
.expect("Failed to create branch");
|
||||
branch
|
||||
.add(create_test_data())
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to append to branch");
|
||||
|
||||
// the branch query must run locally and see the branch's 10 rows --
|
||||
// not get routed to the server (which carries no branch) and see main's 5
|
||||
let results = branch
|
||||
.query()
|
||||
.execute()
|
||||
.await
|
||||
.expect("Failed to query branch")
|
||||
.try_collect::<Vec<_>>()
|
||||
.await
|
||||
.expect("Failed to collect results");
|
||||
let count: usize = results.iter().map(|b| b.num_rows()).sum();
|
||||
assert_eq!(count, 10);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_describe_table() {
|
||||
// Setup: Create a temporary directory for the namespace
|
||||
|
||||
@@ -983,6 +983,49 @@ mod tests {
|
||||
assert_eq!(table.name(), "table1");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_table_branch_and_version() {
|
||||
// Remote supports version time-travel but not branches. A version-only
|
||||
// open (or one on the default "main" branch) must succeed; a non-main
|
||||
// branch must be rejected, with or without a version.
|
||||
let conn = Connection::new_with_handler(|request| {
|
||||
assert_eq!(request.url().path(), "/v1/table/t/describe/");
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(
|
||||
r#"{"table": "t", "version": 2, "schema": {"fields": [
|
||||
{"name": "a", "type": { "type": "int32" }, "nullable": false}
|
||||
]}}"#,
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
// version-only: allowed (open + checkout(version) both round-trip)
|
||||
conn.open_table("t").version(2).execute().await.unwrap();
|
||||
|
||||
// "main" is the default branch, so it counts as no branch
|
||||
conn.open_table("t")
|
||||
.branch("main")
|
||||
.version(2)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// a non-main branch is rejected, with or without a version
|
||||
assert!(matches!(
|
||||
conn.open_table("t").branch("exp").execute().await,
|
||||
Err(Error::NotSupported { .. })
|
||||
));
|
||||
assert!(matches!(
|
||||
conn.open_table("t")
|
||||
.branch("exp")
|
||||
.version(2)
|
||||
.execute()
|
||||
.await,
|
||||
Err(Error::NotSupported { .. })
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_open_table_not_found() {
|
||||
let conn = Connection::new_with_handler(|_| {
|
||||
|
||||
@@ -18,13 +18,13 @@ use crate::index::waiter::wait_for_index;
|
||||
use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest};
|
||||
use crate::table::AddColumnsResult;
|
||||
use crate::table::AddResult;
|
||||
use crate::table::AlterColumnsResult;
|
||||
use crate::table::DeleteResult;
|
||||
use crate::table::DropColumnsResult;
|
||||
use crate::table::MergeResult;
|
||||
use crate::table::Tags;
|
||||
use crate::table::UpdateResult;
|
||||
use crate::table::query::create_multi_vector_plan;
|
||||
use crate::table::{AlterColumnsResult, FieldMetadataUpdate, UpdateFieldMetadataResult};
|
||||
use crate::table::{AnyQuery, Filter, Predicate, PreprocessingOutput, TableStatistics};
|
||||
use crate::utils::background_cache::BackgroundCache;
|
||||
use crate::utils::{
|
||||
@@ -1383,6 +1383,38 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
.map_err(unwrap_shared_error)
|
||||
}
|
||||
|
||||
async fn create_branch(
|
||||
&self,
|
||||
_name: &str,
|
||||
_from: lance::dataset::refs::Ref,
|
||||
) -> Result<Arc<dyn BaseTable>> {
|
||||
Err(Error::NotSupported {
|
||||
message: "branching is not yet supported on remote tables".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn checkout_branch(&self, _name: &str) -> Result<Arc<dyn BaseTable>> {
|
||||
Err(Error::NotSupported {
|
||||
message: "branching is not yet supported on remote tables".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn list_branches(&self) -> Result<HashMap<String, lance::dataset::refs::BranchContents>> {
|
||||
Err(Error::NotSupported {
|
||||
message: "branching is not yet supported on remote tables".into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn delete_branch(&self, _name: &str) -> Result<()> {
|
||||
Err(Error::NotSupported {
|
||||
message: "branching is not yet supported on remote tables".into(),
|
||||
})
|
||||
}
|
||||
|
||||
fn current_branch(&self) -> Option<String> {
|
||||
None
|
||||
}
|
||||
|
||||
async fn count_rows(&self, filter: Option<Filter>) -> Result<usize> {
|
||||
let mut request = self.post_read(&format!("/v1/table/{}/count_rows/", self.identifier));
|
||||
|
||||
@@ -1968,6 +2000,35 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn update_field_metadata(
|
||||
&self,
|
||||
updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
self.check_mutable().await?;
|
||||
let body = serde_json::json!({ "updates": updates });
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!(
|
||||
"/v1/table/{}/update_field_metadata/",
|
||||
self.identifier
|
||||
))
|
||||
.json(&body);
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
let body = response.text().await.err_to_http(request_id.clone())?;
|
||||
|
||||
let result: UpdateFieldMetadataResult =
|
||||
serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
source: format!("Failed to parse update_field_metadata response: {}", e).into(),
|
||||
request_id,
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
self.invalidate_schema_cache();
|
||||
self.track_write_version(result.version);
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
|
||||
self.check_mutable().await?;
|
||||
let body = serde_json::json!({ "columns": columns });
|
||||
@@ -2261,6 +2322,7 @@ mod tests {
|
||||
|
||||
use crate::remote::client::{ClientConfig, RetryConfig};
|
||||
use crate::table::AddDataMode;
|
||||
use crate::table::FieldMetadataUpdate;
|
||||
|
||||
use arrow::{array::AsArray, compute::concat_batches, datatypes::Int32Type};
|
||||
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, record_batch};
|
||||
@@ -6460,4 +6522,25 @@ mod tests {
|
||||
assert!(!headers.contains_key("x-lancedb-min-version"));
|
||||
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_update_field_metadata() {
|
||||
let table = Table::new_with_handler("my_table", |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
assert_eq!(
|
||||
request.url().path(),
|
||||
"/v1/table/my_table/update_field_metadata/"
|
||||
);
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 7, "fields": {"category": {"unit": "label"}}}"#)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let result = table
|
||||
.update_field_metadata(&[FieldMetadataUpdate::new("category").set("unit", "label")])
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.version, 7);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,12 +86,15 @@ pub use add_data::{AddDataBuilder, AddDataMode, AddResult, NaNVectorBehavior};
|
||||
pub use chrono::Duration;
|
||||
pub use delete::DeleteResult;
|
||||
use futures::future::join_all;
|
||||
pub use lance::dataset::refs::{TagContents, Tags as LanceTags};
|
||||
pub use lance::dataset::refs::{BranchContents, Ref, TagContents, Tags as LanceTags};
|
||||
pub use lance::dataset::scanner::DatasetRecordBatchStream;
|
||||
use lance::dataset::statistics::DatasetStatisticsExt;
|
||||
pub use lance_index::optimize::OptimizeOptions;
|
||||
pub use optimize::{CompactionOptions, OptimizeAction, OptimizeStats};
|
||||
pub use schema_evolution::{AddColumnsResult, AlterColumnsResult, DropColumnsResult};
|
||||
pub use schema_evolution::{
|
||||
AddColumnsResult, AlterColumnsResult, DropColumnsResult, FieldMetadataUpdate,
|
||||
UpdateFieldMetadataResult,
|
||||
};
|
||||
use serde_with::skip_serializing_none;
|
||||
pub use update::{UpdateBuilder, UpdateResult};
|
||||
|
||||
@@ -622,6 +625,37 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
async fn restore(&self) -> Result<()>;
|
||||
/// List the versions of the table.
|
||||
async fn list_versions(&self) -> Result<Vec<Version>>;
|
||||
/// Create a new branch from `from` and return a handle scoped to it.
|
||||
async fn create_branch(
|
||||
&self,
|
||||
name: &str,
|
||||
from: lance::dataset::refs::Ref,
|
||||
) -> Result<Arc<dyn BaseTable>>;
|
||||
/// Check out an existing branch and return a handle scoped to it.
|
||||
async fn checkout_branch(&self, name: &str) -> Result<Arc<dyn BaseTable>>;
|
||||
/// Check out an existing branch at an optional version, returning a handle.
|
||||
///
|
||||
/// `None` tracks the branch's latest; `Some(v)` pins it to that version
|
||||
/// (read-only). The default implementation composes [`Self::checkout_branch`]
|
||||
/// and [`Self::checkout`]; implementations may override it to resolve the
|
||||
/// `(branch, version)` coordinate in a single manifest read.
|
||||
async fn checkout_branch_version(
|
||||
&self,
|
||||
name: &str,
|
||||
version: Option<u64>,
|
||||
) -> Result<Arc<dyn BaseTable>> {
|
||||
let branch = self.checkout_branch(name).await?;
|
||||
if let Some(version) = version {
|
||||
branch.checkout(version).await?;
|
||||
}
|
||||
Ok(branch)
|
||||
}
|
||||
/// List the branches of the table.
|
||||
async fn list_branches(&self) -> Result<HashMap<String, BranchContents>>;
|
||||
/// Delete a branch.
|
||||
async fn delete_branch(&self, name: &str) -> Result<()>;
|
||||
/// The branch this handle is scoped to, or `None` for `main`.
|
||||
fn current_branch(&self) -> Option<String>;
|
||||
/// Get the table definition.
|
||||
async fn table_definition(&self) -> Result<TableDefinition>;
|
||||
/// Get the table URI (storage location)
|
||||
@@ -660,6 +694,19 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
message: "create_insert_exec not implemented".to_string(),
|
||||
})
|
||||
}
|
||||
/// Update per-field metadata. Merges into existing metadata by default;
|
||||
/// [`FieldMetadataUpdate::remove`] deletes a key and
|
||||
/// [`FieldMetadataUpdate::replace`] swaps the field's whole map.
|
||||
///
|
||||
/// The default returns `NotSupported`; Lance-backed and remote tables override it.
|
||||
async fn update_field_metadata(
|
||||
&self,
|
||||
_updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
Err(Error::NotSupported {
|
||||
message: "update_field_metadata is not supported on this table type".into(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// A Table is a collection of strong typed Rows.
|
||||
@@ -1340,6 +1387,14 @@ impl Table {
|
||||
self.inner.alter_columns(alterations).await
|
||||
}
|
||||
|
||||
/// Update per-field metadata (merges by default).
|
||||
pub async fn update_field_metadata(
|
||||
&self,
|
||||
updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
self.inner.update_field_metadata(updates).await
|
||||
}
|
||||
|
||||
/// Remove columns from the table.
|
||||
pub async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
|
||||
self.inner.drop_columns(columns).await
|
||||
@@ -1601,6 +1656,57 @@ impl Table {
|
||||
self.inner.tags().await
|
||||
}
|
||||
|
||||
/// Create a new branch from `from` (a version, tag, or branch)
|
||||
pub async fn create_branch(
|
||||
&self,
|
||||
name: &str,
|
||||
from: impl Into<lance::dataset::refs::Ref>,
|
||||
) -> Result<Self> {
|
||||
let inner = self.inner.create_branch(name, from.into()).await?;
|
||||
Ok(Self {
|
||||
inner,
|
||||
database: self.database.clone(),
|
||||
embedding_registry: self.embedding_registry.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Check out an existing branch and return a handle scoped to it.
|
||||
///
|
||||
/// With `version` set, the returned handle is pinned to that version of the
|
||||
/// branch: a read-only, detached view (as with [`Self::checkout`]). With
|
||||
/// `version` as `None` it tracks the branch's latest and stays writable.
|
||||
///
|
||||
/// ```
|
||||
/// # use lancedb::Table;
|
||||
/// # async fn f(table: &Table) -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// let exp_at_v3 = table.checkout_branch("exp", Some(3)).await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub async fn checkout_branch(&self, name: &str, version: Option<u64>) -> Result<Self> {
|
||||
let inner = self.inner.checkout_branch_version(name, version).await?;
|
||||
Ok(Self {
|
||||
inner,
|
||||
database: self.database.clone(),
|
||||
embedding_registry: self.embedding_registry.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// List the branches of the table.
|
||||
pub async fn list_branches(&self) -> Result<HashMap<String, BranchContents>> {
|
||||
self.inner.list_branches().await
|
||||
}
|
||||
|
||||
/// Delete a branch.
|
||||
pub async fn delete_branch(&self, name: &str) -> Result<()> {
|
||||
self.inner.delete_branch(name).await
|
||||
}
|
||||
|
||||
/// The branch this handle is scoped to, or `None` for `main`.
|
||||
pub fn current_branch(&self) -> Option<String> {
|
||||
self.inner.current_branch()
|
||||
}
|
||||
|
||||
/// Retrieve statistics on the table
|
||||
pub async fn stats(&self) -> Result<TableStatistics> {
|
||||
self.inner.stats().await
|
||||
@@ -1837,6 +1943,30 @@ impl NativeTable {
|
||||
self
|
||||
}
|
||||
|
||||
/// Build a sibling `NativeTable` with the same identity but a different
|
||||
/// (independent) dataset wrapper — used to hand out branch-scoped handles.
|
||||
fn with_dataset(&self, dataset: dataset::DatasetConsistencyWrapper) -> Self {
|
||||
Self {
|
||||
name: self.name.clone(),
|
||||
namespace: self.namespace.clone(),
|
||||
id: self.id.clone(),
|
||||
uri: self.uri.clone(),
|
||||
dataset,
|
||||
read_consistency_interval: self.read_consistency_interval,
|
||||
namespace_client: self.namespace_client.clone(),
|
||||
pushdown_operations: self.pushdown_operations.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_branch_name(name: &str, field: &str) -> Result<()> {
|
||||
if name.is_empty() {
|
||||
return Err(Error::InvalidInput {
|
||||
message: format!("{field} must be a non-empty string"),
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Opens an existing Table using a namespace client.
|
||||
///
|
||||
/// This method uses `DatasetBuilder::from_namespace` to open the table, which
|
||||
@@ -2580,6 +2710,7 @@ impl NativeTable {
|
||||
/// field id and the second element is a hashmap of metadata key-value
|
||||
/// pairs.
|
||||
///
|
||||
#[deprecated(since = "0.33.1", note = "Use `update_field_metadata` instead")]
|
||||
pub async fn replace_field_metadata(
|
||||
&self,
|
||||
new_values: impl IntoIterator<Item = (u32, HashMap<String, String>)>,
|
||||
@@ -2627,6 +2758,72 @@ impl BaseTable for NativeTable {
|
||||
self.dataset.reload().await
|
||||
}
|
||||
|
||||
async fn create_branch(
|
||||
&self,
|
||||
name: &str,
|
||||
from: lance::dataset::refs::Ref,
|
||||
) -> Result<Arc<dyn BaseTable>> {
|
||||
Self::validate_branch_name(name, "branch name")?;
|
||||
if let lance::dataset::refs::Ref::Version(Some(from_branch), _) = &from {
|
||||
Self::validate_branch_name(from_branch, "from_ref")?;
|
||||
}
|
||||
let mut ds = (*self.dataset.get().await?).clone();
|
||||
let branch_ds = ds.create_branch(name, from, None).await?;
|
||||
let dataset = dataset::DatasetConsistencyWrapper::new_latest(
|
||||
branch_ds,
|
||||
self.read_consistency_interval,
|
||||
);
|
||||
Ok(Arc::new(self.with_dataset(dataset)))
|
||||
}
|
||||
|
||||
async fn checkout_branch(&self, name: &str) -> Result<Arc<dyn BaseTable>> {
|
||||
Self::validate_branch_name(name, "branch name")?;
|
||||
let branch_ds = self.dataset.get().await?.checkout_branch(name).await?;
|
||||
let dataset = dataset::DatasetConsistencyWrapper::new_latest(
|
||||
branch_ds,
|
||||
self.read_consistency_interval,
|
||||
);
|
||||
Ok(Arc::new(self.with_dataset(dataset)))
|
||||
}
|
||||
|
||||
async fn checkout_branch_version(
|
||||
&self,
|
||||
name: &str,
|
||||
version: Option<u64>,
|
||||
) -> Result<Arc<dyn BaseTable>> {
|
||||
let Some(version) = version else {
|
||||
return self.checkout_branch(name).await;
|
||||
};
|
||||
Self::validate_branch_name(name, "branch name")?;
|
||||
// Resolve (branch, version) in a single manifest read.
|
||||
let branch_ds = self
|
||||
.dataset
|
||||
.get()
|
||||
.await?
|
||||
.checkout_version((name, version))
|
||||
.await?;
|
||||
let dataset = dataset::DatasetConsistencyWrapper::new_time_travel(
|
||||
branch_ds,
|
||||
self.read_consistency_interval,
|
||||
);
|
||||
Ok(Arc::new(self.with_dataset(dataset)))
|
||||
}
|
||||
|
||||
async fn list_branches(&self) -> Result<HashMap<String, BranchContents>> {
|
||||
Ok(self.dataset.get().await?.list_branches().await?)
|
||||
}
|
||||
|
||||
async fn delete_branch(&self, name: &str) -> Result<()> {
|
||||
Self::validate_branch_name(name, "branch name")?;
|
||||
let mut ds = (*self.dataset.get().await?).clone();
|
||||
ds.delete_branch(name).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn current_branch(&self) -> Option<String> {
|
||||
self.dataset.current_branch()
|
||||
}
|
||||
|
||||
async fn list_versions(&self) -> Result<Vec<Version>> {
|
||||
Ok(self.dataset.get().await?.versions().await?)
|
||||
}
|
||||
@@ -2886,6 +3083,13 @@ impl BaseTable for NativeTable {
|
||||
schema_evolution::execute_alter_columns(self, alterations).await
|
||||
}
|
||||
|
||||
async fn update_field_metadata(
|
||||
&self,
|
||||
updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
schema_evolution::execute_update_field_metadata(self, updates).await
|
||||
}
|
||||
|
||||
async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
|
||||
schema_evolution::execute_drop_columns(self, columns).await
|
||||
}
|
||||
@@ -3136,7 +3340,6 @@ pub struct FragmentSummaryStats {
|
||||
#[cfg(test)]
|
||||
#[allow(deprecated)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::time::Duration;
|
||||
@@ -3347,6 +3550,351 @@ mod tests {
|
||||
assert_eq!(table.version().await.unwrap(), 4);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_branches() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// main: one row at v1
|
||||
let table = conn
|
||||
.create_table("my_table", some_sample_data())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), 1);
|
||||
assert_eq!(table.current_branch(), None);
|
||||
let main_version = table.version().await.unwrap();
|
||||
|
||||
// branch off main's current version; it starts with main's data
|
||||
let branch = table.create_branch("exp", main_version).await.unwrap();
|
||||
assert_eq!(branch.current_branch().as_deref(), Some("exp"));
|
||||
assert_eq!(branch.count_rows(None).await.unwrap(), 1);
|
||||
|
||||
// writes on the branch are isolated from main
|
||||
branch.add(some_sample_data()).execute().await.unwrap();
|
||||
assert_eq!(branch.count_rows(None).await.unwrap(), 2);
|
||||
assert_eq!(
|
||||
table.count_rows(None).await.unwrap(),
|
||||
1,
|
||||
"main must be untouched by branch writes"
|
||||
);
|
||||
|
||||
// the branch shows up in the listing
|
||||
let branches = table.list_branches().await.unwrap();
|
||||
assert!(branches.contains_key("exp"));
|
||||
|
||||
// checking out the branch from the main handle sees the branch's latest data
|
||||
let checked_out = table.checkout_branch("exp", None).await.unwrap();
|
||||
assert_eq!(checked_out.current_branch().as_deref(), Some("exp"));
|
||||
assert_eq!(checked_out.count_rows(None).await.unwrap(), 2);
|
||||
|
||||
// open_table(...).branch(...) opens directly onto the branch
|
||||
let opened = conn
|
||||
.open_table("my_table")
|
||||
.branch("exp")
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(opened.current_branch().as_deref(), Some("exp"));
|
||||
assert_eq!(opened.count_rows(None).await.unwrap(), 2);
|
||||
|
||||
// delete removes it from the listing
|
||||
table.delete_branch("exp").await.unwrap();
|
||||
let branches = table.list_branches().await.unwrap();
|
||||
assert!(!branches.contains_key("exp"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_branch_version_checkout() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// main: a single fork-point row (i = 0)
|
||||
let table = conn
|
||||
.create_table("my_table", sample_rows(vec![0]))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let fork_point = table.version().await.unwrap();
|
||||
|
||||
// Fork "exp", then advance exp AND main independently past the fork so
|
||||
// they diverge while sharing version numbers.
|
||||
let branch = table.create_branch("exp", fork_point).await.unwrap();
|
||||
let exp_fork = branch.version().await.unwrap(); // exp's shallow-clone version
|
||||
branch.add(sample_rows(vec![1])).execute().await.unwrap(); // exp: {0, 1}
|
||||
let exp_v2 = branch.version().await.unwrap();
|
||||
branch.add(sample_rows(vec![2])).execute().await.unwrap(); // exp HEAD: {0, 1, 2}
|
||||
|
||||
// main's own commit reaches the SAME version number with different data
|
||||
table
|
||||
.add(sample_rows(vec![100, 101, 102]))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap(); // main HEAD: {0, 100, 101, 102}
|
||||
let main_v2 = table.version().await.unwrap();
|
||||
assert_eq!(
|
||||
exp_v2, main_v2,
|
||||
"branch and main must share the version number for this test to mean anything"
|
||||
);
|
||||
|
||||
// Open exp at the shared version. The data must be exp's, not main's:
|
||||
// count alone cannot prove this (main@v2 differs), so assert provenance
|
||||
// by content.
|
||||
let pinned = conn
|
||||
.open_table("my_table")
|
||||
.branch("exp")
|
||||
.version(exp_v2)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(pinned.current_branch().as_deref(), Some("exp"));
|
||||
// isolated from exp's HEAD (3 rows) and from main@v2 (4 rows)
|
||||
assert_eq!(pinned.count_rows(None).await.unwrap(), 2);
|
||||
// exp's post-fork row is visible; main's divergent rows are not
|
||||
assert_eq!(
|
||||
pinned.count_rows(Some("i = 1".to_string())).await.unwrap(),
|
||||
1
|
||||
);
|
||||
assert_eq!(
|
||||
pinned
|
||||
.count_rows(Some("i = 100".to_string()))
|
||||
.await
|
||||
.unwrap(),
|
||||
0
|
||||
);
|
||||
|
||||
// the same coordinate is reachable directly via checkout_branch(name, version)
|
||||
let pinned_direct = table.checkout_branch("exp", Some(exp_v2)).await.unwrap();
|
||||
assert_eq!(pinned_direct.current_branch().as_deref(), Some("exp"));
|
||||
assert_eq!(pinned_direct.count_rows(None).await.unwrap(), 2);
|
||||
|
||||
// the HEADs are unaffected
|
||||
let head = conn
|
||||
.open_table("my_table")
|
||||
.branch("exp")
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(head.count_rows(None).await.unwrap(), 3);
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), 4);
|
||||
|
||||
// a pinned version is a detached head: writes are rejected
|
||||
assert!(pinned.add(sample_rows(vec![9])).execute().await.is_err());
|
||||
|
||||
// version-only (no branch) time-travels main itself: its fork-point
|
||||
// version holds only main's first row, and the shared version number
|
||||
// resolves to main's data, not the branch's ("opens main at the version")
|
||||
let old_main = conn
|
||||
.open_table("my_table")
|
||||
.version(fork_point)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(old_main.current_branch(), None);
|
||||
assert_eq!(old_main.count_rows(None).await.unwrap(), 1);
|
||||
let shared_on_main = conn
|
||||
.open_table("my_table")
|
||||
.version(exp_v2)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(shared_on_main.current_branch(), None);
|
||||
assert_eq!(shared_on_main.count_rows(None).await.unwrap(), 4);
|
||||
|
||||
// a nonexistent version is rejected
|
||||
assert!(
|
||||
conn.open_table("my_table")
|
||||
.version(9999)
|
||||
.execute()
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
|
||||
// a nonexistent version on a branch is rejected too: this resolves on
|
||||
// the branch's path, a distinct miss from the main lookup above
|
||||
assert!(
|
||||
conn.open_table("my_table")
|
||||
.branch("exp")
|
||||
.version(9999)
|
||||
.execute()
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
|
||||
// opening the branch at its fork point (the shallow-clone manifest)
|
||||
// shows just the cloned state: main's fork-point row
|
||||
let exp_at_fork = conn
|
||||
.open_table("my_table")
|
||||
.branch("exp")
|
||||
.version(exp_fork)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(exp_at_fork.current_branch().as_deref(), Some("exp"));
|
||||
assert_eq!(exp_at_fork.count_rows(None).await.unwrap(), 1);
|
||||
|
||||
// checkout_latest re-attaches the pinned handle to the BRANCH's HEAD
|
||||
// (writable again), not main's HEAD, and not staying pinned
|
||||
pinned.checkout_latest().await.unwrap();
|
||||
assert_eq!(pinned.current_branch().as_deref(), Some("exp"));
|
||||
assert_eq!(pinned.count_rows(None).await.unwrap(), 3); // exp HEAD, not main's 4
|
||||
pinned.add(sample_rows(vec![3])).execute().await.unwrap();
|
||||
assert_eq!(pinned.count_rows(None).await.unwrap(), 4); // writable again
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_branch_version_two_branches() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let table = conn
|
||||
.create_table("my_table", sample_rows(vec![0]))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let fork_point = table.version().await.unwrap();
|
||||
|
||||
// two branches off the same point, each advanced once so they reach the
|
||||
// SAME version number with divergent data
|
||||
let exp1 = table.create_branch("exp1", fork_point).await.unwrap();
|
||||
let exp2 = table.create_branch("exp2", fork_point).await.unwrap();
|
||||
exp1.add(sample_rows(vec![10])).execute().await.unwrap();
|
||||
exp2.add(sample_rows(vec![20])).execute().await.unwrap();
|
||||
let v1 = exp1.version().await.unwrap();
|
||||
let v2 = exp2.version().await.unwrap();
|
||||
assert_eq!(v1, v2, "both branches must reach the same version number");
|
||||
|
||||
// that shared version number resolves to each branch's own data
|
||||
let at1 = table.checkout_branch("exp1", Some(v1)).await.unwrap();
|
||||
assert_eq!(at1.count_rows(Some("i = 10".to_string())).await.unwrap(), 1);
|
||||
assert_eq!(at1.count_rows(Some("i = 20".to_string())).await.unwrap(), 0);
|
||||
let at2 = table.checkout_branch("exp2", Some(v2)).await.unwrap();
|
||||
assert_eq!(at2.count_rows(Some("i = 20".to_string())).await.unwrap(), 1);
|
||||
assert_eq!(at2.count_rows(Some("i = 10".to_string())).await.unwrap(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_branch_name_validation() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
let conn = ConnectBuilder::new(uri).execute().await.unwrap();
|
||||
let table = conn
|
||||
.create_table("my_table", some_sample_data())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// every entry point rejects an empty name instead of passing it down
|
||||
assert!(matches!(
|
||||
table.create_branch("", 1u64).await,
|
||||
Err(Error::InvalidInput { .. })
|
||||
));
|
||||
assert!(matches!(
|
||||
table.checkout_branch("", None).await,
|
||||
Err(Error::InvalidInput { .. })
|
||||
));
|
||||
assert!(matches!(
|
||||
table.delete_branch("").await,
|
||||
Err(Error::InvalidInput { .. })
|
||||
));
|
||||
// an empty source branch is rejected too
|
||||
assert!(matches!(
|
||||
table
|
||||
.create_branch(
|
||||
"ok",
|
||||
lance::dataset::refs::Ref::Version(Some(String::new()), None)
|
||||
)
|
||||
.await,
|
||||
Err(Error::InvalidInput { .. })
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_branch_handle_tracks_concurrent_writes() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
// interval = 0 so every read checks storage for new commits
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let table = conn
|
||||
.create_table("my_table", some_sample_data())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let v1 = table.version().await.unwrap();
|
||||
|
||||
// two independent handles on the same branch
|
||||
let writer = table.create_branch("exp", v1).await.unwrap();
|
||||
let reader = conn
|
||||
.open_table("my_table")
|
||||
.branch("exp")
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(reader.count_rows(None).await.unwrap(), 1);
|
||||
|
||||
// a concurrent write on the branch is visible to the other handle, which
|
||||
// tracks the branch's HEAD (not main's)
|
||||
writer.add(some_sample_data()).execute().await.unwrap();
|
||||
assert_eq!(reader.count_rows(None).await.unwrap(), 2);
|
||||
// main is untouched
|
||||
assert_eq!(table.count_rows(None).await.unwrap(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_branch_handle_without_consistency_interval_is_pinned() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
// default interval (None): handles do not auto-refresh
|
||||
let conn = ConnectBuilder::new(uri).execute().await.unwrap();
|
||||
let table = conn
|
||||
.create_table("my_table", some_sample_data())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let v1 = table.version().await.unwrap();
|
||||
|
||||
let writer = table.create_branch("exp", v1).await.unwrap();
|
||||
let reader = conn
|
||||
.open_table("my_table")
|
||||
.branch("exp")
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(reader.count_rows(None).await.unwrap(), 1);
|
||||
|
||||
// without a consistency interval the reader stays on the version it
|
||||
// opened, exactly like a main-branch handle...
|
||||
writer.add(some_sample_data()).execute().await.unwrap();
|
||||
assert_eq!(reader.count_rows(None).await.unwrap(), 1);
|
||||
|
||||
// ...until it explicitly refreshes
|
||||
reader.checkout_latest().await.unwrap();
|
||||
assert_eq!(reader.count_rows(None).await.unwrap(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_index() {
|
||||
use arrow_array::RecordBatch;
|
||||
@@ -3698,6 +4246,19 @@ mod tests {
|
||||
Box::new(RecordBatchIterator::new(vec![batch], schema))
|
||||
}
|
||||
|
||||
/// A single-batch reader holding the given `i` (Int32) values. Lets a test
|
||||
/// write distinguishable rows so it can assert data provenance, not row count.
|
||||
fn sample_rows(values: Vec<i32>) -> Box<dyn arrow_array::RecordBatchReader + Send> {
|
||||
let batch = RecordBatch::try_new(
|
||||
Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
|
||||
vec![Arc::new(Int32Array::from(values))],
|
||||
)
|
||||
.unwrap();
|
||||
let schema = batch.schema().clone();
|
||||
|
||||
Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_scalar_index() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
@@ -4449,10 +5010,10 @@ mod tests {
|
||||
Some(&"test_val2_update".to_string())
|
||||
);
|
||||
|
||||
let mut new_field_metadata = HashMap::<String, String>::new();
|
||||
new_field_metadata.insert("test_field_key1".into(), "test_field_val1".into());
|
||||
native_tbl
|
||||
.replace_field_metadata(vec![(field.id as u32, new_field_metadata)])
|
||||
.update_field_metadata(&[
|
||||
FieldMetadataUpdate::new("i").set("test_field_key1", "test_field_val1")
|
||||
])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -76,6 +76,23 @@ impl DatasetConsistencyWrapper {
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new wrapper pinned to the dataset's current version.
|
||||
///
|
||||
/// `dataset` must already be checked out at the desired version; this pins
|
||||
/// to `dataset.version()` without re-resolving. The wrapper is read-only
|
||||
/// (time-travel) until [`as_latest`](Self::as_latest) re-attaches it to the
|
||||
/// latest version.
|
||||
pub fn new_time_travel(dataset: Dataset, read_consistency_interval: Option<Duration>) -> Self {
|
||||
let version = dataset.version().version;
|
||||
let wrapper = Self::new_latest(dataset, read_consistency_interval);
|
||||
wrapper
|
||||
.state
|
||||
.lock()
|
||||
.unwrap_or_else(|e| e.into_inner())
|
||||
.pinned_version = Some(version);
|
||||
wrapper
|
||||
}
|
||||
|
||||
/// The MemWAL `ShardWriter` cache co-located with this dataset.
|
||||
pub(crate) fn shard_writer(&self) -> &Arc<ShardWriterCache> {
|
||||
&self.shard_writer
|
||||
@@ -144,8 +161,19 @@ impl DatasetConsistencyWrapper {
|
||||
}
|
||||
|
||||
/// Checkout a branch and track its HEAD for new versions.
|
||||
pub async fn as_branch(&self, _branch: impl Into<String>) -> Result<()> {
|
||||
todo!("Branch support not yet implemented")
|
||||
pub async fn as_branch(&self, branch: impl Into<String>) -> Result<()> {
|
||||
let branch = branch.into();
|
||||
let dataset = { self.state.lock()?.dataset.clone() };
|
||||
let new_dataset = dataset.checkout_branch(&branch).await?;
|
||||
|
||||
let mut state = self.state.lock()?;
|
||||
state.dataset = Arc::new(new_dataset);
|
||||
state.pinned_version = None;
|
||||
drop(state);
|
||||
if let ConsistencyMode::Eventual(bg_cache) = &self.consistency {
|
||||
bg_cache.invalidate();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check that the dataset is in a mutable mode (Latest).
|
||||
@@ -161,6 +189,17 @@ impl DatasetConsistencyWrapper {
|
||||
}
|
||||
}
|
||||
|
||||
/// The branch this wrapper is currently tracking, or `None` for `main`.
|
||||
pub fn current_branch(&self) -> Option<String> {
|
||||
self.state
|
||||
.lock()
|
||||
.unwrap_or_else(|e| e.into_inner())
|
||||
.dataset
|
||||
.manifest()
|
||||
.branch
|
||||
.clone()
|
||||
}
|
||||
|
||||
/// Returns the version, if in time travel mode, or None otherwise.
|
||||
pub fn time_travel_version(&self) -> Option<u64> {
|
||||
self.state
|
||||
@@ -737,4 +776,31 @@ mod tests {
|
||||
let result = wrapper.reload().await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_as_branch_is_writable_and_tracked() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let uri = dir.path().to_str().unwrap();
|
||||
|
||||
// v1 on main, then shallow-clone a branch off it
|
||||
let mut ds = create_test_dataset(uri).await;
|
||||
let v1 = ds.version().version;
|
||||
ds.create_branch("exp", v1, None).await.unwrap();
|
||||
|
||||
// wrapper starts on main: latest, writable, no branch
|
||||
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
|
||||
assert_eq!(wrapper.current_branch(), None);
|
||||
|
||||
// switch to the branch
|
||||
wrapper.as_branch("exp").await.unwrap();
|
||||
assert_eq!(wrapper.current_branch().as_deref(), Some("exp"));
|
||||
|
||||
// a branch is writable (unlike a pinned/time-travel checkout)
|
||||
wrapper.ensure_mutable().unwrap();
|
||||
assert_eq!(wrapper.time_travel_version(), None);
|
||||
|
||||
// get() returns the branch dataset
|
||||
let on_branch = wrapper.get().await.unwrap();
|
||||
assert_eq!(on_branch.manifest().branch.as_deref(), Some("exp"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,11 +41,14 @@ pub async fn execute_query(
|
||||
query: &AnyQuery,
|
||||
options: QueryExecutionOptions,
|
||||
) -> Result<DatasetRecordBatchStream> {
|
||||
// If QueryTable pushdown is enabled and namespace client is configured, use server-side query execution
|
||||
// QueryTable pushdown runs the query server-side, but only on the main
|
||||
// branch: the namespace request carries no branch yet, so a branch handle
|
||||
// must fall through to local execution.
|
||||
if table
|
||||
.pushdown_operations
|
||||
.contains(&NamespaceClientPushdownOperation::QueryTable)
|
||||
&& let Some(ref namespace_client) = table.namespace_client
|
||||
&& table.dataset.current_branch().is_none()
|
||||
{
|
||||
return execute_namespace_query(table, namespace_client.clone(), query, options).await;
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
|
||||
use lance::dataset::{ColumnAlteration, NewColumnTransform};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use super::NativeTable;
|
||||
use crate::Result;
|
||||
@@ -44,6 +45,52 @@ pub struct DropColumnsResult {
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
/// A single field's metadata update, addressed by dot-path.
|
||||
///
|
||||
/// Merges into the field's existing metadata by default. Use [`Self::remove`] to
|
||||
/// delete a key, or [`Self::replace`] to swap the field's entire metadata map.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
|
||||
pub struct FieldMetadataUpdate {
|
||||
/// Dot-separated path to the field (e.g. `"embedding"` or `"address.zip"`).
|
||||
pub path: String,
|
||||
/// Keys to set (`Some`) or delete (`None`).
|
||||
pub metadata: HashMap<String, Option<String>>,
|
||||
/// If `true`, replace the field's entire metadata map instead of merging.
|
||||
pub replace: bool,
|
||||
}
|
||||
|
||||
impl FieldMetadataUpdate {
|
||||
pub fn new(path: impl Into<String>) -> Self {
|
||||
Self {
|
||||
path: path.into(),
|
||||
metadata: HashMap::new(),
|
||||
replace: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
|
||||
self.metadata.insert(key.into(), Some(value.into()));
|
||||
self
|
||||
}
|
||||
|
||||
pub fn remove(mut self, key: impl Into<String>) -> Self {
|
||||
self.metadata.insert(key.into(), None);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn replace(mut self) -> Self {
|
||||
self.replace = true;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct UpdateFieldMetadataResult {
|
||||
/// The commit version associated with the operation.
|
||||
#[serde(default)]
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
/// Internal implementation of the add columns logic.
|
||||
///
|
||||
/// Adds new columns to the table using the provided transforms.
|
||||
@@ -90,6 +137,32 @@ pub(crate) async fn execute_drop_columns(
|
||||
Ok(DropColumnsResult { version })
|
||||
}
|
||||
|
||||
/// Internal implementation of the update field metadata logic.
|
||||
///
|
||||
/// Merges or replaces per-field metadata, addressing fields by dot-path.
|
||||
pub(crate) async fn execute_update_field_metadata(
|
||||
table: &NativeTable,
|
||||
updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
table.dataset.ensure_mutable()?;
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
|
||||
let mut builder = dataset.update_field_metadata();
|
||||
for update in updates {
|
||||
let entries = update.metadata.iter().map(|(k, v)| (k.clone(), v.clone()));
|
||||
builder = if update.replace {
|
||||
builder.replace(&update.path, entries)?
|
||||
} else {
|
||||
builder.update(&update.path, entries)?
|
||||
};
|
||||
}
|
||||
builder.await?;
|
||||
|
||||
let version = dataset.version().version;
|
||||
table.dataset.update(dataset);
|
||||
Ok(UpdateFieldMetadataResult { version })
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use arrow_array::{Int32Array, StringArray, record_batch};
|
||||
@@ -97,6 +170,7 @@ mod tests {
|
||||
use futures::TryStreamExt;
|
||||
use lance::dataset::ColumnAlteration;
|
||||
|
||||
use super::FieldMetadataUpdate;
|
||||
use crate::connect;
|
||||
use crate::query::{ExecutableQuery, QueryBase, Select};
|
||||
use crate::table::NewColumnTransform;
|
||||
@@ -610,4 +684,46 @@ mod tests {
|
||||
let v4 = table.version().await.unwrap();
|
||||
assert_eq!(drop_result.version, v4);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_update_field_metadata() {
|
||||
let conn = connect("memory://").execute().await.unwrap();
|
||||
let batch = record_batch!(
|
||||
("id", Int32, [1, 2, 3]),
|
||||
("category", Utf8, ["A", "B", "C"])
|
||||
)
|
||||
.unwrap();
|
||||
let table = conn
|
||||
.create_table("test_update_field_metadata", batch)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Set metadata on a field.
|
||||
table
|
||||
.update_field_metadata(&[FieldMetadataUpdate::new("category")
|
||||
.set("unit", "label")
|
||||
.set("pii", "false")])
|
||||
.await
|
||||
.unwrap();
|
||||
let schema = table.schema().await.unwrap();
|
||||
let field = schema.field_with_name("category").unwrap();
|
||||
assert_eq!(
|
||||
field.metadata().get("unit").map(String::as_str),
|
||||
Some("label")
|
||||
);
|
||||
|
||||
// Merge: add a key, delete one, keep the rest.
|
||||
table
|
||||
.update_field_metadata(&[FieldMetadataUpdate::new("category")
|
||||
.set("source", "import")
|
||||
.remove("pii")])
|
||||
.await
|
||||
.unwrap();
|
||||
let schema = table.schema().await.unwrap();
|
||||
let md = schema.field_with_name("category").unwrap().metadata();
|
||||
assert_eq!(md.get("unit").map(String::as_str), Some("label")); // preserved
|
||||
assert_eq!(md.get("source").map(String::as_str), Some("import")); // added
|
||||
assert!(!md.contains_key("pii")); // deleted
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user