mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-02 11:50:41 +00:00
Compare commits
6 Commits
feat/depre
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b874905fd | ||
|
|
a327044e2f | ||
|
|
f20ec99dec | ||
|
|
60f961584c | ||
|
|
ac699d7ecf | ||
|
|
968277be79 |
7
.agents/skills/README.md
Normal file
7
.agents/skills/README.md
Normal file
@@ -0,0 +1,7 @@
|
||||
# Agent Skills
|
||||
|
||||
This directory contains repo-scoped code agent skills for the LanceDB project.
|
||||
|
||||
Each skill is a folder that contains a required `SKILL.md` and optional bundled resources.
|
||||
|
||||
Codex discovers skills from `.agents/skills` in the current working directory and parent directories.
|
||||
98
.agents/skills/lancedb-update-lance-dependency/SKILL.md
Normal file
98
.agents/skills/lancedb-update-lance-dependency/SKILL.md
Normal file
@@ -0,0 +1,98 @@
|
||||
---
|
||||
name: lancedb-update-lance-dependency
|
||||
description: Update LanceDB to a specific Lance release or tag. Use when bumping Lance dependencies in the lancedb repository, including Rust workspace Lance crates, Java lance-core, validation, branch creation, commit, push, and PR creation when requested.
|
||||
---
|
||||
|
||||
# LanceDB Update Lance Dependency
|
||||
|
||||
## Scope
|
||||
|
||||
Use this skill in the `lancedb/lancedb` repository when updating the Lance dependency to a specific Lance version or tag.
|
||||
|
||||
Inputs can be a version (`7.2.0-beta.1`), a tag (`v7.2.0-beta.1`), a tag ref (`refs/tags/v7.2.0-beta.1`), or `latest`.
|
||||
|
||||
## Workflow
|
||||
|
||||
1. Confirm the worktree status with `git status --short`.
|
||||
2. Resolve the target Lance version:
|
||||
|
||||
- If the input is `latest`, empty, or omitted, run:
|
||||
|
||||
```bash
|
||||
python3 ci/check_lance_release.py
|
||||
```
|
||||
|
||||
Parse the JSON output. If `needs_update` is not `true`, stop without creating a PR. Otherwise use `latest_tag`.
|
||||
|
||||
- If the input is explicit, use it directly.
|
||||
|
||||
3. Compute update metadata without changing files:
|
||||
|
||||
```bash
|
||||
python3 ci/update_lance_dependency.py "$TAG_OR_VERSION" --metadata-only
|
||||
```
|
||||
|
||||
Before making changes, check for an existing open PR with the emitted `pr_title`:
|
||||
|
||||
```bash
|
||||
gh pr list --search "\"$PR_TITLE\" in:title" --state open --limit 1 --json number,url,title
|
||||
```
|
||||
|
||||
If a matching open PR exists, stop and report it instead of creating a duplicate.
|
||||
|
||||
4. Run the deterministic update entrypoint:
|
||||
|
||||
```bash
|
||||
python3 ci/update_lance_dependency.py "$TAG_OR_VERSION"
|
||||
```
|
||||
|
||||
This updates the Rust workspace Lance dependencies through `ci/set_lance_version.py`, updates `java/pom.xml`, refreshes Cargo metadata, and prints JSON metadata containing `branch_name`, `commit_message`, and `pr_title`.
|
||||
|
||||
5. Run validation:
|
||||
|
||||
```bash
|
||||
cargo clippy --quiet --workspace --tests --all-features -- -D warnings
|
||||
cargo fmt --all --quiet
|
||||
```
|
||||
|
||||
Fix real diagnostics and rerun clippy until it succeeds. Do not skip warnings.
|
||||
|
||||
6. Inspect `git status --short` and `git diff` to ensure only the Lance dependency update and required compatibility fixes are present.
|
||||
|
||||
7. If the task only asks to prepare local changes, stop here and report the changed files and validation result.
|
||||
|
||||
8. If the task asks to publish the update, create a branch using the printed `branch_name`, stage all relevant files, and commit using the printed `commit_message`. Do not amend or rewrite existing commits.
|
||||
|
||||
9. Push to `origin`. Before creating the PR, check that the current token has push permission:
|
||||
|
||||
```bash
|
||||
gh api repos/lancedb/lancedb --jq .permissions.push
|
||||
```
|
||||
|
||||
If the remote branch already exists for the same generated branch name, delete the remote ref with `gh api -X DELETE repos/lancedb/lancedb/git/refs/heads/$BRANCH_NAME`, then push. Do not force-push.
|
||||
|
||||
10. Create a PR targeting `main` with the printed `pr_title`. If there is no PR template, keep the body to two or three concise sentences: state the Lance dependency bump, note any required compatibility fixes, and link the triggering Lance tag or release.
|
||||
|
||||
11. Read back the remote PR title after creation. If it is not a Conventional Commit title, fix it immediately.
|
||||
|
||||
12. When running in GitHub Actions after creating the LanceDB PR, trigger the Sophon dependency update:
|
||||
|
||||
```bash
|
||||
gh workflow run codex-bump-lancedb-lance.yml \
|
||||
--repo lancedb/sophon \
|
||||
-f lance_ref="$LANCE_TAG" \
|
||||
-f lancedb_ref="$BRANCH_NAME"
|
||||
gh run list --repo lancedb/sophon --workflow codex-bump-lancedb-lance.yml --limit 1 --json databaseId,url,displayTitle
|
||||
```
|
||||
|
||||
Use the emitted metadata `tag` value as `LANCE_TAG`. Do this only after a new LanceDB PR has been created. If the update was skipped because no update is needed or an open PR already exists, do not trigger Sophon.
|
||||
|
||||
## GitHub Actions
|
||||
|
||||
When this skill is used from GitHub Actions, `TAG`, `GH_TOKEN`, and `GITHUB_TOKEN` may already be set. Resolve `latest` first when `TAG` is empty. Once an explicit tag or version is known, use:
|
||||
|
||||
```bash
|
||||
python3 ci/update_lance_dependency.py "$TAG" --github-output "$GITHUB_OUTPUT"
|
||||
```
|
||||
|
||||
Then use the emitted `branch_name`, `commit_message`, and `pr_title` values for branch, commit, and PR creation.
|
||||
@@ -1,5 +1,5 @@
|
||||
[tool.bumpversion]
|
||||
current_version = "0.30.0-beta.1"
|
||||
current_version = "0.30.1-beta.0"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
|
||||
@@ -4,14 +4,16 @@ on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
tag:
|
||||
description: "Tag name from Lance"
|
||||
required: true
|
||||
description: "Tag name from Lance. If omitted, the skill will use the latest Lance release that needs an update."
|
||||
required: false
|
||||
default: ""
|
||||
type: string
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
tag:
|
||||
description: "Tag name from Lance"
|
||||
required: true
|
||||
description: "Tag name from Lance. Leave empty to use the latest Lance release that needs an update."
|
||||
required: false
|
||||
default: ""
|
||||
type: string
|
||||
|
||||
permissions:
|
||||
@@ -25,7 +27,7 @@ jobs:
|
||||
steps:
|
||||
- name: Show inputs
|
||||
run: |
|
||||
echo "tag = ${{ inputs.tag }}"
|
||||
echo "tag = ${{ inputs.tag || 'latest' }}"
|
||||
|
||||
- name: Checkout Repo LanceDB
|
||||
uses: actions/checkout@v4
|
||||
@@ -71,65 +73,21 @@ jobs:
|
||||
OPENAI_API_KEY: ${{ secrets.CODEX_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
VERSION="${TAG#refs/tags/}"
|
||||
VERSION="${VERSION#v}"
|
||||
BRANCH_NAME="codex/update-lance-${VERSION//[^a-zA-Z0-9]/-}"
|
||||
|
||||
# Use "chore" for beta/rc versions, "feat" for stable releases
|
||||
if [[ "${VERSION}" == *beta* ]] || [[ "${VERSION}" == *rc* ]]; then
|
||||
COMMIT_TYPE="chore"
|
||||
else
|
||||
COMMIT_TYPE="feat"
|
||||
fi
|
||||
TARGET_TAG="${TAG:-latest}"
|
||||
|
||||
cat <<EOF >/tmp/codex-prompt.txt
|
||||
You are running inside the lancedb repository on a GitHub Actions runner. Update the Lance dependency to version ${VERSION} and prepare a pull request for maintainers to review.
|
||||
You are running inside the lancedb repository on a GitHub Actions runner.
|
||||
|
||||
Follow these steps exactly:
|
||||
1. Use script "ci/set_lance_version.py" to update Lance Rust dependencies. The script already refreshes Cargo metadata, so allow it to finish even if it takes time.
|
||||
2. Update the Java lance-core dependency version in "java/pom.xml": change the "<lance-core.version>...</lance-core.version>" property to "${VERSION}".
|
||||
3. Run "cargo clippy --workspace --tests --all-features -- -D warnings". If diagnostics appear, fix them yourself and rerun clippy until it exits cleanly. Do not skip any warnings.
|
||||
4. After clippy succeeds, run "cargo fmt --all" to format the workspace.
|
||||
5. Ensure the repository is clean except for intentional changes. Inspect "git status --short" and "git diff" to confirm the dependency update and any required fixes.
|
||||
6. Create and switch to a new branch named "${BRANCH_NAME}" (replace any duplicated hyphens if necessary).
|
||||
7. Stage all relevant files with "git add -A". Commit using the message "${COMMIT_TYPE}: update lance dependency to v${VERSION}".
|
||||
8. Push the branch to origin. If the remote branch already exists, delete it first with "gh api -X DELETE repos/lancedb/lancedb/git/refs/heads/${BRANCH_NAME}" then push with "git push origin ${BRANCH_NAME}". Do NOT use "git push --force" or "git push -f".
|
||||
9. env "GH_TOKEN" is available, use "gh" tools for github related operations like creating pull request.
|
||||
10. Create a pull request targeting "main" with title "${COMMIT_TYPE}: update lance dependency to v${VERSION}". First, write the PR body to /tmp/pr-body.md using a heredoc (cat <<'EOF' > /tmp/pr-body.md). The body should summarize the dependency bump, clippy/fmt verification, and link the triggering tag (${TAG}). Then run "gh pr create --body-file /tmp/pr-body.md".
|
||||
11. After creating the PR, display the PR URL, "git status --short", and a concise summary of the commands run and their results.
|
||||
Use \$lancedb-update-lance-dependency with target "${TARGET_TAG}".
|
||||
|
||||
Constraints:
|
||||
- Use bash commands; avoid modifying GitHub workflow files other than through the scripted task above.
|
||||
- Do not merge the PR.
|
||||
- If any command fails, diagnose and fix the issue instead of aborting.
|
||||
- Use env "GH_TOKEN" for GitHub operations.
|
||||
- Do not merge the pull request.
|
||||
- Do not force-push.
|
||||
- Do not create a duplicate pull request if an open PR already exists for the target Lance version.
|
||||
- If any command fails, diagnose and fix the root cause instead of aborting.
|
||||
- After creating the PR, display the PR URL, "git status --short", and a concise summary of the commands run and their results.
|
||||
EOF
|
||||
|
||||
printenv OPENAI_API_KEY | codex login --with-api-key
|
||||
codex --config shell_environment_policy.ignore_default_excludes=true exec --dangerously-bypass-approvals-and-sandbox "$(cat /tmp/codex-prompt.txt)"
|
||||
|
||||
- name: Trigger sophon dependency update
|
||||
env:
|
||||
TAG: ${{ inputs.tag }}
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
VERSION="${TAG#refs/tags/}"
|
||||
VERSION="${VERSION#v}"
|
||||
LANCEDB_BRANCH="codex/update-lance-${VERSION//[^a-zA-Z0-9]/-}"
|
||||
|
||||
echo "Triggering sophon workflow with:"
|
||||
echo " lance_ref: ${TAG#refs/tags/}"
|
||||
echo " lancedb_ref: ${LANCEDB_BRANCH}"
|
||||
|
||||
gh workflow run codex-bump-lancedb-lance.yml \
|
||||
--repo lancedb/sophon \
|
||||
-f lance_ref="${TAG#refs/tags/}" \
|
||||
-f lancedb_ref="${LANCEDB_BRANCH}"
|
||||
|
||||
- name: Show latest sophon workflow run
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
echo "Latest sophon workflow run:"
|
||||
gh run list --repo lancedb/sophon --workflow codex-bump-lancedb-lance.yml --limit 1 --json databaseId,url,displayTitle
|
||||
|
||||
62
.github/workflows/lance-release-timer.yml
vendored
62
.github/workflows/lance-release-timer.yml
vendored
@@ -1,62 +0,0 @@
|
||||
name: Lance Release Timer
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "*/10 * * * *"
|
||||
workflow_dispatch:
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
actions: write
|
||||
|
||||
concurrency:
|
||||
group: lance-release-timer
|
||||
cancel-in-progress: false
|
||||
|
||||
jobs:
|
||||
trigger-update:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Check for new Lance tag
|
||||
id: check
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
python3 ci/check_lance_release.py --github-output "$GITHUB_OUTPUT"
|
||||
|
||||
- name: Look for existing PR
|
||||
if: steps.check.outputs.needs_update == 'true'
|
||||
id: pr
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
TITLE="chore: update lance dependency to v${{ steps.check.outputs.latest_version }}"
|
||||
COUNT=$(gh pr list --search "\"$TITLE\" in:title" --state open --limit 1 --json number --jq 'length')
|
||||
if [ "$COUNT" -gt 0 ]; then
|
||||
echo "Open PR already exists for $TITLE"
|
||||
echo "pr_exists=true" >> "$GITHUB_OUTPUT"
|
||||
else
|
||||
echo "No existing PR for $TITLE"
|
||||
echo "pr_exists=false" >> "$GITHUB_OUTPUT"
|
||||
fi
|
||||
|
||||
- name: Trigger codex update workflow
|
||||
if: steps.check.outputs.needs_update == 'true' && steps.pr.outputs.pr_exists != 'true'
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
TAG=${{ steps.check.outputs.latest_tag }}
|
||||
gh workflow run codex-update-lance-dependency.yml -f tag=refs/tags/$TAG
|
||||
|
||||
- name: Show latest codex workflow run
|
||||
if: steps.check.outputs.needs_update == 'true' && steps.pr.outputs.pr_exists != 'true'
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
gh run list --workflow codex-update-lance-dependency.yml --limit 1 --json databaseId,url,displayTitle
|
||||
348
Cargo.lock
generated
348
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
28
Cargo.toml
28
Cargo.toml
@@ -13,20 +13,20 @@ categories = ["database-implementations"]
|
||||
rust-version = "1.91.0"
|
||||
|
||||
[workspace.dependencies]
|
||||
lance = { "version" = "=7.2.0-beta.1", default-features = false, "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.2.0-beta.1", default-features = false, "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.2.0-beta.1", default-features = false, "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance = { "version" = "=7.2.0-beta.3", default-features = false, "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.2.0-beta.3", default-features = false, "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.2.0-beta.3", default-features = false, "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
|
||||
ahash = "0.8"
|
||||
# Note that this one does not include pyarrow
|
||||
arrow = { version = "58.0.0", optional = false }
|
||||
|
||||
126
ci/update_lance_dependency.py
Normal file
126
ci/update_lance_dependency.py
Normal file
@@ -0,0 +1,126 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Prepare a Lance dependency update for LanceDB."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Sequence
|
||||
|
||||
try:
|
||||
from check_lance_release import parse_semver
|
||||
except ModuleNotFoundError:
|
||||
# Supports importing as ci.update_lance_dependency from tests or ad hoc checks.
|
||||
from ci.check_lance_release import parse_semver # type: ignore
|
||||
|
||||
|
||||
def normalize_version(raw: str) -> str:
|
||||
value = raw.strip()
|
||||
value = value.removeprefix("refs/tags/")
|
||||
value = value.removeprefix("v")
|
||||
try:
|
||||
parse_semver(value)
|
||||
except ValueError:
|
||||
raise ValueError(f"Unsupported Lance version or tag: {raw}")
|
||||
return value
|
||||
|
||||
|
||||
def normalized_tag(version: str) -> str:
|
||||
return f"v{version}"
|
||||
|
||||
|
||||
def branch_name(version: str) -> str:
|
||||
suffix = re.sub(r"[^a-zA-Z0-9]+", "-", version).strip("-")
|
||||
suffix = re.sub(r"-+", "-", suffix)
|
||||
return f"codex/update-lance-{suffix}"
|
||||
|
||||
|
||||
def commit_type(version: str) -> str:
|
||||
prerelease = version.split("-", maxsplit=1)[1] if "-" in version else ""
|
||||
return "chore" if "beta" in prerelease or "rc" in prerelease else "feat"
|
||||
|
||||
|
||||
def metadata_for(version: str) -> dict[str, str]:
|
||||
kind = commit_type(version)
|
||||
message = f"{kind}: update lance dependency to v{version}"
|
||||
return {
|
||||
"version": version,
|
||||
"tag": normalized_tag(version),
|
||||
"branch_name": branch_name(version),
|
||||
"commit_type": kind,
|
||||
"commit_message": message,
|
||||
"pr_title": message,
|
||||
}
|
||||
|
||||
|
||||
def run_command(cmd: Sequence[str], *, cwd: Path) -> None:
|
||||
subprocess.run(cmd, cwd=cwd, check=True)
|
||||
|
||||
|
||||
def update_java_lance_core_version(repo_root: Path, version: str) -> None:
|
||||
pom_path = repo_root / "java" / "pom.xml"
|
||||
contents = pom_path.read_text(encoding="utf-8")
|
||||
updated, count = re.subn(
|
||||
r"(<lance-core\.version>)[^<]+(</lance-core\.version>)",
|
||||
rf"\g<1>{version}\g<2>",
|
||||
contents,
|
||||
count=1,
|
||||
)
|
||||
if count != 1:
|
||||
raise RuntimeError(
|
||||
"Expected exactly one <lance-core.version> entry in java/pom.xml"
|
||||
)
|
||||
pom_path.write_text(updated, encoding="utf-8")
|
||||
|
||||
|
||||
def write_github_outputs(path: str | None, payload: dict[str, str]) -> None:
|
||||
if not path:
|
||||
return
|
||||
with open(path, "a", encoding="utf-8") as output:
|
||||
for key, value in payload.items():
|
||||
output.write(f"{key}={value}\n")
|
||||
|
||||
|
||||
def main(argv: Sequence[str] | None = None) -> int:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument(
|
||||
"tag_or_version",
|
||||
help="Lance tag or version, for example refs/tags/v7.2.0-beta.1 or 7.2.0",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--repo-root",
|
||||
type=Path,
|
||||
default=Path(__file__).resolve().parents[1],
|
||||
help="Path to the lancedb repository root",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--github-output",
|
||||
default=None,
|
||||
help="Optional GitHub Actions output file to receive metadata fields",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--metadata-only",
|
||||
action="store_true",
|
||||
help="Only print derived metadata; do not modify dependency files",
|
||||
)
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
repo_root = args.repo_root.resolve()
|
||||
version = normalize_version(args.tag_or_version)
|
||||
payload = metadata_for(version)
|
||||
|
||||
if not args.metadata_only:
|
||||
run_command([sys.executable, "ci/set_lance_version.py", version], cwd=repo_root)
|
||||
update_java_lance_core_version(repo_root, version)
|
||||
|
||||
write_github_outputs(args.github_output, payload)
|
||||
print(json.dumps(payload, sort_keys=True))
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
|
||||
<dependency>
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-core</artifactId>
|
||||
<version>0.30.0-beta.1</version>
|
||||
<version>0.30.1-beta.0</version>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
|
||||
@@ -994,29 +994,6 @@ based on the row being updated (e.g. "my_col + 1")
|
||||
|
||||
***
|
||||
|
||||
### updateFieldMetadata()
|
||||
|
||||
```ts
|
||||
abstract updateFieldMetadata(updates): Promise<UpdateFieldMetadataResult>
|
||||
```
|
||||
|
||||
Update per-field (column) metadata.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **updates**: [`FieldMetadataUpdate`](../interfaces/FieldMetadataUpdate.md)[]
|
||||
One or more per-field updates. Each
|
||||
update's metadata is merged into the field's existing metadata by default;
|
||||
a value of `null` deletes that key, and `replace: true` swaps the whole map.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<[`UpdateFieldMetadataResult`](../interfaces/UpdateFieldMetadataResult.md)>
|
||||
|
||||
resolves to the new table version.
|
||||
|
||||
***
|
||||
|
||||
### vectorSearch()
|
||||
|
||||
```ts
|
||||
|
||||
@@ -65,7 +65,6 @@
|
||||
- [DropNamespaceOptions](interfaces/DropNamespaceOptions.md)
|
||||
- [DropNamespaceResponse](interfaces/DropNamespaceResponse.md)
|
||||
- [ExecutableQuery](interfaces/ExecutableQuery.md)
|
||||
- [FieldMetadataUpdate](interfaces/FieldMetadataUpdate.md)
|
||||
- [FragmentStatistics](interfaces/FragmentStatistics.md)
|
||||
- [FragmentSummaryStats](interfaces/FragmentSummaryStats.md)
|
||||
- [FtsOptions](interfaces/FtsOptions.md)
|
||||
@@ -102,7 +101,6 @@
|
||||
- [TimeoutConfig](interfaces/TimeoutConfig.md)
|
||||
- [TlsConfig](interfaces/TlsConfig.md)
|
||||
- [TokenResponse](interfaces/TokenResponse.md)
|
||||
- [UpdateFieldMetadataResult](interfaces/UpdateFieldMetadataResult.md)
|
||||
- [UpdateOptions](interfaces/UpdateOptions.md)
|
||||
- [UpdateResult](interfaces/UpdateResult.md)
|
||||
- [Version](interfaces/Version.md)
|
||||
|
||||
@@ -1,41 +0,0 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / FieldMetadataUpdate
|
||||
|
||||
# Interface: FieldMetadataUpdate
|
||||
|
||||
A per-field metadata update, addressed by dot-path.
|
||||
|
||||
## Properties
|
||||
|
||||
### metadata
|
||||
|
||||
```ts
|
||||
metadata: Record<string, null | string>;
|
||||
```
|
||||
|
||||
Metadata key/value pairs. Merged into the field's existing metadata by
|
||||
default; a value of `null` deletes that key.
|
||||
|
||||
***
|
||||
|
||||
### path
|
||||
|
||||
```ts
|
||||
path: string;
|
||||
```
|
||||
|
||||
Dot-separated path to the field. For a top-level column this is just its
|
||||
name; for a nested field it's the path, e.g. "a.b.c".
|
||||
|
||||
***
|
||||
|
||||
### replace?
|
||||
|
||||
```ts
|
||||
optional replace: boolean;
|
||||
```
|
||||
|
||||
If true, replace the field's entire metadata map instead of merging.
|
||||
@@ -1,15 +0,0 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / UpdateFieldMetadataResult
|
||||
|
||||
# Interface: UpdateFieldMetadataResult
|
||||
|
||||
## Properties
|
||||
|
||||
### version
|
||||
|
||||
```ts
|
||||
version: number;
|
||||
```
|
||||
@@ -8,7 +8,7 @@
|
||||
<parent>
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-parent</artifactId>
|
||||
<version>0.30.0-beta.1</version>
|
||||
<version>0.30.1-beta.0</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-parent</artifactId>
|
||||
<version>0.30.0-beta.1</version>
|
||||
<version>0.30.1-beta.0</version>
|
||||
<packaging>pom</packaging>
|
||||
<name>${project.artifactId}</name>
|
||||
<description>LanceDB Java SDK Parent POM</description>
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "lancedb-nodejs"
|
||||
edition.workspace = true
|
||||
version = "0.30.0-beta.1"
|
||||
version = "0.30.1-beta.0"
|
||||
publish = false
|
||||
license.workspace = true
|
||||
description.workspace = true
|
||||
|
||||
@@ -1571,33 +1571,6 @@ describe("schema evolution", function () {
|
||||
expect(await table.schema()).toEqual(expectedSchema3);
|
||||
});
|
||||
|
||||
it("can update field metadata", async function () {
|
||||
const con = await connect(tmpDir.name);
|
||||
const table = await con.createTable("fm", [
|
||||
{ id: 1, category: "a" },
|
||||
{ id: 2, category: "b" },
|
||||
]);
|
||||
|
||||
const res = await table.updateFieldMetadata([
|
||||
{ path: "category", metadata: { unit: "label", pii: "false" } },
|
||||
]);
|
||||
expect(res).toHaveProperty("version");
|
||||
expect(res.version).toBe(2);
|
||||
|
||||
let cat = (await table.schema()).fields.find((f) => f.name === "category");
|
||||
expect(cat?.metadata.get("unit")).toBe("label");
|
||||
expect(cat?.metadata.get("pii")).toBe("false");
|
||||
|
||||
// merge: add a key, delete one via null, keep the rest
|
||||
await table.updateFieldMetadata([
|
||||
{ path: "category", metadata: { source: "import", pii: null } },
|
||||
]);
|
||||
cat = (await table.schema()).fields.find((f) => f.name === "category");
|
||||
expect(cat?.metadata.get("unit")).toBe("label"); // preserved
|
||||
expect(cat?.metadata.get("source")).toBe("import"); // added
|
||||
expect(cat?.metadata.has("pii")).toBe(false); // deleted
|
||||
});
|
||||
|
||||
it("can cast to various types", async function () {
|
||||
const con = await connect(tmpDir.name);
|
||||
|
||||
|
||||
@@ -42,7 +42,6 @@ export {
|
||||
AddResult,
|
||||
AddColumnsResult,
|
||||
AlterColumnsResult,
|
||||
UpdateFieldMetadataResult,
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
UpdateResult,
|
||||
@@ -118,7 +117,6 @@ export {
|
||||
WriteProgress,
|
||||
LsmWriteSpec,
|
||||
ColumnAlteration,
|
||||
FieldMetadataUpdate,
|
||||
} from "./table";
|
||||
|
||||
export {
|
||||
|
||||
@@ -32,7 +32,6 @@ import {
|
||||
OptimizeStats,
|
||||
TableStatistics,
|
||||
Tags,
|
||||
UpdateFieldMetadataResult,
|
||||
UpdateResult,
|
||||
Table as _NativeTable,
|
||||
} from "./native";
|
||||
@@ -509,18 +508,6 @@ export abstract class Table {
|
||||
abstract alterColumns(
|
||||
columnAlterations: ColumnAlteration[],
|
||||
): Promise<AlterColumnsResult>;
|
||||
|
||||
/**
|
||||
* Update per-field (column) metadata.
|
||||
* @param {FieldMetadataUpdate[]} updates One or more per-field updates. Each
|
||||
* update's metadata is merged into the field's existing metadata by default;
|
||||
* a value of `null` deletes that key, and `replace: true` swaps the whole map.
|
||||
* @returns {Promise<UpdateFieldMetadataResult>} resolves to the new table version.
|
||||
*/
|
||||
abstract updateFieldMetadata(
|
||||
updates: FieldMetadataUpdate[],
|
||||
): Promise<UpdateFieldMetadataResult>;
|
||||
|
||||
/**
|
||||
* Drop one or more columns from the dataset
|
||||
*
|
||||
@@ -1050,12 +1037,6 @@ export class LocalTable extends Table {
|
||||
return await this.inner.alterColumns(processedAlterations);
|
||||
}
|
||||
|
||||
async updateFieldMetadata(
|
||||
updates: FieldMetadataUpdate[],
|
||||
): Promise<UpdateFieldMetadataResult> {
|
||||
return await this.inner.updateFieldMetadata(updates);
|
||||
}
|
||||
|
||||
async dropColumns(columnNames: string[]): Promise<DropColumnsResult> {
|
||||
return await this.inner.dropColumns(columnNames);
|
||||
}
|
||||
@@ -1222,19 +1203,3 @@ export interface ColumnAlteration {
|
||||
/** Set the new nullability. Note that a nullable column cannot be made non-nullable. */
|
||||
nullable?: boolean;
|
||||
}
|
||||
|
||||
/** A per-field metadata update, addressed by dot-path. */
|
||||
export interface FieldMetadataUpdate {
|
||||
/**
|
||||
* Dot-separated path to the field. For a top-level column this is just its
|
||||
* name; for a nested field it's the path, e.g. "a.b.c".
|
||||
*/
|
||||
path: string;
|
||||
/**
|
||||
* Metadata key/value pairs. Merged into the field's existing metadata by
|
||||
* default; a value of `null` deletes that key.
|
||||
*/
|
||||
metadata: Record<string, string | null>;
|
||||
/** If true, replace the field's entire metadata map instead of merging. */
|
||||
replace?: boolean;
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-darwin-arm64",
|
||||
"version": "0.30.0-beta.1",
|
||||
"version": "0.30.1-beta.0",
|
||||
"os": ["darwin"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.darwin-arm64.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-arm64-gnu",
|
||||
"version": "0.30.0-beta.1",
|
||||
"version": "0.30.1-beta.0",
|
||||
"os": ["linux"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.linux-arm64-gnu.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-arm64-musl",
|
||||
"version": "0.30.0-beta.1",
|
||||
"version": "0.30.1-beta.0",
|
||||
"os": ["linux"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.linux-arm64-musl.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-x64-gnu",
|
||||
"version": "0.30.0-beta.1",
|
||||
"version": "0.30.1-beta.0",
|
||||
"os": ["linux"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.linux-x64-gnu.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-x64-musl",
|
||||
"version": "0.30.0-beta.1",
|
||||
"version": "0.30.1-beta.0",
|
||||
"os": ["linux"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.linux-x64-musl.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-win32-arm64-msvc",
|
||||
"version": "0.30.0-beta.1",
|
||||
"version": "0.30.1-beta.0",
|
||||
"os": [
|
||||
"win32"
|
||||
],
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-win32-x64-msvc",
|
||||
"version": "0.30.0-beta.1",
|
||||
"version": "0.30.1-beta.0",
|
||||
"os": ["win32"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.win32-x64-msvc.node",
|
||||
|
||||
4
nodejs/package-lock.json
generated
4
nodejs/package-lock.json
generated
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.30.0-beta.1",
|
||||
"version": "0.30.1-beta.0",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.30.0-beta.1",
|
||||
"version": "0.30.1-beta.0",
|
||||
"cpu": [
|
||||
"x64",
|
||||
"arm64"
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
"ann"
|
||||
],
|
||||
"private": false,
|
||||
"version": "0.30.0-beta.1",
|
||||
"version": "0.30.1-beta.0",
|
||||
"main": "dist/index.js",
|
||||
"exports": {
|
||||
".": "./dist/index.js",
|
||||
|
||||
@@ -5,9 +5,8 @@ use std::collections::HashMap;
|
||||
|
||||
use lancedb::ipc::{ipc_file_to_batches, ipc_file_to_schema};
|
||||
use lancedb::table::{
|
||||
AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration,
|
||||
FieldMetadataUpdate as LanceFieldMetadataUpdate, NewColumnTransform, OptimizeAction,
|
||||
OptimizeOptions, Table as LanceDbTable,
|
||||
AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration, NewColumnTransform,
|
||||
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
|
||||
};
|
||||
use napi::bindgen_prelude::*;
|
||||
use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
|
||||
@@ -356,23 +355,6 @@ impl Table {
|
||||
Ok(res.into())
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn update_field_metadata(
|
||||
&self,
|
||||
updates: Vec<FieldMetadataUpdate>,
|
||||
) -> napi::Result<UpdateFieldMetadataResult> {
|
||||
let updates = updates
|
||||
.into_iter()
|
||||
.map(LanceFieldMetadataUpdate::from)
|
||||
.collect::<Vec<_>>();
|
||||
let res = self
|
||||
.inner_ref()?
|
||||
.update_field_metadata(&updates)
|
||||
.await
|
||||
.default_error()?;
|
||||
Ok(res.into())
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn drop_columns(&self, columns: Vec<String>) -> napi::Result<DropColumnsResult> {
|
||||
let col_refs = columns.iter().map(String::as_str).collect::<Vec<_>>();
|
||||
@@ -765,29 +747,6 @@ pub struct ColumnAlteration {
|
||||
pub nullable: Option<bool>,
|
||||
}
|
||||
|
||||
/// A per-field metadata update, addressed by dot-path. Merges into the field's
|
||||
/// existing metadata by default; a `null` value deletes a key, and `replace`
|
||||
/// swaps the field's entire metadata map.
|
||||
#[napi(object)]
|
||||
pub struct FieldMetadataUpdate {
|
||||
/// Dot-separated path to the field (e.g. "embedding" or "a.b.c").
|
||||
pub path: String,
|
||||
/// Metadata keys to set; a `null` value deletes that key.
|
||||
pub metadata: HashMap<String, Option<String>>,
|
||||
/// If true, replace the field's entire metadata map instead of merging.
|
||||
pub replace: Option<bool>,
|
||||
}
|
||||
|
||||
impl From<FieldMetadataUpdate> for LanceFieldMetadataUpdate {
|
||||
fn from(js: FieldMetadataUpdate) -> Self {
|
||||
Self {
|
||||
path: js.path,
|
||||
metadata: js.metadata,
|
||||
replace: js.replace.unwrap_or(false),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<ColumnAlteration> for LanceColumnAlteration {
|
||||
type Error = String;
|
||||
fn try_from(js: ColumnAlteration) -> std::result::Result<Self, Self::Error> {
|
||||
@@ -1028,19 +987,6 @@ impl From<lancedb::table::AlterColumnsResult> for AlterColumnsResult {
|
||||
}
|
||||
}
|
||||
|
||||
#[napi(object)]
|
||||
pub struct UpdateFieldMetadataResult {
|
||||
pub version: i64,
|
||||
}
|
||||
|
||||
impl From<lancedb::table::UpdateFieldMetadataResult> for UpdateFieldMetadataResult {
|
||||
fn from(value: lancedb::table::UpdateFieldMetadataResult) -> Self {
|
||||
Self {
|
||||
version: value.version as i64,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[napi(object)]
|
||||
pub struct DropColumnsResult {
|
||||
pub version: i64,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[tool.bumpversion]
|
||||
current_version = "0.33.0-beta.1"
|
||||
current_version = "0.33.1-beta.0"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb-python"
|
||||
version = "0.33.0-beta.1"
|
||||
version = "0.33.1-beta.0"
|
||||
publish = false
|
||||
edition.workspace = true
|
||||
description = "Python bindings for LanceDB"
|
||||
|
||||
@@ -315,6 +315,15 @@ def deserialize_conn(
|
||||
manifest_enabled=parsed.get("manifest_enabled", False),
|
||||
namespace_client_properties=parsed.get("namespace_client_properties"),
|
||||
)
|
||||
elif connection_type == "remote":
|
||||
return RemoteDBConnection(
|
||||
parsed["db_url"],
|
||||
parsed["api_key"],
|
||||
parsed.get("region", "us-east-1"),
|
||||
host_override=parsed.get("host_override"),
|
||||
client_config=parsed.get("client_config"),
|
||||
storage_options=storage_options,
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unknown connection_type: {connection_type}")
|
||||
|
||||
|
||||
@@ -208,9 +208,6 @@ class Table:
|
||||
async def alter_columns(
|
||||
self, columns: list[dict[str, Any]]
|
||||
) -> AlterColumnsResult: ...
|
||||
async def update_field_metadata(
|
||||
self, updates: list[dict[str, Any]]
|
||||
) -> UpdateFieldMetadataResult: ...
|
||||
async def optimize(
|
||||
self,
|
||||
*,
|
||||
@@ -463,9 +460,6 @@ class AddColumnsResult:
|
||||
class AlterColumnsResult:
|
||||
version: int
|
||||
|
||||
class UpdateFieldMetadataResult:
|
||||
version: int
|
||||
|
||||
class DropColumnsResult:
|
||||
version: int
|
||||
|
||||
|
||||
@@ -3,12 +3,13 @@
|
||||
|
||||
import copy
|
||||
import json
|
||||
import os
|
||||
|
||||
from deprecation import deprecated
|
||||
import pyarrow as pa
|
||||
|
||||
from ._lancedb import async_permutation_builder, PermutationReader
|
||||
from .table import LanceTable
|
||||
from .table import LanceTable, Table
|
||||
from .background_loop import LOOP
|
||||
from .util import batch_to_tensor, batch_to_tensor_rows
|
||||
from typing import Any, Callable, Iterator, Literal, Optional, TYPE_CHECKING, Union
|
||||
@@ -354,6 +355,49 @@ class Transforms:
|
||||
DEFAULT_BATCH_SIZE = 100
|
||||
|
||||
|
||||
def _table_to_pickle_state(table: Table) -> dict[str, Any]:
|
||||
from .remote.table import RemoteTable
|
||||
|
||||
if isinstance(table, RemoteTable):
|
||||
return {
|
||||
"kind": "remote",
|
||||
"table": table,
|
||||
}
|
||||
|
||||
if not isinstance(table, LanceTable):
|
||||
raise ValueError(f"Cannot pickle table of type {type(table)!r}")
|
||||
|
||||
base_uri = table._conn.uri
|
||||
if base_uri.startswith("memory://"):
|
||||
return {
|
||||
"kind": "memory",
|
||||
"name": table.name,
|
||||
"data": table.to_arrow(),
|
||||
}
|
||||
|
||||
return {
|
||||
"kind": "local",
|
||||
"name": table.name,
|
||||
"uri": base_uri,
|
||||
"namespace": table._namespace_path,
|
||||
"storage_options": table._conn.storage_options,
|
||||
}
|
||||
|
||||
|
||||
def _table_from_pickle_state(state: dict[str, Any]) -> Table:
|
||||
from . import connect
|
||||
|
||||
kind = state["kind"]
|
||||
if kind == "remote":
|
||||
return state["table"]
|
||||
if kind == "memory":
|
||||
return connect("memory://").create_table(state["name"], state["data"])
|
||||
if kind == "local":
|
||||
db = connect(state["uri"], storage_options=state["storage_options"])
|
||||
return db.open_table(state["name"], namespace_path=state["namespace"] or None)
|
||||
raise ValueError(f"Unknown table pickle state kind: {kind}")
|
||||
|
||||
|
||||
class Permutation:
|
||||
"""
|
||||
A Permutation is a view of a dataset that can be used as input to model training
|
||||
@@ -369,15 +413,15 @@ class Permutation:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_table: LanceTable,
|
||||
permutation_table: Optional[LanceTable],
|
||||
base_table: Table,
|
||||
permutation_table: Optional[Table],
|
||||
split: int,
|
||||
selection: dict[str, str],
|
||||
batch_size: int,
|
||||
transform_fn: Callable[pa.RecordBatch, Any],
|
||||
offset: Optional[int] = None,
|
||||
limit: Optional[int] = None,
|
||||
connection_factory: Optional[Callable[[str], LanceTable]] = None,
|
||||
connection_factory: Optional[Callable[[str], Table]] = None,
|
||||
_reader: Optional[PermutationReader] = None,
|
||||
):
|
||||
"""
|
||||
@@ -397,6 +441,7 @@ class Permutation:
|
||||
if _reader is None:
|
||||
_reader = LOOP.run(self._build_reader())
|
||||
self.reader: PermutationReader = _reader
|
||||
self._pid = os.getpid()
|
||||
|
||||
async def _build_reader(self) -> PermutationReader:
|
||||
reader = await PermutationReader.from_tables(
|
||||
@@ -428,29 +473,25 @@ class Permutation:
|
||||
return new
|
||||
|
||||
def with_connection_factory(
|
||||
self, connection_factory: Callable[[str], LanceTable]
|
||||
self, connection_factory: Callable[[str], Table]
|
||||
) -> "Permutation":
|
||||
"""
|
||||
Creates a new permutation that will use ``connection_factory`` to reopen
|
||||
the base table when this permutation is unpickled in a worker process.
|
||||
|
||||
The factory is a callable that takes a single argument — the base table
|
||||
name — and returns a [LanceTable]. It must be picklable; the worker
|
||||
The factory is a callable that takes a single argument, the base table
|
||||
name, and returns a LanceDB table. It must be picklable; the worker
|
||||
will pickle it via standard ``pickle`` and call it to recover the base
|
||||
table. Picklable callables in practice means top-level (module-level)
|
||||
functions, ``functools.partial`` of such functions, or instances of
|
||||
picklable classes implementing ``__call__``. Lambdas and closures over
|
||||
local variables don't pickle with the default protocol.
|
||||
|
||||
Setting a factory is necessary when the URI alone is not enough to
|
||||
re-open the connection — most importantly for LanceDB Cloud (``db://``)
|
||||
connections, where ``api_key`` and ``region`` aren't recoverable from
|
||||
the connection object after construction.
|
||||
|
||||
For local file or cloud-storage paths the factory is optional: if not
|
||||
set, ``__getstate__`` falls back to capturing
|
||||
``(uri, storage_options, namespace_path)`` and re-opening via
|
||||
``lancedb.connect(uri, storage_options=...)``.
|
||||
A factory is optional for normal local and remote LanceDB connections:
|
||||
if not set, ``__getstate__`` captures the table's own picklable reopen
|
||||
state. Use a factory when that default state is not enough, for example
|
||||
when credentials should be loaded from the worker environment instead
|
||||
of being embedded in the pickle.
|
||||
|
||||
Examples
|
||||
--------
|
||||
@@ -508,7 +549,7 @@ class Permutation:
|
||||
return new
|
||||
|
||||
@classmethod
|
||||
def identity(cls, table: LanceTable) -> "Permutation":
|
||||
def identity(cls, table: Table) -> "Permutation":
|
||||
"""
|
||||
Creates an identity permutation for the given table.
|
||||
"""
|
||||
@@ -517,8 +558,8 @@ class Permutation:
|
||||
@classmethod
|
||||
def from_tables(
|
||||
cls,
|
||||
base_table: LanceTable,
|
||||
permutation_table: Optional[LanceTable] = None,
|
||||
base_table: Table,
|
||||
permutation_table: Optional[Table] = None,
|
||||
split: Optional[Union[str, int]] = None,
|
||||
) -> "Permutation":
|
||||
"""
|
||||
@@ -594,11 +635,10 @@ class Permutation:
|
||||
|
||||
The base table is captured either via a user-supplied
|
||||
``connection_factory`` (see [with_connection_factory]) or, as a
|
||||
fallback, by introspecting ``(uri, storage_options, namespace_path)``
|
||||
on the connection. The permutation table — always an in-memory
|
||||
LanceDB table — is captured as a pyarrow Table (which pickles via
|
||||
Arrow IPC natively). The reader is dropped from the wire format;
|
||||
``__setstate__`` rebuilds it from the restored tables.
|
||||
fallback, by the table's own picklable reopen state. The permutation
|
||||
table is captured as a pyarrow Table (which pickles via Arrow IPC
|
||||
natively). The reader is dropped from the wire format and rebuilt
|
||||
lazily on first use.
|
||||
"""
|
||||
permutation_data: Optional[pa.Table] = None
|
||||
if self.permutation_table is not None:
|
||||
@@ -622,39 +662,9 @@ class Permutation:
|
||||
# namespace from the existing connection.
|
||||
return common
|
||||
|
||||
# URI-introspection fallback: only viable for native (OSS) connections
|
||||
# where (uri, storage_options) is enough to reopen. Remote / cloud
|
||||
# connections don't expose recoverable api_key / region — those users
|
||||
# must call with_connection_factory().
|
||||
try:
|
||||
base_uri = self.base_table._conn.uri
|
||||
storage_options = self.base_table._conn.storage_options
|
||||
except AttributeError as e:
|
||||
raise ValueError(
|
||||
"Cannot pickle this Permutation: the base table's connection "
|
||||
"does not expose a uri/storage_options, which usually means it "
|
||||
"is a remote (LanceDB Cloud) connection. Call "
|
||||
"Permutation.with_connection_factory(...) first to provide a "
|
||||
"picklable callable that re-opens the base table from a worker "
|
||||
"process."
|
||||
) from e
|
||||
|
||||
if base_uri.startswith("memory://"):
|
||||
# In-memory base tables don't exist in any worker process by
|
||||
# default, so dump the entire base table into the pickle. This
|
||||
# can be expensive for large datasets — users with large
|
||||
# in-memory base tables should either persist them or set a
|
||||
# connection_factory.
|
||||
return {
|
||||
**common,
|
||||
"base_table_data": self.base_table.to_arrow(),
|
||||
}
|
||||
|
||||
return {
|
||||
**common,
|
||||
"base_table_uri": base_uri,
|
||||
"base_table_namespace": self.base_table._namespace_path,
|
||||
"base_table_storage_options": storage_options,
|
||||
"base_table_state": _table_to_pickle_state(self.base_table),
|
||||
}
|
||||
|
||||
def __setstate__(self, state: dict[str, Any]) -> None:
|
||||
@@ -663,6 +673,8 @@ class Permutation:
|
||||
connection_factory = state["connection_factory"]
|
||||
if connection_factory is not None:
|
||||
base_table = connection_factory(state["base_table_name"])
|
||||
elif "base_table_state" in state:
|
||||
base_table = _table_from_pickle_state(state["base_table_state"])
|
||||
elif "base_table_data" in state:
|
||||
# In-memory base table inlined into the pickle; rebuild the same
|
||||
# way we rebuild the in-memory permutation table.
|
||||
@@ -680,7 +692,7 @@ class Permutation:
|
||||
namespace_path=state["base_table_namespace"] or None,
|
||||
)
|
||||
|
||||
permutation_table: Optional[LanceTable] = None
|
||||
permutation_table: Optional[Table] = None
|
||||
if state["permutation_data"] is not None:
|
||||
mem_db = connect("memory://")
|
||||
permutation_table = mem_db.create_table(
|
||||
@@ -696,10 +708,28 @@ class Permutation:
|
||||
self.offset = state["offset"]
|
||||
self.limit = state["limit"]
|
||||
self.connection_factory = connection_factory
|
||||
self.reader = None
|
||||
self._pid = None
|
||||
|
||||
def _ensure_open(self) -> None:
|
||||
pid = os.getpid()
|
||||
if self.reader is not None and getattr(self, "_pid", None) == pid:
|
||||
return
|
||||
# The reader owns Rust-side table handles. Rebuild it after unpickle or
|
||||
# fork even though the Python table wrappers reopen themselves.
|
||||
if hasattr(self.base_table, "_ensure_open"):
|
||||
self.base_table._ensure_open()
|
||||
if self.permutation_table is not None and hasattr(
|
||||
self.permutation_table, "_ensure_open"
|
||||
):
|
||||
self.permutation_table._ensure_open()
|
||||
self.reader = LOOP.run(self._build_reader())
|
||||
self._pid = pid
|
||||
|
||||
@property
|
||||
def schema(self) -> pa.Schema:
|
||||
self._ensure_open()
|
||||
|
||||
async def do_output_schema():
|
||||
return await self.reader.output_schema(self.selection)
|
||||
|
||||
@@ -717,6 +747,7 @@ class Permutation:
|
||||
"""
|
||||
The number of rows in the permutation
|
||||
"""
|
||||
self._ensure_open()
|
||||
return self.reader.count_rows()
|
||||
|
||||
@property
|
||||
@@ -875,6 +906,7 @@ class Permutation:
|
||||
If skip_last_batch is True, the last batch will be skipped if it is not a
|
||||
multiple of batch_size.
|
||||
"""
|
||||
self._ensure_open()
|
||||
|
||||
async def get_iter():
|
||||
return await self.reader.read(self.selection, batch_size=batch_size)
|
||||
@@ -976,6 +1008,7 @@ class Permutation:
|
||||
so `with_format` and `with_transform` affect this method in the same way
|
||||
they affect iteration.
|
||||
"""
|
||||
self._ensure_open()
|
||||
|
||||
async def do_take_offsets():
|
||||
return await self.reader.take_offsets(offsets, selection=self.selection)
|
||||
@@ -1011,9 +1044,11 @@ class Permutation:
|
||||
"""
|
||||
Skip the first `skip` rows of the permutation
|
||||
"""
|
||||
self._ensure_open()
|
||||
new = copy.copy(self)
|
||||
new.offset = skip
|
||||
new.reader = LOOP.run(new._build_reader())
|
||||
new._pid = os.getpid()
|
||||
return new
|
||||
|
||||
@deprecated(details="Use with_take instead")
|
||||
@@ -1032,9 +1067,11 @@ class Permutation:
|
||||
"""
|
||||
Limit the permutation to `limit` rows (following any `skip`)
|
||||
"""
|
||||
self._ensure_open()
|
||||
new = copy.copy(self)
|
||||
new.limit = limit
|
||||
new.reader = LOOP.run(new._build_reader())
|
||||
new._pid = os.getpid()
|
||||
return new
|
||||
|
||||
@deprecated(details="Use with_repeat instead")
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
|
||||
from datetime import timedelta
|
||||
import json
|
||||
import logging
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import sys
|
||||
@@ -17,7 +18,7 @@ else:
|
||||
|
||||
# Remove this import to fix circular dependency
|
||||
# from lancedb import connect_async
|
||||
from lancedb.remote import ClientConfig
|
||||
from lancedb.remote import ClientConfig, RetryConfig, TimeoutConfig, TlsConfig
|
||||
import pyarrow as pa
|
||||
|
||||
from ..common import DATA
|
||||
@@ -36,6 +37,64 @@ from ..table import Table
|
||||
from ..util import validate_table_name
|
||||
|
||||
|
||||
def _duration_seconds(value: Optional[timedelta]) -> Optional[float]:
|
||||
return value.total_seconds() if value is not None else None
|
||||
|
||||
|
||||
def _timeout_config_to_dict(
|
||||
config: Optional[TimeoutConfig],
|
||||
) -> Optional[dict[str, Any]]:
|
||||
if config is None:
|
||||
return None
|
||||
return {
|
||||
"timeout": _duration_seconds(config.timeout),
|
||||
"connect_timeout": _duration_seconds(config.connect_timeout),
|
||||
"read_timeout": _duration_seconds(config.read_timeout),
|
||||
"pool_idle_timeout": _duration_seconds(config.pool_idle_timeout),
|
||||
}
|
||||
|
||||
|
||||
def _retry_config_to_dict(config: RetryConfig) -> dict[str, Any]:
|
||||
return {
|
||||
"retries": config.retries,
|
||||
"connect_retries": config.connect_retries,
|
||||
"read_retries": config.read_retries,
|
||||
"backoff_factor": config.backoff_factor,
|
||||
"backoff_jitter": config.backoff_jitter,
|
||||
"statuses": config.statuses,
|
||||
}
|
||||
|
||||
|
||||
def _tls_config_to_dict(config: Optional[TlsConfig]) -> Optional[dict[str, Any]]:
|
||||
if config is None:
|
||||
return None
|
||||
return {
|
||||
"cert_file": config.cert_file,
|
||||
"key_file": config.key_file,
|
||||
"ssl_ca_cert": config.ssl_ca_cert,
|
||||
"assert_hostname": config.assert_hostname,
|
||||
}
|
||||
|
||||
|
||||
def _client_config_to_dict(config: ClientConfig) -> dict[str, Any]:
|
||||
if config.header_provider is not None:
|
||||
raise ValueError(
|
||||
"Cannot serialize a remote connection with a header_provider. "
|
||||
"Use static api_key/extra_headers or provide a worker-side "
|
||||
"connection factory instead."
|
||||
)
|
||||
return {
|
||||
"user_agent": config.user_agent,
|
||||
"retry_config": _retry_config_to_dict(config.retry_config),
|
||||
"timeout_config": _timeout_config_to_dict(config.timeout_config),
|
||||
"extra_headers": config.extra_headers,
|
||||
"id_delimiter": config.id_delimiter,
|
||||
"tls_config": _tls_config_to_dict(config.tls_config),
|
||||
"header_provider": None,
|
||||
"user_id": config.user_id,
|
||||
}
|
||||
|
||||
|
||||
class RemoteDBConnection(DBConnection):
|
||||
"""A connection to a remote LanceDB database."""
|
||||
|
||||
@@ -89,6 +148,11 @@ class RemoteDBConnection(DBConnection):
|
||||
parsed = urlparse(db_url)
|
||||
if parsed.scheme != "db":
|
||||
raise ValueError(f"Invalid scheme: {parsed.scheme}, only accepts db://")
|
||||
self.db_url = db_url
|
||||
self.api_key = api_key
|
||||
self.region = region
|
||||
self.host_override = host_override
|
||||
self.storage_options = storage_options
|
||||
self.db_name = parsed.netloc
|
||||
|
||||
self.client_config = client_config
|
||||
@@ -111,6 +175,20 @@ class RemoteDBConnection(DBConnection):
|
||||
def __repr__(self) -> str:
|
||||
return f"RemoteConnect(name={self.db_name})"
|
||||
|
||||
@override
|
||||
def serialize(self) -> str:
|
||||
return json.dumps(
|
||||
{
|
||||
"connection_type": "remote",
|
||||
"db_url": self.db_url,
|
||||
"api_key": self.api_key,
|
||||
"region": self.region,
|
||||
"host_override": self.host_override,
|
||||
"client_config": _client_config_to_dict(self.client_config),
|
||||
"storage_options": self.storage_options,
|
||||
}
|
||||
)
|
||||
|
||||
@override
|
||||
def list_namespaces(
|
||||
self,
|
||||
@@ -331,7 +409,12 @@ class RemoteDBConnection(DBConnection):
|
||||
)
|
||||
|
||||
table = LOOP.run(self._conn.open_table(name, namespace_path=namespace_path))
|
||||
return RemoteTable(table, self.db_name)
|
||||
return RemoteTable(
|
||||
table,
|
||||
self.db_name,
|
||||
connection_state=self.serialize,
|
||||
namespace_path=namespace_path,
|
||||
)
|
||||
|
||||
def clone_table(
|
||||
self,
|
||||
@@ -380,7 +463,12 @@ class RemoteDBConnection(DBConnection):
|
||||
is_shallow=is_shallow,
|
||||
)
|
||||
)
|
||||
return RemoteTable(table, self.db_name)
|
||||
return RemoteTable(
|
||||
table,
|
||||
self.db_name,
|
||||
connection_state=self.serialize,
|
||||
namespace_path=target_namespace_path,
|
||||
)
|
||||
|
||||
@override
|
||||
def create_table(
|
||||
@@ -525,7 +613,12 @@ class RemoteDBConnection(DBConnection):
|
||||
fill_value=fill_value,
|
||||
)
|
||||
)
|
||||
return RemoteTable(table, self.db_name)
|
||||
return RemoteTable(
|
||||
table,
|
||||
self.db_name,
|
||||
connection_state=self.serialize,
|
||||
namespace_path=namespace_path,
|
||||
)
|
||||
|
||||
@override
|
||||
def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):
|
||||
|
||||
@@ -5,6 +5,7 @@ from datetime import timedelta
|
||||
import deprecation
|
||||
import logging
|
||||
from functools import cached_property
|
||||
import os
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
@@ -24,7 +25,6 @@ from lancedb._lancedb import (
|
||||
AddColumnsResult,
|
||||
AddResult,
|
||||
AlterColumnsResult,
|
||||
UpdateFieldMetadataResult,
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
IndexConfig,
|
||||
@@ -64,14 +64,80 @@ class RemoteTable(Table):
|
||||
self,
|
||||
table: AsyncTable,
|
||||
db_name: str,
|
||||
*,
|
||||
connection_state: Optional[Union[str, Callable[[], str]]] = None,
|
||||
namespace_path: Optional[List[str]] = None,
|
||||
):
|
||||
self._table = table
|
||||
self._table_handle = table
|
||||
self._name = table.name
|
||||
self.db_name = db_name
|
||||
self._connection_state = connection_state
|
||||
self._namespace_path = list(namespace_path or [])
|
||||
self._checkout_version: Optional[int] = None
|
||||
self._pid = os.getpid()
|
||||
|
||||
def _serialized_connection_state(self) -> str:
|
||||
if self._connection_state is None:
|
||||
raise RuntimeError(
|
||||
"Cannot reopen this remote table because it does not carry "
|
||||
"serialized connection state"
|
||||
)
|
||||
if callable(self._connection_state):
|
||||
self._connection_state = self._connection_state()
|
||||
return self._connection_state
|
||||
|
||||
@property
|
||||
def _table(self) -> AsyncTable:
|
||||
self._ensure_open()
|
||||
assert self._table_handle is not None
|
||||
return self._table_handle
|
||||
|
||||
@_table.setter
|
||||
def _table(self, table: AsyncTable) -> None:
|
||||
self._table_handle = table
|
||||
self._name = table.name
|
||||
self._pid = os.getpid()
|
||||
|
||||
def _ensure_open(self) -> None:
|
||||
pid = os.getpid()
|
||||
if self._table_handle is not None and self._pid == pid:
|
||||
return
|
||||
|
||||
# Pickle clears the handle; fork inherits a handle created in the
|
||||
# parent process. In both cases reopen before touching the Rust client.
|
||||
from lancedb import deserialize_conn
|
||||
|
||||
db = deserialize_conn(self._serialized_connection_state(), for_worker=True)
|
||||
table = db.open_table(self._name, namespace_path=self._namespace_path)
|
||||
if self._checkout_version is not None:
|
||||
table.checkout(self._checkout_version)
|
||||
|
||||
self._table_handle = table._table
|
||||
self.db_name = table.db_name
|
||||
self._pid = pid
|
||||
|
||||
def __getstate__(self) -> dict:
|
||||
return {
|
||||
"connection_state": self._serialized_connection_state(),
|
||||
"db_name": self.db_name,
|
||||
"name": self.name,
|
||||
"namespace_path": self._namespace_path,
|
||||
"checkout_version": self._checkout_version,
|
||||
}
|
||||
|
||||
def __setstate__(self, state: dict) -> None:
|
||||
self._table_handle = None
|
||||
self._name = state["name"]
|
||||
self.db_name = state["db_name"]
|
||||
self._connection_state = state["connection_state"]
|
||||
self._namespace_path = state["namespace_path"]
|
||||
self._checkout_version = state["checkout_version"]
|
||||
self._pid = None
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""The name of the table"""
|
||||
return self._table.name
|
||||
return self._name
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"RemoteTable({self.db_name}.{self.name})"
|
||||
@@ -121,13 +187,19 @@ class RemoteTable(Table):
|
||||
raise NotImplementedError("to_pandas() is not yet supported on LanceDB cloud.")
|
||||
|
||||
def checkout(self, version: Union[int, str]):
|
||||
return LOOP.run(self._table.checkout(version))
|
||||
result = LOOP.run(self._table.checkout(version))
|
||||
self._checkout_version = self.version
|
||||
return result
|
||||
|
||||
def checkout_latest(self):
|
||||
return LOOP.run(self._table.checkout_latest())
|
||||
result = LOOP.run(self._table.checkout_latest())
|
||||
self._checkout_version = None
|
||||
return result
|
||||
|
||||
def restore(self, version: Optional[Union[int, str]] = None):
|
||||
return LOOP.run(self._table.restore(version))
|
||||
result = LOOP.run(self._table.restore(version))
|
||||
self._checkout_version = None
|
||||
return result
|
||||
|
||||
def list_indices(self) -> Iterable[IndexConfig]:
|
||||
"""List all the indices on the table"""
|
||||
@@ -778,11 +850,6 @@ class RemoteTable(Table):
|
||||
) -> AlterColumnsResult:
|
||||
return LOOP.run(self._table.alter_columns(*alterations))
|
||||
|
||||
def update_field_metadata(
|
||||
self, *updates: dict[str, Any]
|
||||
) -> UpdateFieldMetadataResult:
|
||||
return LOOP.run(self._table.update_field_metadata(*updates))
|
||||
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
|
||||
return LOOP.run(self._table.drop_columns(columns))
|
||||
|
||||
|
||||
@@ -154,7 +154,6 @@ if TYPE_CHECKING:
|
||||
AddColumnsResult,
|
||||
AddResult,
|
||||
AlterColumnsResult,
|
||||
UpdateFieldMetadataResult,
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
LsmWriteSpec,
|
||||
@@ -1800,29 +1799,6 @@ class Table(ABC):
|
||||
version: the new version number of the table after the alteration.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def update_field_metadata(
|
||||
self, *updates: dict[str, Any]
|
||||
) -> UpdateFieldMetadataResult:
|
||||
"""
|
||||
Update per-field (column) metadata.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
updates : dict
|
||||
One or more dicts, each with:
|
||||
- "path": str — dot-path to the field (e.g. "embedding" or "a.b.c").
|
||||
- "metadata": dict[str, str | None] — keys to set; a value of ``None``
|
||||
deletes that key.
|
||||
- "replace": bool, optional — replace the field's whole metadata map
|
||||
instead of merging (default False).
|
||||
|
||||
Returns
|
||||
-------
|
||||
UpdateFieldMetadataResult
|
||||
version: the new table version after the update.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
|
||||
"""
|
||||
@@ -3607,11 +3583,6 @@ class LanceTable(Table):
|
||||
) -> AlterColumnsResult:
|
||||
return LOOP.run(self._table.alter_columns(*alterations))
|
||||
|
||||
def update_field_metadata(
|
||||
self, *updates: dict[str, Any]
|
||||
) -> UpdateFieldMetadataResult:
|
||||
return LOOP.run(self._table.update_field_metadata(*updates))
|
||||
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
|
||||
return LOOP.run(self._table.drop_columns(columns))
|
||||
|
||||
@@ -3666,18 +3637,10 @@ class LanceTable(Table):
|
||||
"""
|
||||
LOOP.run(self._table.migrate_v2_manifest_paths())
|
||||
|
||||
@deprecation.deprecated(
|
||||
deprecated_in="0.33.1",
|
||||
current_version=__version__,
|
||||
details="Use update_field_metadata() instead.",
|
||||
)
|
||||
def replace_field_metadata(self, field_name: str, new_metadata: Dict[str, str]):
|
||||
"""
|
||||
Replace the metadata of a field in the schema
|
||||
|
||||
.. deprecated:: 0.33.1
|
||||
Use :func:`update_field_metadata` instead.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
field_name: str
|
||||
@@ -5271,13 +5234,6 @@ class AsyncTable:
|
||||
"""
|
||||
return await self._inner.alter_columns(alterations)
|
||||
|
||||
async def update_field_metadata(
|
||||
self, *updates: dict[str, Any]
|
||||
) -> UpdateFieldMetadataResult:
|
||||
"""Update per-field metadata. See
|
||||
[`Table.update_field_metadata`][lancedb.table.Table.update_field_metadata]."""
|
||||
return await self._inner.update_field_metadata(updates)
|
||||
|
||||
async def drop_columns(self, columns: Iterable[str]):
|
||||
"""
|
||||
Drop columns from the table.
|
||||
@@ -5562,20 +5518,12 @@ class AsyncTable:
|
||||
"""
|
||||
await self._inner.migrate_manifest_paths_v2()
|
||||
|
||||
@deprecation.deprecated(
|
||||
deprecated_in="0.33.1",
|
||||
current_version=__version__,
|
||||
details="Use update_field_metadata() instead.",
|
||||
)
|
||||
async def replace_field_metadata(
|
||||
self, field_name: str, new_metadata: dict[str, str]
|
||||
):
|
||||
"""
|
||||
Replace the metadata of a field in the schema
|
||||
|
||||
.. deprecated:: 0.33.1
|
||||
Use :func:`update_field_metadata` instead.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
field_name: str
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
import re
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import contextlib
|
||||
from datetime import timedelta
|
||||
import http.server
|
||||
import json
|
||||
import multiprocessing as mp
|
||||
import pickle
|
||||
import re
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
@@ -171,6 +172,155 @@ def test_table_len_sync():
|
||||
assert len(table) == 1
|
||||
|
||||
|
||||
def test_remote_connection_serializes():
|
||||
def handler(request):
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b'{"tables": []}')
|
||||
|
||||
with mock_lancedb_connection(handler) as db:
|
||||
serialized = json.loads(db.serialize())
|
||||
assert isinstance(serialized["client_config"], dict)
|
||||
restored = lancedb.deserialize_conn(db.serialize())
|
||||
assert restored.table_names() == []
|
||||
|
||||
|
||||
def test_remote_table_is_picklable():
|
||||
def handler(request):
|
||||
request.close_connection = True
|
||||
if request.path == "/v1/table/test/describe/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
payload = json.dumps(
|
||||
{
|
||||
"version": 1,
|
||||
"schema": {
|
||||
"fields": [
|
||||
{"name": "id", "type": {"type": "int64"}, "nullable": False}
|
||||
]
|
||||
},
|
||||
}
|
||||
)
|
||||
request.wfile.write(payload.encode())
|
||||
elif request.path == "/v1/table/test/count_rows/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b"3")
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
with mock_lancedb_connection(handler) as db:
|
||||
table = db.open_table("test")
|
||||
restored = pickle.loads(pickle.dumps(table))
|
||||
assert restored.count_rows() == 3
|
||||
|
||||
|
||||
def test_remote_table_open_does_not_require_picklable_client_config():
|
||||
from lancedb.remote import HeaderProvider
|
||||
|
||||
class LocalHeaderProvider(HeaderProvider):
|
||||
def get_headers(self):
|
||||
return {"X-Test-Header": "present"}
|
||||
|
||||
def handler(request):
|
||||
request.close_connection = True
|
||||
assert request.headers.get("X-Test-Header") == "present"
|
||||
if request.path == "/v1/table/test/describe/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b'{"version": 1, "schema": {"fields": []}}')
|
||||
elif request.path == "/v1/table/test/count_rows/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b"3")
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
with http.server.HTTPServer(
|
||||
("localhost", 0), make_mock_http_handler(handler)
|
||||
) as server:
|
||||
port = server.server_address[1]
|
||||
handle = threading.Thread(target=server.serve_forever)
|
||||
handle.start()
|
||||
try:
|
||||
db = lancedb.connect(
|
||||
"db://dev",
|
||||
api_key="fake",
|
||||
host_override=f"http://localhost:{port}",
|
||||
client_config={
|
||||
"retry_config": {"retries": 0},
|
||||
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
|
||||
"header_provider": LocalHeaderProvider(),
|
||||
},
|
||||
)
|
||||
table = db.open_table("test")
|
||||
assert table.count_rows() == 3
|
||||
with pytest.raises(ValueError, match="header_provider"):
|
||||
pickle.dumps(table)
|
||||
finally:
|
||||
server.shutdown()
|
||||
handle.join()
|
||||
|
||||
|
||||
def test_remote_permutation_is_picklable():
|
||||
from lancedb.permutation import Permutation
|
||||
|
||||
rows = list(range(10))
|
||||
|
||||
def handler(request):
|
||||
request.close_connection = True
|
||||
if request.path == "/v1/table/test/describe/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
payload = json.dumps(
|
||||
{
|
||||
"version": 1,
|
||||
"schema": {
|
||||
"fields": [
|
||||
{"name": "a", "type": {"type": "int64"}, "nullable": False}
|
||||
]
|
||||
},
|
||||
}
|
||||
)
|
||||
request.wfile.write(payload.encode())
|
||||
elif request.path == "/v1/table/test/count_rows/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(str(len(rows)).encode())
|
||||
elif request.path == "/v1/table/test/query/":
|
||||
content_len = int(request.headers.get("Content-Length"))
|
||||
body = json.loads(request.rfile.read(content_len))
|
||||
if "filter" in body:
|
||||
match = re.search(r"_rowoffset in \((.*?)\)", body["filter"])
|
||||
offsets = [int(offset.strip()) for offset in match.group(1).split(",")]
|
||||
else:
|
||||
offsets = rows
|
||||
table = pa.table({"a": [rows[offset] for offset in offsets]})
|
||||
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/vnd.apache.arrow.file")
|
||||
request.end_headers()
|
||||
with pa.ipc.new_file(request.wfile, schema=table.schema) as writer:
|
||||
writer.write_table(table)
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
with mock_lancedb_connection(handler) as db:
|
||||
permutation = Permutation.identity(db.open_table("test"))
|
||||
restored = pickle.loads(pickle.dumps(permutation))
|
||||
assert restored.__getitems__([0, 2, 4]) == [{"a": 0}, {"a": 2}, {"a": 4}]
|
||||
|
||||
|
||||
def test_create_table_exist_ok():
|
||||
def handler(request):
|
||||
if request.path == "/v1/table/test/create/?mode=exist_ok":
|
||||
@@ -1400,6 +1550,10 @@ def _remote_fork_child(port: int, queue) -> None:
|
||||
queue.put(db.table_names())
|
||||
|
||||
|
||||
def _remote_table_fork_child(table, queue) -> None:
|
||||
queue.put(table.count_rows())
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
sys.platform != "linux",
|
||||
reason=(
|
||||
@@ -1462,3 +1616,65 @@ def test_remote_connection_after_fork():
|
||||
finally:
|
||||
server.shutdown()
|
||||
server_thread.join()
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
sys.platform != "linux",
|
||||
reason=(
|
||||
"fork() is unavailable on Windows and unsafe on macOS "
|
||||
"(Apple frameworks/TLS are not fork-safe)"
|
||||
),
|
||||
)
|
||||
def test_inherited_remote_table_reopens_after_fork():
|
||||
def handler(request):
|
||||
if request.path == "/v1/table/test/describe/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b'{"version": 1, "schema": {"fields": []}}')
|
||||
elif request.path == "/v1/table/test/count_rows/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b"7")
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
server = http.server.HTTPServer(("localhost", 0), make_mock_http_handler(handler))
|
||||
port = server.server_address[1]
|
||||
server_thread = threading.Thread(target=server.serve_forever)
|
||||
server_thread.start()
|
||||
try:
|
||||
db = lancedb.connect(
|
||||
"db://dev",
|
||||
api_key="fake",
|
||||
host_override=f"http://localhost:{port}",
|
||||
client_config={
|
||||
"retry_config": {"retries": 0},
|
||||
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
|
||||
},
|
||||
)
|
||||
table = db.open_table("test")
|
||||
assert table.count_rows() == 7
|
||||
|
||||
ctx = mp.get_context("fork")
|
||||
queue = ctx.Queue()
|
||||
proc = ctx.Process(target=_remote_table_fork_child, args=(table, queue))
|
||||
proc.start()
|
||||
proc.join(timeout=15)
|
||||
|
||||
if proc.is_alive():
|
||||
proc.terminate()
|
||||
proc.join(timeout=5)
|
||||
if proc.is_alive():
|
||||
proc.kill()
|
||||
proc.join()
|
||||
pytest.fail("Remote table hung after fork")
|
||||
|
||||
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
|
||||
assert not queue.empty(), "child produced no result"
|
||||
assert queue.get() == 7
|
||||
finally:
|
||||
server.shutdown()
|
||||
server_thread.join()
|
||||
|
||||
@@ -2472,30 +2472,6 @@ def test_alter_columns(mem_db: DBConnection):
|
||||
assert table.to_arrow().column_names == ["new_id"]
|
||||
|
||||
|
||||
def test_update_field_metadata(mem_db: DBConnection):
|
||||
data = pa.table({"id": [0, 1], "category": ["a", "b"]})
|
||||
table = mem_db.create_table("my_table", data=data)
|
||||
|
||||
res = table.update_field_metadata(
|
||||
{"path": "category", "metadata": {"unit": "label", "pii": "false"}}
|
||||
)
|
||||
assert res.version == 2
|
||||
# Arrow field metadata is bytes-keyed
|
||||
assert table.schema.field("category").metadata == {
|
||||
b"unit": b"label",
|
||||
b"pii": b"false",
|
||||
}
|
||||
|
||||
# merge: add a key, delete one via None, keep the rest
|
||||
table.update_field_metadata(
|
||||
{"path": "category", "metadata": {"source": "import", "pii": None}}
|
||||
)
|
||||
assert table.schema.field("category").metadata == {
|
||||
b"unit": b"label",
|
||||
b"source": b"import",
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_alter_columns_async(mem_db_async: AsyncConnection):
|
||||
data = pa.table({"id": [0, 1]})
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
import contextlib
|
||||
import functools
|
||||
import http.server
|
||||
import json
|
||||
import multiprocessing as mp
|
||||
import pickle
|
||||
import re
|
||||
import sys
|
||||
import threading
|
||||
|
||||
import lancedb
|
||||
import pyarrow as pa
|
||||
@@ -15,6 +20,107 @@ from lancedb.util import tbl_to_tensor
|
||||
torch = pytest.importorskip("torch")
|
||||
|
||||
|
||||
REMOTE_ROWS = list(range(100))
|
||||
|
||||
|
||||
def _make_mock_http_handler(handler):
|
||||
class MockLanceDBHandler(http.server.BaseHTTPRequestHandler):
|
||||
def do_GET(self):
|
||||
handler(self)
|
||||
|
||||
def do_POST(self):
|
||||
handler(self)
|
||||
|
||||
return MockLanceDBHandler
|
||||
|
||||
|
||||
def _remote_schema_payload():
|
||||
return {
|
||||
"version": 1,
|
||||
"schema": {
|
||||
"fields": [
|
||||
{"name": "a", "type": {"type": "int64"}, "nullable": False},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _offsets_from_filter(filter_sql: str | None) -> list[int]:
|
||||
if filter_sql is None:
|
||||
return REMOTE_ROWS
|
||||
match = re.search(r"_rowoffset in \((.*?)\)", filter_sql)
|
||||
if match is None:
|
||||
return REMOTE_ROWS
|
||||
raw_offsets = match.group(1).strip()
|
||||
if raw_offsets == "":
|
||||
return []
|
||||
return [int(offset.strip()) for offset in raw_offsets.split(",")]
|
||||
|
||||
|
||||
def _remote_dataset_handler(request):
|
||||
request.close_connection = True
|
||||
if request.path == "/v1/table/test/describe/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(json.dumps(_remote_schema_payload()).encode())
|
||||
elif request.path == "/v1/table/test/count_rows/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(str(len(REMOTE_ROWS)).encode())
|
||||
elif request.path == "/v1/table/test/query/":
|
||||
content_len = int(request.headers.get("Content-Length"))
|
||||
body = json.loads(request.rfile.read(content_len))
|
||||
offsets = _offsets_from_filter(body.get("filter"))
|
||||
requested_columns = body.get("columns") or ["a"]
|
||||
if isinstance(requested_columns, dict):
|
||||
requested_columns = list(requested_columns)
|
||||
|
||||
data = {}
|
||||
for column in requested_columns:
|
||||
if column == "a":
|
||||
data[column] = [REMOTE_ROWS[offset] for offset in offsets]
|
||||
elif column == "_rowoffset":
|
||||
data[column] = offsets
|
||||
elif column == "_rowid":
|
||||
data[column] = offsets
|
||||
|
||||
table = pa.table(data)
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/vnd.apache.arrow.file")
|
||||
request.end_headers()
|
||||
with pa.ipc.new_file(request.wfile, schema=table.schema) as writer:
|
||||
writer.write_table(table)
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _remote_dataset_table():
|
||||
with http.server.ThreadingHTTPServer(
|
||||
("localhost", 0), _make_mock_http_handler(_remote_dataset_handler)
|
||||
) as server:
|
||||
port = server.server_address[1]
|
||||
handle = threading.Thread(target=server.serve_forever)
|
||||
handle.start()
|
||||
try:
|
||||
db = lancedb.connect(
|
||||
"db://dev",
|
||||
api_key="fake",
|
||||
host_override=f"http://localhost:{port}",
|
||||
client_config={
|
||||
"retry_config": {"retries": 0},
|
||||
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
|
||||
},
|
||||
)
|
||||
yield db.open_table("test")
|
||||
finally:
|
||||
server.shutdown()
|
||||
handle.join()
|
||||
|
||||
|
||||
def _open_native_table(uri: str, table_name: str):
|
||||
"""Top-level connection factory used by the explicit-factory pickle test.
|
||||
|
||||
@@ -107,6 +213,39 @@ def test_permutation_dataloader_multiprocessing(tmp_db):
|
||||
assert seen == 1000
|
||||
|
||||
|
||||
def test_remote_table_dataloader_multiprocessing():
|
||||
with _remote_dataset_table() as table:
|
||||
dataloader = torch.utils.data.DataLoader(
|
||||
table,
|
||||
collate_fn=tbl_to_tensor,
|
||||
batch_size=10,
|
||||
num_workers=2,
|
||||
multiprocessing_context="spawn",
|
||||
)
|
||||
seen = 0
|
||||
for batch in dataloader:
|
||||
assert batch.size(0) == 1
|
||||
assert batch.size(1) == 10
|
||||
seen += batch.size(1)
|
||||
assert seen == len(REMOTE_ROWS)
|
||||
|
||||
|
||||
def test_remote_permutation_dataloader_multiprocessing():
|
||||
with _remote_dataset_table() as table:
|
||||
permutation = Permutation.identity(table)
|
||||
dataloader = torch.utils.data.DataLoader(
|
||||
permutation,
|
||||
batch_size=10,
|
||||
num_workers=2,
|
||||
multiprocessing_context="spawn",
|
||||
)
|
||||
seen = 0
|
||||
for batch in dataloader:
|
||||
assert batch["a"].size(0) == 10
|
||||
seen += batch["a"].size(0)
|
||||
assert seen == len(REMOTE_ROWS)
|
||||
|
||||
|
||||
def test_permutation_pickle_with_connection_factory(tmp_path):
|
||||
"""When the user provides a connection_factory, pickling should round-trip
|
||||
through that factory rather than introspecting the connection URI. Useful
|
||||
@@ -171,6 +310,35 @@ def _multiworker_dataloader_target(db_uri: str, result_queue):
|
||||
result_queue.put(count)
|
||||
|
||||
|
||||
def _remote_multiworker_dataloader_target(port: int, result_queue):
|
||||
import lancedb
|
||||
from lancedb.permutation import Permutation
|
||||
|
||||
db = lancedb.connect(
|
||||
"db://dev",
|
||||
api_key="fake",
|
||||
host_override=f"http://localhost:{port}",
|
||||
client_config={
|
||||
"retry_config": {"retries": 0},
|
||||
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
|
||||
},
|
||||
)
|
||||
table = db.open_table("test")
|
||||
permutation = Permutation.identity(table)
|
||||
|
||||
dataloader = torch.utils.data.DataLoader(
|
||||
permutation,
|
||||
batch_size=10,
|
||||
num_workers=2,
|
||||
multiprocessing_context="fork",
|
||||
)
|
||||
count = 0
|
||||
for batch in dataloader:
|
||||
assert batch["a"].size(0) == 10
|
||||
count += 1
|
||||
result_queue.put(count)
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
sys.platform != "linux",
|
||||
reason=(
|
||||
@@ -208,3 +376,46 @@ def test_permutation_dataloader_fork_workers(tmp_path):
|
||||
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
|
||||
assert not queue.empty(), "child produced no batches"
|
||||
assert queue.get() == 100
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
sys.platform != "linux",
|
||||
reason=(
|
||||
"fork() is unavailable on Windows and unsafe on macOS "
|
||||
"(Apple frameworks/TLS are not fork-safe)"
|
||||
),
|
||||
)
|
||||
def test_remote_permutation_dataloader_fork_workers():
|
||||
with http.server.ThreadingHTTPServer(
|
||||
("localhost", 0), _make_mock_http_handler(_remote_dataset_handler)
|
||||
) as server:
|
||||
port = server.server_address[1]
|
||||
handle = threading.Thread(target=server.serve_forever)
|
||||
handle.start()
|
||||
try:
|
||||
ctx = mp.get_context("spawn")
|
||||
queue = ctx.Queue()
|
||||
proc = ctx.Process(
|
||||
target=_remote_multiworker_dataloader_target,
|
||||
args=(port, queue),
|
||||
)
|
||||
proc.start()
|
||||
proc.join(timeout=30)
|
||||
|
||||
if proc.is_alive():
|
||||
proc.terminate()
|
||||
proc.join(timeout=5)
|
||||
if proc.is_alive():
|
||||
proc.kill()
|
||||
proc.join()
|
||||
pytest.fail(
|
||||
"Remote permutation hung when iterated in a fork-based "
|
||||
"DataLoader worker"
|
||||
)
|
||||
|
||||
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
|
||||
assert not queue.empty(), "child produced no batches"
|
||||
assert queue.get() == 10
|
||||
finally:
|
||||
server.shutdown()
|
||||
handle.join()
|
||||
|
||||
@@ -16,7 +16,7 @@ use query::{FTSQuery, HybridQuery, Query, VectorQuery};
|
||||
use session::Session;
|
||||
use table::{
|
||||
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, LsmWriteSpec,
|
||||
MergeResult, Table, UpdateFieldMetadataResult, UpdateResult,
|
||||
MergeResult, Table, UpdateResult,
|
||||
};
|
||||
|
||||
pub mod arrow;
|
||||
@@ -50,7 +50,6 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
|
||||
m.add_class::<RecordBatchStream>()?;
|
||||
m.add_class::<AddColumnsResult>()?;
|
||||
m.add_class::<AlterColumnsResult>()?;
|
||||
m.add_class::<UpdateFieldMetadataResult>()?;
|
||||
m.add_class::<AddResult>()?;
|
||||
m.add_class::<MergeResult>()?;
|
||||
m.add_class::<LsmWriteSpec>()?;
|
||||
|
||||
@@ -16,8 +16,8 @@ use arrow::{
|
||||
pyarrow::{FromPyArrow, PyArrowType, ToPyArrow},
|
||||
};
|
||||
use lancedb::table::{
|
||||
AddDataMode, ColumnAlteration, Duration, FieldMetadataUpdate, NewColumnTransform,
|
||||
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
|
||||
AddDataMode, ColumnAlteration, Duration, NewColumnTransform, OptimizeAction, OptimizeOptions,
|
||||
Table as LanceDbTable,
|
||||
};
|
||||
use pyo3::{
|
||||
Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python,
|
||||
@@ -357,27 +357,6 @@ impl From<lancedb::table::AlterColumnsResult> for AlterColumnsResult {
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct UpdateFieldMetadataResult {
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
impl UpdateFieldMetadataResult {
|
||||
pub fn __repr__(&self) -> String {
|
||||
format!("UpdateFieldMetadataResult(version={})", self.version)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<lancedb::table::UpdateFieldMetadataResult> for UpdateFieldMetadataResult {
|
||||
fn from(result: lancedb::table::UpdateFieldMetadataResult) -> Self {
|
||||
Self {
|
||||
version: result.version,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DropColumnsResult {
|
||||
@@ -1118,7 +1097,6 @@ impl Table {
|
||||
})
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
pub fn replace_field_metadata<'a>(
|
||||
self_: PyRef<'a, Self>,
|
||||
field_name: String,
|
||||
@@ -1149,45 +1127,6 @@ impl Table {
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
pub fn update_field_metadata<'a>(
|
||||
self_: PyRef<'a, Self>,
|
||||
updates: Vec<Bound<PyDict>>,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
let updates = updates
|
||||
.iter()
|
||||
.map(|update| {
|
||||
let path: String = update
|
||||
.get_item("path")?
|
||||
.ok_or_else(|| PyValueError::new_err("Missing path"))?
|
||||
.extract()?;
|
||||
let mut field_update = FieldMetadataUpdate::new(path);
|
||||
if let Some(metadata) = update.get_item("metadata")? {
|
||||
let metadata_dict = metadata.cast::<PyDict>()?;
|
||||
for (key, value) in metadata_dict.iter() {
|
||||
let key: String = key.extract()?;
|
||||
if value.is_none() {
|
||||
field_update = field_update.remove(key);
|
||||
} else {
|
||||
field_update = field_update.set(key, value.extract::<String>()?);
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(replace) = update.get_item("replace")?
|
||||
&& replace.extract::<bool>()?
|
||||
{
|
||||
field_update = field_update.replace();
|
||||
}
|
||||
Ok(field_update)
|
||||
})
|
||||
.collect::<PyResult<Vec<_>>>()?;
|
||||
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let result = inner.update_field_metadata(&updates).await.infer_error()?;
|
||||
Ok(UpdateFieldMetadataResult::from(result))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(FromPyObject)]
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb"
|
||||
version = "0.30.0-beta.1"
|
||||
version = "0.30.1-beta.0"
|
||||
edition.workspace = true
|
||||
description = "LanceDB: A serverless, low-latency vector database for AI applications"
|
||||
license.workspace = true
|
||||
|
||||
@@ -18,13 +18,13 @@ use crate::index::waiter::wait_for_index;
|
||||
use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest};
|
||||
use crate::table::AddColumnsResult;
|
||||
use crate::table::AddResult;
|
||||
use crate::table::AlterColumnsResult;
|
||||
use crate::table::DeleteResult;
|
||||
use crate::table::DropColumnsResult;
|
||||
use crate::table::MergeResult;
|
||||
use crate::table::Tags;
|
||||
use crate::table::UpdateResult;
|
||||
use crate::table::query::create_multi_vector_plan;
|
||||
use crate::table::{AlterColumnsResult, FieldMetadataUpdate, UpdateFieldMetadataResult};
|
||||
use crate::table::{AnyQuery, Filter, Predicate, PreprocessingOutput, TableStatistics};
|
||||
use crate::utils::background_cache::BackgroundCache;
|
||||
use crate::utils::{
|
||||
@@ -1968,35 +1968,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn update_field_metadata(
|
||||
&self,
|
||||
updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
self.check_mutable().await?;
|
||||
let body = serde_json::json!({ "updates": updates });
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!(
|
||||
"/v1/table/{}/update_field_metadata/",
|
||||
self.identifier
|
||||
))
|
||||
.json(&body);
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
let body = response.text().await.err_to_http(request_id.clone())?;
|
||||
|
||||
let result: UpdateFieldMetadataResult =
|
||||
serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
source: format!("Failed to parse update_field_metadata response: {}", e).into(),
|
||||
request_id,
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
self.invalidate_schema_cache();
|
||||
self.track_write_version(result.version);
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
|
||||
self.check_mutable().await?;
|
||||
let body = serde_json::json!({ "columns": columns });
|
||||
@@ -2290,7 +2261,6 @@ mod tests {
|
||||
|
||||
use crate::remote::client::{ClientConfig, RetryConfig};
|
||||
use crate::table::AddDataMode;
|
||||
use crate::table::FieldMetadataUpdate;
|
||||
|
||||
use arrow::{array::AsArray, compute::concat_batches, datatypes::Int32Type};
|
||||
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, record_batch};
|
||||
@@ -6490,25 +6460,4 @@ mod tests {
|
||||
assert!(!headers.contains_key("x-lancedb-min-version"));
|
||||
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_update_field_metadata() {
|
||||
let table = Table::new_with_handler("my_table", |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
assert_eq!(
|
||||
request.url().path(),
|
||||
"/v1/table/my_table/update_field_metadata/"
|
||||
);
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 7, "fields": {"category": {"unit": "label"}}}"#)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let result = table
|
||||
.update_field_metadata(&[FieldMetadataUpdate::new("category").set("unit", "label")])
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.version, 7);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,10 +91,7 @@ pub use lance::dataset::scanner::DatasetRecordBatchStream;
|
||||
use lance::dataset::statistics::DatasetStatisticsExt;
|
||||
pub use lance_index::optimize::OptimizeOptions;
|
||||
pub use optimize::{CompactionOptions, OptimizeAction, OptimizeStats};
|
||||
pub use schema_evolution::{
|
||||
AddColumnsResult, AlterColumnsResult, DropColumnsResult, FieldMetadataUpdate,
|
||||
UpdateFieldMetadataResult,
|
||||
};
|
||||
pub use schema_evolution::{AddColumnsResult, AlterColumnsResult, DropColumnsResult};
|
||||
use serde_with::skip_serializing_none;
|
||||
pub use update::{UpdateBuilder, UpdateResult};
|
||||
|
||||
@@ -663,19 +660,6 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
message: "create_insert_exec not implemented".to_string(),
|
||||
})
|
||||
}
|
||||
/// Update per-field metadata. Merges into existing metadata by default;
|
||||
/// [`FieldMetadataUpdate::remove`] deletes a key and
|
||||
/// [`FieldMetadataUpdate::replace`] swaps the field's whole map.
|
||||
///
|
||||
/// The default returns `NotSupported`; Lance-backed and remote tables override it.
|
||||
async fn update_field_metadata(
|
||||
&self,
|
||||
_updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
Err(Error::NotSupported {
|
||||
message: "update_field_metadata is not supported on this table type".into(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// A Table is a collection of strong typed Rows.
|
||||
@@ -1356,14 +1340,6 @@ impl Table {
|
||||
self.inner.alter_columns(alterations).await
|
||||
}
|
||||
|
||||
/// Update per-field metadata (merges by default).
|
||||
pub async fn update_field_metadata(
|
||||
&self,
|
||||
updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
self.inner.update_field_metadata(updates).await
|
||||
}
|
||||
|
||||
/// Remove columns from the table.
|
||||
pub async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
|
||||
self.inner.drop_columns(columns).await
|
||||
@@ -2604,7 +2580,6 @@ impl NativeTable {
|
||||
/// field id and the second element is a hashmap of metadata key-value
|
||||
/// pairs.
|
||||
///
|
||||
#[deprecated(since = "0.33.1", note = "Use `update_field_metadata` instead")]
|
||||
pub async fn replace_field_metadata(
|
||||
&self,
|
||||
new_values: impl IntoIterator<Item = (u32, HashMap<String, String>)>,
|
||||
@@ -2911,13 +2886,6 @@ impl BaseTable for NativeTable {
|
||||
schema_evolution::execute_alter_columns(self, alterations).await
|
||||
}
|
||||
|
||||
async fn update_field_metadata(
|
||||
&self,
|
||||
updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
schema_evolution::execute_update_field_metadata(self, updates).await
|
||||
}
|
||||
|
||||
async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
|
||||
schema_evolution::execute_drop_columns(self, columns).await
|
||||
}
|
||||
@@ -4483,7 +4451,6 @@ mod tests {
|
||||
|
||||
let mut new_field_metadata = HashMap::<String, String>::new();
|
||||
new_field_metadata.insert("test_field_key1".into(), "test_field_val1".into());
|
||||
#[allow(deprecated)]
|
||||
native_tbl
|
||||
.replace_field_metadata(vec![(field.id as u32, new_field_metadata)])
|
||||
.await
|
||||
|
||||
@@ -870,10 +870,8 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
result.iter().map(|batch| batch.num_rows()).sum::<usize>(),
|
||||
0
|
||||
);
|
||||
// Should return empty or nearly empty result
|
||||
assert!(result[0].num_rows() <= 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -10,7 +10,6 @@
|
||||
|
||||
use lance::dataset::{ColumnAlteration, NewColumnTransform};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use super::NativeTable;
|
||||
use crate::Result;
|
||||
@@ -45,52 +44,6 @@ pub struct DropColumnsResult {
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
/// A single field's metadata update, addressed by dot-path.
|
||||
///
|
||||
/// Merges into the field's existing metadata by default. Use [`Self::remove`] to
|
||||
/// delete a key, or [`Self::replace`] to swap the field's entire metadata map.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
|
||||
pub struct FieldMetadataUpdate {
|
||||
/// Dot-separated path to the field (e.g. `"embedding"` or `"address.zip"`).
|
||||
pub path: String,
|
||||
/// Keys to set (`Some`) or delete (`None`).
|
||||
pub metadata: HashMap<String, Option<String>>,
|
||||
/// If `true`, replace the field's entire metadata map instead of merging.
|
||||
pub replace: bool,
|
||||
}
|
||||
|
||||
impl FieldMetadataUpdate {
|
||||
pub fn new(path: impl Into<String>) -> Self {
|
||||
Self {
|
||||
path: path.into(),
|
||||
metadata: HashMap::new(),
|
||||
replace: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
|
||||
self.metadata.insert(key.into(), Some(value.into()));
|
||||
self
|
||||
}
|
||||
|
||||
pub fn remove(mut self, key: impl Into<String>) -> Self {
|
||||
self.metadata.insert(key.into(), None);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn replace(mut self) -> Self {
|
||||
self.replace = true;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct UpdateFieldMetadataResult {
|
||||
/// The commit version associated with the operation.
|
||||
#[serde(default)]
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
/// Internal implementation of the add columns logic.
|
||||
///
|
||||
/// Adds new columns to the table using the provided transforms.
|
||||
@@ -137,32 +90,6 @@ pub(crate) async fn execute_drop_columns(
|
||||
Ok(DropColumnsResult { version })
|
||||
}
|
||||
|
||||
/// Internal implementation of the update field metadata logic.
|
||||
///
|
||||
/// Merges or replaces per-field metadata, addressing fields by dot-path.
|
||||
pub(crate) async fn execute_update_field_metadata(
|
||||
table: &NativeTable,
|
||||
updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
table.dataset.ensure_mutable()?;
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
|
||||
let mut builder = dataset.update_field_metadata();
|
||||
for update in updates {
|
||||
let entries = update.metadata.iter().map(|(k, v)| (k.clone(), v.clone()));
|
||||
builder = if update.replace {
|
||||
builder.replace(&update.path, entries)?
|
||||
} else {
|
||||
builder.update(&update.path, entries)?
|
||||
};
|
||||
}
|
||||
builder.await?;
|
||||
|
||||
let version = dataset.version().version;
|
||||
table.dataset.update(dataset);
|
||||
Ok(UpdateFieldMetadataResult { version })
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use arrow_array::{Int32Array, StringArray, record_batch};
|
||||
@@ -170,7 +97,6 @@ mod tests {
|
||||
use futures::TryStreamExt;
|
||||
use lance::dataset::ColumnAlteration;
|
||||
|
||||
use super::FieldMetadataUpdate;
|
||||
use crate::connect;
|
||||
use crate::query::{ExecutableQuery, QueryBase, Select};
|
||||
use crate::table::NewColumnTransform;
|
||||
@@ -684,46 +610,4 @@ mod tests {
|
||||
let v4 = table.version().await.unwrap();
|
||||
assert_eq!(drop_result.version, v4);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_update_field_metadata() {
|
||||
let conn = connect("memory://").execute().await.unwrap();
|
||||
let batch = record_batch!(
|
||||
("id", Int32, [1, 2, 3]),
|
||||
("category", Utf8, ["A", "B", "C"])
|
||||
)
|
||||
.unwrap();
|
||||
let table = conn
|
||||
.create_table("test_update_field_metadata", batch)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Set metadata on a field.
|
||||
table
|
||||
.update_field_metadata(&[FieldMetadataUpdate::new("category")
|
||||
.set("unit", "label")
|
||||
.set("pii", "false")])
|
||||
.await
|
||||
.unwrap();
|
||||
let schema = table.schema().await.unwrap();
|
||||
let field = schema.field_with_name("category").unwrap();
|
||||
assert_eq!(
|
||||
field.metadata().get("unit").map(String::as_str),
|
||||
Some("label")
|
||||
);
|
||||
|
||||
// Merge: add a key, delete one, keep the rest.
|
||||
table
|
||||
.update_field_metadata(&[FieldMetadataUpdate::new("category")
|
||||
.set("source", "import")
|
||||
.remove("pii")])
|
||||
.await
|
||||
.unwrap();
|
||||
let schema = table.schema().await.unwrap();
|
||||
let md = schema.field_with_name("category").unwrap().metadata();
|
||||
assert_eq!(md.get("unit").map(String::as_str), Some("label")); // preserved
|
||||
assert_eq!(md.get("source").map(String::as_str), Some("import")); // added
|
||||
assert!(!md.contains_key("pii")); // deleted
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user