mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-02 20:00:46 +00:00
Compare commits
1 Commits
feat/depre
...
xuanwo/tab
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4d10d22e92 |
@@ -1,7 +0,0 @@
|
||||
# Agent Skills
|
||||
|
||||
This directory contains repo-scoped code agent skills for the LanceDB project.
|
||||
|
||||
Each skill is a folder that contains a required `SKILL.md` and optional bundled resources.
|
||||
|
||||
Codex discovers skills from `.agents/skills` in the current working directory and parent directories.
|
||||
@@ -1,98 +0,0 @@
|
||||
---
|
||||
name: lancedb-update-lance-dependency
|
||||
description: Update LanceDB to a specific Lance release or tag. Use when bumping Lance dependencies in the lancedb repository, including Rust workspace Lance crates, Java lance-core, validation, branch creation, commit, push, and PR creation when requested.
|
||||
---
|
||||
|
||||
# LanceDB Update Lance Dependency
|
||||
|
||||
## Scope
|
||||
|
||||
Use this skill in the `lancedb/lancedb` repository when updating the Lance dependency to a specific Lance version or tag.
|
||||
|
||||
Inputs can be a version (`7.2.0-beta.1`), a tag (`v7.2.0-beta.1`), a tag ref (`refs/tags/v7.2.0-beta.1`), or `latest`.
|
||||
|
||||
## Workflow
|
||||
|
||||
1. Confirm the worktree status with `git status --short`.
|
||||
2. Resolve the target Lance version:
|
||||
|
||||
- If the input is `latest`, empty, or omitted, run:
|
||||
|
||||
```bash
|
||||
python3 ci/check_lance_release.py
|
||||
```
|
||||
|
||||
Parse the JSON output. If `needs_update` is not `true`, stop without creating a PR. Otherwise use `latest_tag`.
|
||||
|
||||
- If the input is explicit, use it directly.
|
||||
|
||||
3. Compute update metadata without changing files:
|
||||
|
||||
```bash
|
||||
python3 ci/update_lance_dependency.py "$TAG_OR_VERSION" --metadata-only
|
||||
```
|
||||
|
||||
Before making changes, check for an existing open PR with the emitted `pr_title`:
|
||||
|
||||
```bash
|
||||
gh pr list --search "\"$PR_TITLE\" in:title" --state open --limit 1 --json number,url,title
|
||||
```
|
||||
|
||||
If a matching open PR exists, stop and report it instead of creating a duplicate.
|
||||
|
||||
4. Run the deterministic update entrypoint:
|
||||
|
||||
```bash
|
||||
python3 ci/update_lance_dependency.py "$TAG_OR_VERSION"
|
||||
```
|
||||
|
||||
This updates the Rust workspace Lance dependencies through `ci/set_lance_version.py`, updates `java/pom.xml`, refreshes Cargo metadata, and prints JSON metadata containing `branch_name`, `commit_message`, and `pr_title`.
|
||||
|
||||
5. Run validation:
|
||||
|
||||
```bash
|
||||
cargo clippy --quiet --workspace --tests --all-features -- -D warnings
|
||||
cargo fmt --all --quiet
|
||||
```
|
||||
|
||||
Fix real diagnostics and rerun clippy until it succeeds. Do not skip warnings.
|
||||
|
||||
6. Inspect `git status --short` and `git diff` to ensure only the Lance dependency update and required compatibility fixes are present.
|
||||
|
||||
7. If the task only asks to prepare local changes, stop here and report the changed files and validation result.
|
||||
|
||||
8. If the task asks to publish the update, create a branch using the printed `branch_name`, stage all relevant files, and commit using the printed `commit_message`. Do not amend or rewrite existing commits.
|
||||
|
||||
9. Push to `origin`. Before creating the PR, check that the current token has push permission:
|
||||
|
||||
```bash
|
||||
gh api repos/lancedb/lancedb --jq .permissions.push
|
||||
```
|
||||
|
||||
If the remote branch already exists for the same generated branch name, delete the remote ref with `gh api -X DELETE repos/lancedb/lancedb/git/refs/heads/$BRANCH_NAME`, then push. Do not force-push.
|
||||
|
||||
10. Create a PR targeting `main` with the printed `pr_title`. If there is no PR template, keep the body to two or three concise sentences: state the Lance dependency bump, note any required compatibility fixes, and link the triggering Lance tag or release.
|
||||
|
||||
11. Read back the remote PR title after creation. If it is not a Conventional Commit title, fix it immediately.
|
||||
|
||||
12. When running in GitHub Actions after creating the LanceDB PR, trigger the Sophon dependency update:
|
||||
|
||||
```bash
|
||||
gh workflow run codex-bump-lancedb-lance.yml \
|
||||
--repo lancedb/sophon \
|
||||
-f lance_ref="$LANCE_TAG" \
|
||||
-f lancedb_ref="$BRANCH_NAME"
|
||||
gh run list --repo lancedb/sophon --workflow codex-bump-lancedb-lance.yml --limit 1 --json databaseId,url,displayTitle
|
||||
```
|
||||
|
||||
Use the emitted metadata `tag` value as `LANCE_TAG`. Do this only after a new LanceDB PR has been created. If the update was skipped because no update is needed or an open PR already exists, do not trigger Sophon.
|
||||
|
||||
## GitHub Actions
|
||||
|
||||
When this skill is used from GitHub Actions, `TAG`, `GH_TOKEN`, and `GITHUB_TOKEN` may already be set. Resolve `latest` first when `TAG` is empty. Once an explicit tag or version is known, use:
|
||||
|
||||
```bash
|
||||
python3 ci/update_lance_dependency.py "$TAG" --github-output "$GITHUB_OUTPUT"
|
||||
```
|
||||
|
||||
Then use the emitted `branch_name`, `commit_message`, and `pr_title` values for branch, commit, and PR creation.
|
||||
@@ -4,16 +4,14 @@ on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
tag:
|
||||
description: "Tag name from Lance. If omitted, the skill will use the latest Lance release that needs an update."
|
||||
required: false
|
||||
default: ""
|
||||
description: "Tag name from Lance"
|
||||
required: true
|
||||
type: string
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
tag:
|
||||
description: "Tag name from Lance. Leave empty to use the latest Lance release that needs an update."
|
||||
required: false
|
||||
default: ""
|
||||
description: "Tag name from Lance"
|
||||
required: true
|
||||
type: string
|
||||
|
||||
permissions:
|
||||
@@ -27,7 +25,7 @@ jobs:
|
||||
steps:
|
||||
- name: Show inputs
|
||||
run: |
|
||||
echo "tag = ${{ inputs.tag || 'latest' }}"
|
||||
echo "tag = ${{ inputs.tag }}"
|
||||
|
||||
- name: Checkout Repo LanceDB
|
||||
uses: actions/checkout@v4
|
||||
@@ -73,21 +71,65 @@ jobs:
|
||||
OPENAI_API_KEY: ${{ secrets.CODEX_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
TARGET_TAG="${TAG:-latest}"
|
||||
VERSION="${TAG#refs/tags/}"
|
||||
VERSION="${VERSION#v}"
|
||||
BRANCH_NAME="codex/update-lance-${VERSION//[^a-zA-Z0-9]/-}"
|
||||
|
||||
# Use "chore" for beta/rc versions, "feat" for stable releases
|
||||
if [[ "${VERSION}" == *beta* ]] || [[ "${VERSION}" == *rc* ]]; then
|
||||
COMMIT_TYPE="chore"
|
||||
else
|
||||
COMMIT_TYPE="feat"
|
||||
fi
|
||||
|
||||
cat <<EOF >/tmp/codex-prompt.txt
|
||||
You are running inside the lancedb repository on a GitHub Actions runner.
|
||||
You are running inside the lancedb repository on a GitHub Actions runner. Update the Lance dependency to version ${VERSION} and prepare a pull request for maintainers to review.
|
||||
|
||||
Use \$lancedb-update-lance-dependency with target "${TARGET_TAG}".
|
||||
Follow these steps exactly:
|
||||
1. Use script "ci/set_lance_version.py" to update Lance Rust dependencies. The script already refreshes Cargo metadata, so allow it to finish even if it takes time.
|
||||
2. Update the Java lance-core dependency version in "java/pom.xml": change the "<lance-core.version>...</lance-core.version>" property to "${VERSION}".
|
||||
3. Run "cargo clippy --workspace --tests --all-features -- -D warnings". If diagnostics appear, fix them yourself and rerun clippy until it exits cleanly. Do not skip any warnings.
|
||||
4. After clippy succeeds, run "cargo fmt --all" to format the workspace.
|
||||
5. Ensure the repository is clean except for intentional changes. Inspect "git status --short" and "git diff" to confirm the dependency update and any required fixes.
|
||||
6. Create and switch to a new branch named "${BRANCH_NAME}" (replace any duplicated hyphens if necessary).
|
||||
7. Stage all relevant files with "git add -A". Commit using the message "${COMMIT_TYPE}: update lance dependency to v${VERSION}".
|
||||
8. Push the branch to origin. If the remote branch already exists, delete it first with "gh api -X DELETE repos/lancedb/lancedb/git/refs/heads/${BRANCH_NAME}" then push with "git push origin ${BRANCH_NAME}". Do NOT use "git push --force" or "git push -f".
|
||||
9. env "GH_TOKEN" is available, use "gh" tools for github related operations like creating pull request.
|
||||
10. Create a pull request targeting "main" with title "${COMMIT_TYPE}: update lance dependency to v${VERSION}". First, write the PR body to /tmp/pr-body.md using a heredoc (cat <<'EOF' > /tmp/pr-body.md). The body should summarize the dependency bump, clippy/fmt verification, and link the triggering tag (${TAG}). Then run "gh pr create --body-file /tmp/pr-body.md".
|
||||
11. After creating the PR, display the PR URL, "git status --short", and a concise summary of the commands run and their results.
|
||||
|
||||
Constraints:
|
||||
- Use env "GH_TOKEN" for GitHub operations.
|
||||
- Do not merge the pull request.
|
||||
- Do not force-push.
|
||||
- Do not create a duplicate pull request if an open PR already exists for the target Lance version.
|
||||
- If any command fails, diagnose and fix the root cause instead of aborting.
|
||||
- After creating the PR, display the PR URL, "git status --short", and a concise summary of the commands run and their results.
|
||||
- Use bash commands; avoid modifying GitHub workflow files other than through the scripted task above.
|
||||
- Do not merge the PR.
|
||||
- If any command fails, diagnose and fix the issue instead of aborting.
|
||||
EOF
|
||||
|
||||
printenv OPENAI_API_KEY | codex login --with-api-key
|
||||
codex --config shell_environment_policy.ignore_default_excludes=true exec --dangerously-bypass-approvals-and-sandbox "$(cat /tmp/codex-prompt.txt)"
|
||||
|
||||
- name: Trigger sophon dependency update
|
||||
env:
|
||||
TAG: ${{ inputs.tag }}
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
VERSION="${TAG#refs/tags/}"
|
||||
VERSION="${VERSION#v}"
|
||||
LANCEDB_BRANCH="codex/update-lance-${VERSION//[^a-zA-Z0-9]/-}"
|
||||
|
||||
echo "Triggering sophon workflow with:"
|
||||
echo " lance_ref: ${TAG#refs/tags/}"
|
||||
echo " lancedb_ref: ${LANCEDB_BRANCH}"
|
||||
|
||||
gh workflow run codex-bump-lancedb-lance.yml \
|
||||
--repo lancedb/sophon \
|
||||
-f lance_ref="${TAG#refs/tags/}" \
|
||||
-f lancedb_ref="${LANCEDB_BRANCH}"
|
||||
|
||||
- name: Show latest sophon workflow run
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
echo "Latest sophon workflow run:"
|
||||
gh run list --repo lancedb/sophon --workflow codex-bump-lancedb-lance.yml --limit 1 --json databaseId,url,displayTitle
|
||||
|
||||
62
.github/workflows/lance-release-timer.yml
vendored
Normal file
62
.github/workflows/lance-release-timer.yml
vendored
Normal file
@@ -0,0 +1,62 @@
|
||||
name: Lance Release Timer
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "*/10 * * * *"
|
||||
workflow_dispatch:
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
actions: write
|
||||
|
||||
concurrency:
|
||||
group: lance-release-timer
|
||||
cancel-in-progress: false
|
||||
|
||||
jobs:
|
||||
trigger-update:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Check for new Lance tag
|
||||
id: check
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
python3 ci/check_lance_release.py --github-output "$GITHUB_OUTPUT"
|
||||
|
||||
- name: Look for existing PR
|
||||
if: steps.check.outputs.needs_update == 'true'
|
||||
id: pr
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
TITLE="chore: update lance dependency to v${{ steps.check.outputs.latest_version }}"
|
||||
COUNT=$(gh pr list --search "\"$TITLE\" in:title" --state open --limit 1 --json number --jq 'length')
|
||||
if [ "$COUNT" -gt 0 ]; then
|
||||
echo "Open PR already exists for $TITLE"
|
||||
echo "pr_exists=true" >> "$GITHUB_OUTPUT"
|
||||
else
|
||||
echo "No existing PR for $TITLE"
|
||||
echo "pr_exists=false" >> "$GITHUB_OUTPUT"
|
||||
fi
|
||||
|
||||
- name: Trigger codex update workflow
|
||||
if: steps.check.outputs.needs_update == 'true' && steps.pr.outputs.pr_exists != 'true'
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
TAG=${{ steps.check.outputs.latest_tag }}
|
||||
gh workflow run codex-update-lance-dependency.yml -f tag=refs/tags/$TAG
|
||||
|
||||
- name: Show latest codex workflow run
|
||||
if: steps.check.outputs.needs_update == 'true' && steps.pr.outputs.pr_exists != 'true'
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
gh run list --workflow codex-update-lance-dependency.yml --limit 1 --json databaseId,url,displayTitle
|
||||
@@ -1,126 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Prepare a Lance dependency update for LanceDB."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Sequence
|
||||
|
||||
try:
|
||||
from check_lance_release import parse_semver
|
||||
except ModuleNotFoundError:
|
||||
# Supports importing as ci.update_lance_dependency from tests or ad hoc checks.
|
||||
from ci.check_lance_release import parse_semver # type: ignore
|
||||
|
||||
|
||||
def normalize_version(raw: str) -> str:
|
||||
value = raw.strip()
|
||||
value = value.removeprefix("refs/tags/")
|
||||
value = value.removeprefix("v")
|
||||
try:
|
||||
parse_semver(value)
|
||||
except ValueError:
|
||||
raise ValueError(f"Unsupported Lance version or tag: {raw}")
|
||||
return value
|
||||
|
||||
|
||||
def normalized_tag(version: str) -> str:
|
||||
return f"v{version}"
|
||||
|
||||
|
||||
def branch_name(version: str) -> str:
|
||||
suffix = re.sub(r"[^a-zA-Z0-9]+", "-", version).strip("-")
|
||||
suffix = re.sub(r"-+", "-", suffix)
|
||||
return f"codex/update-lance-{suffix}"
|
||||
|
||||
|
||||
def commit_type(version: str) -> str:
|
||||
prerelease = version.split("-", maxsplit=1)[1] if "-" in version else ""
|
||||
return "chore" if "beta" in prerelease or "rc" in prerelease else "feat"
|
||||
|
||||
|
||||
def metadata_for(version: str) -> dict[str, str]:
|
||||
kind = commit_type(version)
|
||||
message = f"{kind}: update lance dependency to v{version}"
|
||||
return {
|
||||
"version": version,
|
||||
"tag": normalized_tag(version),
|
||||
"branch_name": branch_name(version),
|
||||
"commit_type": kind,
|
||||
"commit_message": message,
|
||||
"pr_title": message,
|
||||
}
|
||||
|
||||
|
||||
def run_command(cmd: Sequence[str], *, cwd: Path) -> None:
|
||||
subprocess.run(cmd, cwd=cwd, check=True)
|
||||
|
||||
|
||||
def update_java_lance_core_version(repo_root: Path, version: str) -> None:
|
||||
pom_path = repo_root / "java" / "pom.xml"
|
||||
contents = pom_path.read_text(encoding="utf-8")
|
||||
updated, count = re.subn(
|
||||
r"(<lance-core\.version>)[^<]+(</lance-core\.version>)",
|
||||
rf"\g<1>{version}\g<2>",
|
||||
contents,
|
||||
count=1,
|
||||
)
|
||||
if count != 1:
|
||||
raise RuntimeError(
|
||||
"Expected exactly one <lance-core.version> entry in java/pom.xml"
|
||||
)
|
||||
pom_path.write_text(updated, encoding="utf-8")
|
||||
|
||||
|
||||
def write_github_outputs(path: str | None, payload: dict[str, str]) -> None:
|
||||
if not path:
|
||||
return
|
||||
with open(path, "a", encoding="utf-8") as output:
|
||||
for key, value in payload.items():
|
||||
output.write(f"{key}={value}\n")
|
||||
|
||||
|
||||
def main(argv: Sequence[str] | None = None) -> int:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument(
|
||||
"tag_or_version",
|
||||
help="Lance tag or version, for example refs/tags/v7.2.0-beta.1 or 7.2.0",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--repo-root",
|
||||
type=Path,
|
||||
default=Path(__file__).resolve().parents[1],
|
||||
help="Path to the lancedb repository root",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--github-output",
|
||||
default=None,
|
||||
help="Optional GitHub Actions output file to receive metadata fields",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--metadata-only",
|
||||
action="store_true",
|
||||
help="Only print derived metadata; do not modify dependency files",
|
||||
)
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
repo_root = args.repo_root.resolve()
|
||||
version = normalize_version(args.tag_or_version)
|
||||
payload = metadata_for(version)
|
||||
|
||||
if not args.metadata_only:
|
||||
run_command([sys.executable, "ci/set_lance_version.py", version], cwd=repo_root)
|
||||
update_java_lance_core_version(repo_root, version)
|
||||
|
||||
write_github_outputs(args.github_output, payload)
|
||||
print(json.dumps(payload, sort_keys=True))
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -994,29 +994,6 @@ based on the row being updated (e.g. "my_col + 1")
|
||||
|
||||
***
|
||||
|
||||
### updateFieldMetadata()
|
||||
|
||||
```ts
|
||||
abstract updateFieldMetadata(updates): Promise<UpdateFieldMetadataResult>
|
||||
```
|
||||
|
||||
Update per-field (column) metadata.
|
||||
|
||||
#### Parameters
|
||||
|
||||
* **updates**: [`FieldMetadataUpdate`](../interfaces/FieldMetadataUpdate.md)[]
|
||||
One or more per-field updates. Each
|
||||
update's metadata is merged into the field's existing metadata by default;
|
||||
a value of `null` deletes that key, and `replace: true` swaps the whole map.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<[`UpdateFieldMetadataResult`](../interfaces/UpdateFieldMetadataResult.md)>
|
||||
|
||||
resolves to the new table version.
|
||||
|
||||
***
|
||||
|
||||
### vectorSearch()
|
||||
|
||||
```ts
|
||||
|
||||
@@ -65,7 +65,6 @@
|
||||
- [DropNamespaceOptions](interfaces/DropNamespaceOptions.md)
|
||||
- [DropNamespaceResponse](interfaces/DropNamespaceResponse.md)
|
||||
- [ExecutableQuery](interfaces/ExecutableQuery.md)
|
||||
- [FieldMetadataUpdate](interfaces/FieldMetadataUpdate.md)
|
||||
- [FragmentStatistics](interfaces/FragmentStatistics.md)
|
||||
- [FragmentSummaryStats](interfaces/FragmentSummaryStats.md)
|
||||
- [FtsOptions](interfaces/FtsOptions.md)
|
||||
@@ -102,7 +101,6 @@
|
||||
- [TimeoutConfig](interfaces/TimeoutConfig.md)
|
||||
- [TlsConfig](interfaces/TlsConfig.md)
|
||||
- [TokenResponse](interfaces/TokenResponse.md)
|
||||
- [UpdateFieldMetadataResult](interfaces/UpdateFieldMetadataResult.md)
|
||||
- [UpdateOptions](interfaces/UpdateOptions.md)
|
||||
- [UpdateResult](interfaces/UpdateResult.md)
|
||||
- [Version](interfaces/Version.md)
|
||||
|
||||
@@ -1,41 +0,0 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / FieldMetadataUpdate
|
||||
|
||||
# Interface: FieldMetadataUpdate
|
||||
|
||||
A per-field metadata update, addressed by dot-path.
|
||||
|
||||
## Properties
|
||||
|
||||
### metadata
|
||||
|
||||
```ts
|
||||
metadata: Record<string, null | string>;
|
||||
```
|
||||
|
||||
Metadata key/value pairs. Merged into the field's existing metadata by
|
||||
default; a value of `null` deletes that key.
|
||||
|
||||
***
|
||||
|
||||
### path
|
||||
|
||||
```ts
|
||||
path: string;
|
||||
```
|
||||
|
||||
Dot-separated path to the field. For a top-level column this is just its
|
||||
name; for a nested field it's the path, e.g. "a.b.c".
|
||||
|
||||
***
|
||||
|
||||
### replace?
|
||||
|
||||
```ts
|
||||
optional replace: boolean;
|
||||
```
|
||||
|
||||
If true, replace the field's entire metadata map instead of merging.
|
||||
@@ -1,15 +0,0 @@
|
||||
[**@lancedb/lancedb**](../README.md) • **Docs**
|
||||
|
||||
***
|
||||
|
||||
[@lancedb/lancedb](../globals.md) / UpdateFieldMetadataResult
|
||||
|
||||
# Interface: UpdateFieldMetadataResult
|
||||
|
||||
## Properties
|
||||
|
||||
### version
|
||||
|
||||
```ts
|
||||
version: number;
|
||||
```
|
||||
@@ -1571,33 +1571,6 @@ describe("schema evolution", function () {
|
||||
expect(await table.schema()).toEqual(expectedSchema3);
|
||||
});
|
||||
|
||||
it("can update field metadata", async function () {
|
||||
const con = await connect(tmpDir.name);
|
||||
const table = await con.createTable("fm", [
|
||||
{ id: 1, category: "a" },
|
||||
{ id: 2, category: "b" },
|
||||
]);
|
||||
|
||||
const res = await table.updateFieldMetadata([
|
||||
{ path: "category", metadata: { unit: "label", pii: "false" } },
|
||||
]);
|
||||
expect(res).toHaveProperty("version");
|
||||
expect(res.version).toBe(2);
|
||||
|
||||
let cat = (await table.schema()).fields.find((f) => f.name === "category");
|
||||
expect(cat?.metadata.get("unit")).toBe("label");
|
||||
expect(cat?.metadata.get("pii")).toBe("false");
|
||||
|
||||
// merge: add a key, delete one via null, keep the rest
|
||||
await table.updateFieldMetadata([
|
||||
{ path: "category", metadata: { source: "import", pii: null } },
|
||||
]);
|
||||
cat = (await table.schema()).fields.find((f) => f.name === "category");
|
||||
expect(cat?.metadata.get("unit")).toBe("label"); // preserved
|
||||
expect(cat?.metadata.get("source")).toBe("import"); // added
|
||||
expect(cat?.metadata.has("pii")).toBe(false); // deleted
|
||||
});
|
||||
|
||||
it("can cast to various types", async function () {
|
||||
const con = await connect(tmpDir.name);
|
||||
|
||||
|
||||
@@ -42,7 +42,6 @@ export {
|
||||
AddResult,
|
||||
AddColumnsResult,
|
||||
AlterColumnsResult,
|
||||
UpdateFieldMetadataResult,
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
UpdateResult,
|
||||
@@ -118,7 +117,6 @@ export {
|
||||
WriteProgress,
|
||||
LsmWriteSpec,
|
||||
ColumnAlteration,
|
||||
FieldMetadataUpdate,
|
||||
} from "./table";
|
||||
|
||||
export {
|
||||
|
||||
@@ -32,7 +32,6 @@ import {
|
||||
OptimizeStats,
|
||||
TableStatistics,
|
||||
Tags,
|
||||
UpdateFieldMetadataResult,
|
||||
UpdateResult,
|
||||
Table as _NativeTable,
|
||||
} from "./native";
|
||||
@@ -509,18 +508,6 @@ export abstract class Table {
|
||||
abstract alterColumns(
|
||||
columnAlterations: ColumnAlteration[],
|
||||
): Promise<AlterColumnsResult>;
|
||||
|
||||
/**
|
||||
* Update per-field (column) metadata.
|
||||
* @param {FieldMetadataUpdate[]} updates One or more per-field updates. Each
|
||||
* update's metadata is merged into the field's existing metadata by default;
|
||||
* a value of `null` deletes that key, and `replace: true` swaps the whole map.
|
||||
* @returns {Promise<UpdateFieldMetadataResult>} resolves to the new table version.
|
||||
*/
|
||||
abstract updateFieldMetadata(
|
||||
updates: FieldMetadataUpdate[],
|
||||
): Promise<UpdateFieldMetadataResult>;
|
||||
|
||||
/**
|
||||
* Drop one or more columns from the dataset
|
||||
*
|
||||
@@ -1050,12 +1037,6 @@ export class LocalTable extends Table {
|
||||
return await this.inner.alterColumns(processedAlterations);
|
||||
}
|
||||
|
||||
async updateFieldMetadata(
|
||||
updates: FieldMetadataUpdate[],
|
||||
): Promise<UpdateFieldMetadataResult> {
|
||||
return await this.inner.updateFieldMetadata(updates);
|
||||
}
|
||||
|
||||
async dropColumns(columnNames: string[]): Promise<DropColumnsResult> {
|
||||
return await this.inner.dropColumns(columnNames);
|
||||
}
|
||||
@@ -1222,19 +1203,3 @@ export interface ColumnAlteration {
|
||||
/** Set the new nullability. Note that a nullable column cannot be made non-nullable. */
|
||||
nullable?: boolean;
|
||||
}
|
||||
|
||||
/** A per-field metadata update, addressed by dot-path. */
|
||||
export interface FieldMetadataUpdate {
|
||||
/**
|
||||
* Dot-separated path to the field. For a top-level column this is just its
|
||||
* name; for a nested field it's the path, e.g. "a.b.c".
|
||||
*/
|
||||
path: string;
|
||||
/**
|
||||
* Metadata key/value pairs. Merged into the field's existing metadata by
|
||||
* default; a value of `null` deletes that key.
|
||||
*/
|
||||
metadata: Record<string, string | null>;
|
||||
/** If true, replace the field's entire metadata map instead of merging. */
|
||||
replace?: boolean;
|
||||
}
|
||||
|
||||
@@ -5,9 +5,8 @@ use std::collections::HashMap;
|
||||
|
||||
use lancedb::ipc::{ipc_file_to_batches, ipc_file_to_schema};
|
||||
use lancedb::table::{
|
||||
AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration,
|
||||
FieldMetadataUpdate as LanceFieldMetadataUpdate, NewColumnTransform, OptimizeAction,
|
||||
OptimizeOptions, Table as LanceDbTable,
|
||||
AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration, NewColumnTransform,
|
||||
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
|
||||
};
|
||||
use napi::bindgen_prelude::*;
|
||||
use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
|
||||
@@ -356,23 +355,6 @@ impl Table {
|
||||
Ok(res.into())
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn update_field_metadata(
|
||||
&self,
|
||||
updates: Vec<FieldMetadataUpdate>,
|
||||
) -> napi::Result<UpdateFieldMetadataResult> {
|
||||
let updates = updates
|
||||
.into_iter()
|
||||
.map(LanceFieldMetadataUpdate::from)
|
||||
.collect::<Vec<_>>();
|
||||
let res = self
|
||||
.inner_ref()?
|
||||
.update_field_metadata(&updates)
|
||||
.await
|
||||
.default_error()?;
|
||||
Ok(res.into())
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn drop_columns(&self, columns: Vec<String>) -> napi::Result<DropColumnsResult> {
|
||||
let col_refs = columns.iter().map(String::as_str).collect::<Vec<_>>();
|
||||
@@ -765,29 +747,6 @@ pub struct ColumnAlteration {
|
||||
pub nullable: Option<bool>,
|
||||
}
|
||||
|
||||
/// A per-field metadata update, addressed by dot-path. Merges into the field's
|
||||
/// existing metadata by default; a `null` value deletes a key, and `replace`
|
||||
/// swaps the field's entire metadata map.
|
||||
#[napi(object)]
|
||||
pub struct FieldMetadataUpdate {
|
||||
/// Dot-separated path to the field (e.g. "embedding" or "a.b.c").
|
||||
pub path: String,
|
||||
/// Metadata keys to set; a `null` value deletes that key.
|
||||
pub metadata: HashMap<String, Option<String>>,
|
||||
/// If true, replace the field's entire metadata map instead of merging.
|
||||
pub replace: Option<bool>,
|
||||
}
|
||||
|
||||
impl From<FieldMetadataUpdate> for LanceFieldMetadataUpdate {
|
||||
fn from(js: FieldMetadataUpdate) -> Self {
|
||||
Self {
|
||||
path: js.path,
|
||||
metadata: js.metadata,
|
||||
replace: js.replace.unwrap_or(false),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<ColumnAlteration> for LanceColumnAlteration {
|
||||
type Error = String;
|
||||
fn try_from(js: ColumnAlteration) -> std::result::Result<Self, Self::Error> {
|
||||
@@ -1028,19 +987,6 @@ impl From<lancedb::table::AlterColumnsResult> for AlterColumnsResult {
|
||||
}
|
||||
}
|
||||
|
||||
#[napi(object)]
|
||||
pub struct UpdateFieldMetadataResult {
|
||||
pub version: i64,
|
||||
}
|
||||
|
||||
impl From<lancedb::table::UpdateFieldMetadataResult> for UpdateFieldMetadataResult {
|
||||
fn from(value: lancedb::table::UpdateFieldMetadataResult) -> Self {
|
||||
Self {
|
||||
version: value.version as i64,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[napi(object)]
|
||||
pub struct DropColumnsResult {
|
||||
pub version: i64,
|
||||
|
||||
@@ -208,9 +208,6 @@ class Table:
|
||||
async def alter_columns(
|
||||
self, columns: list[dict[str, Any]]
|
||||
) -> AlterColumnsResult: ...
|
||||
async def update_field_metadata(
|
||||
self, updates: list[dict[str, Any]]
|
||||
) -> UpdateFieldMetadataResult: ...
|
||||
async def optimize(
|
||||
self,
|
||||
*,
|
||||
@@ -220,6 +217,7 @@ class Table:
|
||||
async def uri(self) -> str: ...
|
||||
async def initial_storage_options(self) -> Optional[Dict[str, str]]: ...
|
||||
async def latest_storage_options(self) -> Optional[Dict[str, str]]: ...
|
||||
async def _table_reopen_state(self) -> Dict[str, Any]: ...
|
||||
async def set_unenforced_primary_key(self, columns: List[str]) -> None: ...
|
||||
async def set_lsm_write_spec(self, spec: LsmWriteSpec) -> None: ...
|
||||
async def unset_lsm_write_spec(self) -> None: ...
|
||||
@@ -463,9 +461,6 @@ class AddColumnsResult:
|
||||
class AlterColumnsResult:
|
||||
version: int
|
||||
|
||||
class UpdateFieldMetadataResult:
|
||||
version: int
|
||||
|
||||
class DropColumnsResult:
|
||||
version: int
|
||||
|
||||
|
||||
@@ -358,36 +358,28 @@ DEFAULT_BATCH_SIZE = 100
|
||||
def _table_to_pickle_state(table: Table) -> dict[str, Any]:
|
||||
from .remote.table import RemoteTable
|
||||
|
||||
if isinstance(table, RemoteTable):
|
||||
return {
|
||||
"kind": "remote",
|
||||
"table": table,
|
||||
}
|
||||
|
||||
if not isinstance(table, LanceTable):
|
||||
raise ValueError(f"Cannot pickle table of type {type(table)!r}")
|
||||
|
||||
base_uri = table._conn.uri
|
||||
if base_uri.startswith("memory://"):
|
||||
if isinstance(table, LanceTable) and table._conn.uri.startswith("memory://"):
|
||||
return {
|
||||
"kind": "memory",
|
||||
"name": table.name,
|
||||
"data": table.to_arrow(),
|
||||
}
|
||||
|
||||
return {
|
||||
"kind": "local",
|
||||
"name": table.name,
|
||||
"uri": base_uri,
|
||||
"namespace": table._namespace_path,
|
||||
"storage_options": table._conn.storage_options,
|
||||
}
|
||||
if isinstance(table, (LanceTable, RemoteTable)):
|
||||
return {
|
||||
"kind": "table",
|
||||
"table": table,
|
||||
}
|
||||
|
||||
raise ValueError(f"Cannot pickle table of type {type(table)!r}")
|
||||
|
||||
|
||||
def _table_from_pickle_state(state: dict[str, Any]) -> Table:
|
||||
from . import connect
|
||||
|
||||
kind = state["kind"]
|
||||
if kind == "table":
|
||||
return state["table"]
|
||||
if kind == "remote":
|
||||
return state["table"]
|
||||
if kind == "memory":
|
||||
|
||||
@@ -25,7 +25,6 @@ from lancedb._lancedb import (
|
||||
AddColumnsResult,
|
||||
AddResult,
|
||||
AlterColumnsResult,
|
||||
UpdateFieldMetadataResult,
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
IndexConfig,
|
||||
@@ -75,6 +74,7 @@ class RemoteTable(Table):
|
||||
self._connection_state = connection_state
|
||||
self._namespace_path = list(namespace_path or [])
|
||||
self._checkout_version: Optional[int] = None
|
||||
self._table_state: Optional[dict[str, Any]] = None
|
||||
self._pid = os.getpid()
|
||||
|
||||
def _serialized_connection_state(self) -> str:
|
||||
@@ -87,6 +87,16 @@ class RemoteTable(Table):
|
||||
self._connection_state = self._connection_state()
|
||||
return self._connection_state
|
||||
|
||||
def _reopen_state(self) -> dict[str, Any]:
|
||||
if self._table_state is not None:
|
||||
return self._table_state
|
||||
self._table_state = {
|
||||
"name": self._name,
|
||||
"namespace_path": self._namespace_path,
|
||||
"storage_options": None,
|
||||
}
|
||||
return self._table_state
|
||||
|
||||
@property
|
||||
def _table(self) -> AsyncTable:
|
||||
self._ensure_open()
|
||||
@@ -97,6 +107,7 @@ class RemoteTable(Table):
|
||||
def _table(self, table: AsyncTable) -> None:
|
||||
self._table_handle = table
|
||||
self._name = table.name
|
||||
self._table_state = None
|
||||
self._pid = os.getpid()
|
||||
|
||||
def _ensure_open(self) -> None:
|
||||
@@ -109,7 +120,11 @@ class RemoteTable(Table):
|
||||
from lancedb import deserialize_conn
|
||||
|
||||
db = deserialize_conn(self._serialized_connection_state(), for_worker=True)
|
||||
table = db.open_table(self._name, namespace_path=self._namespace_path)
|
||||
table_state = self._reopen_state()
|
||||
table = db.open_table(
|
||||
table_state["name"],
|
||||
namespace_path=table_state["namespace_path"] or None,
|
||||
)
|
||||
if self._checkout_version is not None:
|
||||
table.checkout(self._checkout_version)
|
||||
|
||||
@@ -121,17 +136,24 @@ class RemoteTable(Table):
|
||||
return {
|
||||
"connection_state": self._serialized_connection_state(),
|
||||
"db_name": self.db_name,
|
||||
"name": self.name,
|
||||
"namespace_path": self._namespace_path,
|
||||
"table_state": self._reopen_state(),
|
||||
"checkout_version": self._checkout_version,
|
||||
}
|
||||
|
||||
def __setstate__(self, state: dict) -> None:
|
||||
self._table_handle = None
|
||||
self._name = state["name"]
|
||||
table_state = state.get("table_state")
|
||||
if table_state is None:
|
||||
table_state = {
|
||||
"name": state["name"],
|
||||
"namespace_path": state["namespace_path"],
|
||||
"storage_options": None,
|
||||
}
|
||||
self._table_state = table_state
|
||||
self._name = table_state["name"]
|
||||
self.db_name = state["db_name"]
|
||||
self._connection_state = state["connection_state"]
|
||||
self._namespace_path = state["namespace_path"]
|
||||
self._namespace_path = table_state["namespace_path"]
|
||||
self._checkout_version = state["checkout_version"]
|
||||
self._pid = None
|
||||
|
||||
@@ -851,11 +873,6 @@ class RemoteTable(Table):
|
||||
) -> AlterColumnsResult:
|
||||
return LOOP.run(self._table.alter_columns(*alterations))
|
||||
|
||||
def update_field_metadata(
|
||||
self, *updates: dict[str, Any]
|
||||
) -> UpdateFieldMetadataResult:
|
||||
return LOOP.run(self._table.update_field_metadata(*updates))
|
||||
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
|
||||
return LOOP.run(self._table.drop_columns(columns))
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import inspect
|
||||
import os
|
||||
import deprecation
|
||||
import warnings
|
||||
from abc import ABC, abstractmethod
|
||||
@@ -154,7 +155,6 @@ if TYPE_CHECKING:
|
||||
AddColumnsResult,
|
||||
AddResult,
|
||||
AlterColumnsResult,
|
||||
UpdateFieldMetadataResult,
|
||||
DeleteResult,
|
||||
DropColumnsResult,
|
||||
LsmWriteSpec,
|
||||
@@ -758,8 +758,12 @@ class Table(ABC):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def _ensure_open(self) -> None:
|
||||
pass
|
||||
|
||||
def __len__(self) -> int:
|
||||
"""The number of rows in this Table"""
|
||||
self._ensure_open()
|
||||
return self.count_rows(None)
|
||||
|
||||
@property
|
||||
@@ -1405,6 +1409,7 @@ class Table(ABC):
|
||||
pa.RecordBatch
|
||||
A record batch containing the rows at the given offsets.
|
||||
"""
|
||||
self._ensure_open()
|
||||
# We don't know the order of the results at all. So we calculate a permutation
|
||||
# for ordering the given offsets. Then we load the data with the _rowoffset
|
||||
# column. Then we sort by _rowoffset and apply the inverse of the permutation
|
||||
@@ -1800,29 +1805,6 @@ class Table(ABC):
|
||||
version: the new version number of the table after the alteration.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def update_field_metadata(
|
||||
self, *updates: dict[str, Any]
|
||||
) -> UpdateFieldMetadataResult:
|
||||
"""
|
||||
Update per-field (column) metadata.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
updates : dict
|
||||
One or more dicts, each with:
|
||||
- "path": str — dot-path to the field (e.g. "embedding" or "a.b.c").
|
||||
- "metadata": dict[str, str | None] — keys to set; a value of ``None``
|
||||
deletes that key.
|
||||
- "replace": bool, optional — replace the field's whole metadata map
|
||||
instead of merging (default False).
|
||||
|
||||
Returns
|
||||
-------
|
||||
UpdateFieldMetadataResult
|
||||
version: the new table version after the update.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
|
||||
"""
|
||||
@@ -1986,6 +1968,7 @@ class LanceTable(Table):
|
||||
self._location = location # Store location for use in _dataset_path
|
||||
self._namespace_client = namespace_client
|
||||
self._pushdown_operations = pushdown_operations or set()
|
||||
self._init_reopen_tracking()
|
||||
if _async is not None:
|
||||
self._table = _async
|
||||
else:
|
||||
@@ -2001,6 +1984,66 @@ class LanceTable(Table):
|
||||
)
|
||||
)
|
||||
|
||||
def _init_reopen_tracking(self) -> None:
|
||||
self._checkout_version: Optional[int] = None
|
||||
self._table_state: Optional[dict[str, Any]] = None
|
||||
self._pid = os.getpid()
|
||||
|
||||
def _reopen_state(self) -> dict[str, Any]:
|
||||
state = LOOP.run(self._table._table_reopen_state())
|
||||
if get_uri_scheme(self._conn.uri) == "memory":
|
||||
raise ValueError(
|
||||
"Cannot pickle an in-memory LanceTable. Use a persisted table "
|
||||
"or provide a worker-side connection factory."
|
||||
)
|
||||
return state
|
||||
|
||||
def _copy_reopened_table(self, table: "LanceTable") -> None:
|
||||
self._conn = table._conn
|
||||
self._namespace_path = table._namespace_path
|
||||
self._location = table._location
|
||||
self._namespace_client = table._namespace_client
|
||||
self._pushdown_operations = table._pushdown_operations
|
||||
self._table = table._table
|
||||
self._pid = os.getpid()
|
||||
|
||||
def _ensure_open(self) -> None:
|
||||
pid = os.getpid()
|
||||
if getattr(self, "_table", None) is not None and self._pid == pid:
|
||||
return
|
||||
if self._table_state is None:
|
||||
self._table_state = self._reopen_state()
|
||||
|
||||
table = self._conn.open_table(
|
||||
self._table_state["name"],
|
||||
namespace_path=self._table_state["namespace_path"] or None,
|
||||
storage_options=self._table_state["storage_options"],
|
||||
)
|
||||
if self._checkout_version is not None:
|
||||
table.checkout(self._checkout_version)
|
||||
self._copy_reopened_table(table)
|
||||
|
||||
def __getstate__(self) -> dict[str, Any]:
|
||||
return {
|
||||
"connection_state": self._conn.serialize(),
|
||||
"table_state": self._reopen_state(),
|
||||
"checkout_version": self._checkout_version,
|
||||
}
|
||||
|
||||
def __setstate__(self, state: dict[str, Any]) -> None:
|
||||
from . import deserialize_conn
|
||||
|
||||
self._conn = deserialize_conn(state["connection_state"], for_worker=True)
|
||||
self._namespace_path = list(state["table_state"]["namespace_path"] or [])
|
||||
self._location = None
|
||||
self._namespace_client = None
|
||||
self._pushdown_operations = set()
|
||||
self._checkout_version = state["checkout_version"]
|
||||
self._table_state = state["table_state"]
|
||||
self._table = None
|
||||
self._pid = None
|
||||
self._ensure_open()
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self._table.name
|
||||
@@ -2204,6 +2247,7 @@ class LanceTable(Table):
|
||||
0 [1.1, 0.9] vector
|
||||
"""
|
||||
LOOP.run(self._table.checkout(version))
|
||||
self._checkout_version = self.version
|
||||
|
||||
def checkout_latest(self):
|
||||
"""Checkout the latest version of the table. This is an in-place operation.
|
||||
@@ -2212,6 +2256,7 @@ class LanceTable(Table):
|
||||
version of the table.
|
||||
"""
|
||||
LOOP.run(self._table.checkout_latest())
|
||||
self._checkout_version = None
|
||||
|
||||
def restore(self, version: Optional[Union[int, str]] = None):
|
||||
"""Restore a version of the table. This is an in-place operation.
|
||||
@@ -2260,6 +2305,7 @@ class LanceTable(Table):
|
||||
if version is not None:
|
||||
LOOP.run(self._table.checkout(version))
|
||||
LOOP.run(self._table.restore())
|
||||
self._checkout_version = None
|
||||
|
||||
def count_rows(self, filter: Optional[str] = None) -> int:
|
||||
return LOOP.run(self._table.count_rows(filter))
|
||||
@@ -3318,6 +3364,7 @@ class LanceTable(Table):
|
||||
self._location = location
|
||||
self._namespace_client = namespace_client
|
||||
self._pushdown_operations = pushdown_operations or set()
|
||||
self._init_reopen_tracking()
|
||||
|
||||
if data_storage_version is not None:
|
||||
warnings.warn(
|
||||
@@ -3607,11 +3654,6 @@ class LanceTable(Table):
|
||||
) -> AlterColumnsResult:
|
||||
return LOOP.run(self._table.alter_columns(*alterations))
|
||||
|
||||
def update_field_metadata(
|
||||
self, *updates: dict[str, Any]
|
||||
) -> UpdateFieldMetadataResult:
|
||||
return LOOP.run(self._table.update_field_metadata(*updates))
|
||||
|
||||
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
|
||||
return LOOP.run(self._table.drop_columns(columns))
|
||||
|
||||
@@ -3666,18 +3708,10 @@ class LanceTable(Table):
|
||||
"""
|
||||
LOOP.run(self._table.migrate_v2_manifest_paths())
|
||||
|
||||
@deprecation.deprecated(
|
||||
deprecated_in="0.33.1",
|
||||
current_version=__version__,
|
||||
details="Use update_field_metadata() instead.",
|
||||
)
|
||||
def replace_field_metadata(self, field_name: str, new_metadata: Dict[str, str]):
|
||||
"""
|
||||
Replace the metadata of a field in the schema
|
||||
|
||||
.. deprecated:: 0.33.1
|
||||
Use :func:`update_field_metadata` instead.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
field_name: str
|
||||
@@ -4588,6 +4622,10 @@ class AsyncTable:
|
||||
"""
|
||||
return await self._inner.latest_storage_options()
|
||||
|
||||
async def _table_reopen_state(self) -> dict[str, Any]:
|
||||
"""Get the Rust-side table state needed to reopen this table."""
|
||||
return await self._inner._table_reopen_state()
|
||||
|
||||
async def add(
|
||||
self,
|
||||
data: DATA,
|
||||
@@ -5271,13 +5309,6 @@ class AsyncTable:
|
||||
"""
|
||||
return await self._inner.alter_columns(alterations)
|
||||
|
||||
async def update_field_metadata(
|
||||
self, *updates: dict[str, Any]
|
||||
) -> UpdateFieldMetadataResult:
|
||||
"""Update per-field metadata. See
|
||||
[`Table.update_field_metadata`][lancedb.table.Table.update_field_metadata]."""
|
||||
return await self._inner.update_field_metadata(updates)
|
||||
|
||||
async def drop_columns(self, columns: Iterable[str]):
|
||||
"""
|
||||
Drop columns from the table.
|
||||
@@ -5562,20 +5593,12 @@ class AsyncTable:
|
||||
"""
|
||||
await self._inner.migrate_manifest_paths_v2()
|
||||
|
||||
@deprecation.deprecated(
|
||||
deprecated_in="0.33.1",
|
||||
current_version=__version__,
|
||||
details="Use update_field_metadata() instead.",
|
||||
)
|
||||
async def replace_field_metadata(
|
||||
self, field_name: str, new_metadata: dict[str, str]
|
||||
):
|
||||
"""
|
||||
Replace the metadata of a field in the schema
|
||||
|
||||
.. deprecated:: 0.33.1
|
||||
Use :func:`update_field_metadata` instead.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
field_name: str
|
||||
|
||||
@@ -215,10 +215,51 @@ def test_remote_table_is_picklable():
|
||||
|
||||
with mock_lancedb_connection(handler) as db:
|
||||
table = db.open_table("test")
|
||||
state = table.__getstate__()
|
||||
assert state["table_state"] == {
|
||||
"name": "test",
|
||||
"namespace_path": [],
|
||||
"storage_options": None,
|
||||
}
|
||||
restored = pickle.loads(pickle.dumps(table))
|
||||
assert restored.count_rows() == 3
|
||||
|
||||
|
||||
def test_remote_table_reopens_when_pid_changes_without_cached_state():
|
||||
def handler(request):
|
||||
request.close_connection = True
|
||||
if request.path == "/v1/table/test/describe/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
payload = json.dumps(
|
||||
{
|
||||
"version": 1,
|
||||
"schema": {
|
||||
"fields": [
|
||||
{"name": "id", "type": {"type": "int64"}, "nullable": False}
|
||||
]
|
||||
},
|
||||
}
|
||||
)
|
||||
request.wfile.write(payload.encode())
|
||||
elif request.path == "/v1/table/test/count_rows/":
|
||||
request.send_response(200)
|
||||
request.send_header("Content-Type", "application/json")
|
||||
request.end_headers()
|
||||
request.wfile.write(b"3")
|
||||
else:
|
||||
request.send_response(404)
|
||||
request.end_headers()
|
||||
|
||||
with mock_lancedb_connection(handler) as db:
|
||||
table = db.open_table("test")
|
||||
table._pid = -1
|
||||
table._table_state = None
|
||||
|
||||
assert table.count_rows() == 3
|
||||
|
||||
|
||||
def test_remote_table_open_does_not_require_picklable_client_config():
|
||||
from lancedb.remote import HeaderProvider
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
|
||||
import os
|
||||
import pickle
|
||||
import sys
|
||||
import warnings
|
||||
from datetime import date, datetime, timedelta
|
||||
@@ -48,6 +49,36 @@ def test_basic(mem_db: DBConnection):
|
||||
assert table.to_arrow() == expected_data
|
||||
|
||||
|
||||
def test_lance_table_is_picklable(tmp_db: DBConnection):
|
||||
table = tmp_db.create_table("pickle_table", pa.table({"id": [1, 2, 3]}))
|
||||
|
||||
restored = pickle.loads(pickle.dumps(table))
|
||||
|
||||
assert restored.name == "pickle_table"
|
||||
assert restored.count_rows() == 3
|
||||
assert restored.to_arrow().column("id").to_pylist() == [1, 2, 3]
|
||||
|
||||
|
||||
def test_lance_table_pickle_preserves_checkout(tmp_db: DBConnection):
|
||||
table = tmp_db.create_table("pickle_checkout", pa.table({"id": [1]}))
|
||||
table.add(pa.table({"id": [2]}))
|
||||
table.checkout(1)
|
||||
|
||||
restored = pickle.loads(pickle.dumps(table))
|
||||
|
||||
assert restored.count_rows() == 1
|
||||
assert restored.to_arrow().column("id").to_pylist() == [1]
|
||||
restored.checkout_latest()
|
||||
assert restored.count_rows() == 2
|
||||
|
||||
|
||||
def test_memory_lance_table_pickle_is_unsupported(mem_db: DBConnection):
|
||||
table = mem_db.create_table("memory_pickle", pa.table({"id": [1]}))
|
||||
|
||||
with pytest.raises(ValueError, match="in-memory LanceTable"):
|
||||
pickle.dumps(table)
|
||||
|
||||
|
||||
def test_table_to_pandas_default_matches_arrow(tmp_db: DBConnection):
|
||||
pd = pytest.importorskip("pandas")
|
||||
data = pa.table({"id": [1, 2], "text": ["one", "two"]})
|
||||
@@ -2472,30 +2503,6 @@ def test_alter_columns(mem_db: DBConnection):
|
||||
assert table.to_arrow().column_names == ["new_id"]
|
||||
|
||||
|
||||
def test_update_field_metadata(mem_db: DBConnection):
|
||||
data = pa.table({"id": [0, 1], "category": ["a", "b"]})
|
||||
table = mem_db.create_table("my_table", data=data)
|
||||
|
||||
res = table.update_field_metadata(
|
||||
{"path": "category", "metadata": {"unit": "label", "pii": "false"}}
|
||||
)
|
||||
assert res.version == 2
|
||||
# Arrow field metadata is bytes-keyed
|
||||
assert table.schema.field("category").metadata == {
|
||||
b"unit": b"label",
|
||||
b"pii": b"false",
|
||||
}
|
||||
|
||||
# merge: add a key, delete one via None, keep the rest
|
||||
table.update_field_metadata(
|
||||
{"path": "category", "metadata": {"source": "import", "pii": None}}
|
||||
)
|
||||
assert table.schema.field("category").metadata == {
|
||||
b"unit": b"label",
|
||||
b"source": b"import",
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_alter_columns_async(mem_db_async: AsyncConnection):
|
||||
data = pa.table({"id": [0, 1]})
|
||||
|
||||
@@ -16,7 +16,7 @@ use query::{FTSQuery, HybridQuery, Query, VectorQuery};
|
||||
use session::Session;
|
||||
use table::{
|
||||
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, LsmWriteSpec,
|
||||
MergeResult, Table, UpdateFieldMetadataResult, UpdateResult,
|
||||
MergeResult, Table, UpdateResult,
|
||||
};
|
||||
|
||||
pub mod arrow;
|
||||
@@ -50,7 +50,6 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
|
||||
m.add_class::<RecordBatchStream>()?;
|
||||
m.add_class::<AddColumnsResult>()?;
|
||||
m.add_class::<AlterColumnsResult>()?;
|
||||
m.add_class::<UpdateFieldMetadataResult>()?;
|
||||
m.add_class::<AddResult>()?;
|
||||
m.add_class::<MergeResult>()?;
|
||||
m.add_class::<LsmWriteSpec>()?;
|
||||
|
||||
@@ -16,12 +16,12 @@ use arrow::{
|
||||
pyarrow::{FromPyArrow, PyArrowType, ToPyArrow},
|
||||
};
|
||||
use lancedb::table::{
|
||||
AddDataMode, ColumnAlteration, Duration, FieldMetadataUpdate, NewColumnTransform,
|
||||
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
|
||||
AddDataMode, ColumnAlteration, Duration, NewColumnTransform, OptimizeAction, OptimizeOptions,
|
||||
Table as LanceDbTable,
|
||||
};
|
||||
use pyo3::{
|
||||
Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python,
|
||||
exceptions::{PyRuntimeError, PyValueError},
|
||||
exceptions::{PyKeyError, PyRuntimeError, PyValueError},
|
||||
pyclass, pymethods,
|
||||
types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods},
|
||||
};
|
||||
@@ -357,27 +357,6 @@ impl From<lancedb::table::AlterColumnsResult> for AlterColumnsResult {
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct UpdateFieldMetadataResult {
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
#[pymethods]
|
||||
impl UpdateFieldMetadataResult {
|
||||
pub fn __repr__(&self) -> String {
|
||||
format!("UpdateFieldMetadataResult(version={})", self.version)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<lancedb::table::UpdateFieldMetadataResult> for UpdateFieldMetadataResult {
|
||||
fn from(result: lancedb::table::UpdateFieldMetadataResult) -> Self {
|
||||
Self {
|
||||
version: result.version,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[pyclass(get_all, from_py_object)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DropColumnsResult {
|
||||
@@ -776,6 +755,23 @@ impl Table {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn _table_reopen_state(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let name = inner.name().to_string();
|
||||
let namespace_path = inner.namespace().to_vec();
|
||||
let storage_options = inner.initial_storage_options().await;
|
||||
|
||||
Python::attach(|py| {
|
||||
let dict = PyDict::new(py);
|
||||
dict.set_item("name", name)?;
|
||||
dict.set_item("namespace_path", namespace_path)?;
|
||||
dict.set_item("storage_options", storage_options)?;
|
||||
Ok(dict.unbind())
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
pub fn __repr__(&self) -> String {
|
||||
match &self.inner {
|
||||
None => format!("ClosedTable({})", self.name),
|
||||
@@ -1123,57 +1119,31 @@ impl Table {
|
||||
field_name: String,
|
||||
metadata: &Bound<'_, PyDict>,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
// Deprecated: forwards to the update_field_metadata path (replace mode).
|
||||
let mut update = FieldMetadataUpdate::new(field_name).replace();
|
||||
for (key, value) in metadata.into_iter() {
|
||||
update = update.set(key.extract::<String>()?, value.extract::<String>()?);
|
||||
let mut new_metadata = HashMap::<String, String>::new();
|
||||
for (column_name, value) in metadata.into_iter() {
|
||||
let key: String = column_name.extract()?;
|
||||
let value: String = value.extract()?;
|
||||
new_metadata.insert(key, value);
|
||||
}
|
||||
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
inner.update_field_metadata(&[update]).await.infer_error()?;
|
||||
let native_tbl = inner
|
||||
.as_native()
|
||||
.ok_or_else(|| PyValueError::new_err("This cannot be run on a remote table"))?;
|
||||
let schema = native_tbl.manifest().await.infer_error()?.schema;
|
||||
let field = schema
|
||||
.field(&field_name)
|
||||
.ok_or_else(|| PyKeyError::new_err(format!("Field {} not found", field_name)))?;
|
||||
|
||||
native_tbl
|
||||
.replace_field_metadata(vec![(field.id as u32, new_metadata)])
|
||||
.await
|
||||
.infer_error()?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
pub fn update_field_metadata<'a>(
|
||||
self_: PyRef<'a, Self>,
|
||||
updates: Vec<Bound<PyDict>>,
|
||||
) -> PyResult<Bound<'a, PyAny>> {
|
||||
let updates = updates
|
||||
.iter()
|
||||
.map(|update| {
|
||||
let path: String = update
|
||||
.get_item("path")?
|
||||
.ok_or_else(|| PyValueError::new_err("Missing path"))?
|
||||
.extract()?;
|
||||
let mut field_update = FieldMetadataUpdate::new(path);
|
||||
if let Some(metadata) = update.get_item("metadata")? {
|
||||
let metadata_dict = metadata.cast::<PyDict>()?;
|
||||
for (key, value) in metadata_dict.iter() {
|
||||
let key: String = key.extract()?;
|
||||
if value.is_none() {
|
||||
field_update = field_update.remove(key);
|
||||
} else {
|
||||
field_update = field_update.set(key, value.extract::<String>()?);
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(replace) = update.get_item("replace")?
|
||||
&& replace.extract::<bool>()?
|
||||
{
|
||||
field_update = field_update.replace();
|
||||
}
|
||||
Ok(field_update)
|
||||
})
|
||||
.collect::<PyResult<Vec<_>>>()?;
|
||||
|
||||
let inner = self_.inner_ref()?.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
let result = inner.update_field_metadata(&updates).await.infer_error()?;
|
||||
Ok(UpdateFieldMetadataResult::from(result))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(FromPyObject)]
|
||||
|
||||
@@ -18,13 +18,13 @@ use crate::index::waiter::wait_for_index;
|
||||
use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest};
|
||||
use crate::table::AddColumnsResult;
|
||||
use crate::table::AddResult;
|
||||
use crate::table::AlterColumnsResult;
|
||||
use crate::table::DeleteResult;
|
||||
use crate::table::DropColumnsResult;
|
||||
use crate::table::MergeResult;
|
||||
use crate::table::Tags;
|
||||
use crate::table::UpdateResult;
|
||||
use crate::table::query::create_multi_vector_plan;
|
||||
use crate::table::{AlterColumnsResult, FieldMetadataUpdate, UpdateFieldMetadataResult};
|
||||
use crate::table::{AnyQuery, Filter, Predicate, PreprocessingOutput, TableStatistics};
|
||||
use crate::utils::background_cache::BackgroundCache;
|
||||
use crate::utils::{
|
||||
@@ -1968,35 +1968,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn update_field_metadata(
|
||||
&self,
|
||||
updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
self.check_mutable().await?;
|
||||
let body = serde_json::json!({ "updates": updates });
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!(
|
||||
"/v1/table/{}/update_field_metadata/",
|
||||
self.identifier
|
||||
))
|
||||
.json(&body);
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
let body = response.text().await.err_to_http(request_id.clone())?;
|
||||
|
||||
let result: UpdateFieldMetadataResult =
|
||||
serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
source: format!("Failed to parse update_field_metadata response: {}", e).into(),
|
||||
request_id,
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
self.invalidate_schema_cache();
|
||||
self.track_write_version(result.version);
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
|
||||
self.check_mutable().await?;
|
||||
let body = serde_json::json!({ "columns": columns });
|
||||
@@ -2290,7 +2261,6 @@ mod tests {
|
||||
|
||||
use crate::remote::client::{ClientConfig, RetryConfig};
|
||||
use crate::table::AddDataMode;
|
||||
use crate::table::FieldMetadataUpdate;
|
||||
|
||||
use arrow::{array::AsArray, compute::concat_batches, datatypes::Int32Type};
|
||||
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, record_batch};
|
||||
@@ -6490,25 +6460,4 @@ mod tests {
|
||||
assert!(!headers.contains_key("x-lancedb-min-version"));
|
||||
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_update_field_metadata() {
|
||||
let table = Table::new_with_handler("my_table", |request| {
|
||||
assert_eq!(request.method(), "POST");
|
||||
assert_eq!(
|
||||
request.url().path(),
|
||||
"/v1/table/my_table/update_field_metadata/"
|
||||
);
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(r#"{"version": 7, "fields": {"category": {"unit": "label"}}}"#)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
let result = table
|
||||
.update_field_metadata(&[FieldMetadataUpdate::new("category").set("unit", "label")])
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(result.version, 7);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,10 +91,7 @@ pub use lance::dataset::scanner::DatasetRecordBatchStream;
|
||||
use lance::dataset::statistics::DatasetStatisticsExt;
|
||||
pub use lance_index::optimize::OptimizeOptions;
|
||||
pub use optimize::{CompactionOptions, OptimizeAction, OptimizeStats};
|
||||
pub use schema_evolution::{
|
||||
AddColumnsResult, AlterColumnsResult, DropColumnsResult, FieldMetadataUpdate,
|
||||
UpdateFieldMetadataResult,
|
||||
};
|
||||
pub use schema_evolution::{AddColumnsResult, AlterColumnsResult, DropColumnsResult};
|
||||
use serde_with::skip_serializing_none;
|
||||
pub use update::{UpdateBuilder, UpdateResult};
|
||||
|
||||
@@ -663,19 +660,6 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
message: "create_insert_exec not implemented".to_string(),
|
||||
})
|
||||
}
|
||||
/// Update per-field metadata. Merges into existing metadata by default;
|
||||
/// [`FieldMetadataUpdate::remove`] deletes a key and
|
||||
/// [`FieldMetadataUpdate::replace`] swaps the field's whole map.
|
||||
///
|
||||
/// The default returns `NotSupported`; Lance-backed and remote tables override it.
|
||||
async fn update_field_metadata(
|
||||
&self,
|
||||
_updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
Err(Error::NotSupported {
|
||||
message: "update_field_metadata is not supported on this table type".into(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// A Table is a collection of strong typed Rows.
|
||||
@@ -1356,14 +1340,6 @@ impl Table {
|
||||
self.inner.alter_columns(alterations).await
|
||||
}
|
||||
|
||||
/// Update per-field metadata (merges by default).
|
||||
pub async fn update_field_metadata(
|
||||
&self,
|
||||
updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
self.inner.update_field_metadata(updates).await
|
||||
}
|
||||
|
||||
/// Remove columns from the table.
|
||||
pub async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
|
||||
self.inner.drop_columns(columns).await
|
||||
@@ -2604,7 +2580,6 @@ impl NativeTable {
|
||||
/// field id and the second element is a hashmap of metadata key-value
|
||||
/// pairs.
|
||||
///
|
||||
#[deprecated(since = "0.33.1", note = "Use `update_field_metadata` instead")]
|
||||
pub async fn replace_field_metadata(
|
||||
&self,
|
||||
new_values: impl IntoIterator<Item = (u32, HashMap<String, String>)>,
|
||||
@@ -2911,13 +2886,6 @@ impl BaseTable for NativeTable {
|
||||
schema_evolution::execute_alter_columns(self, alterations).await
|
||||
}
|
||||
|
||||
async fn update_field_metadata(
|
||||
&self,
|
||||
updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
schema_evolution::execute_update_field_metadata(self, updates).await
|
||||
}
|
||||
|
||||
async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
|
||||
schema_evolution::execute_drop_columns(self, columns).await
|
||||
}
|
||||
@@ -3168,6 +3136,7 @@ pub struct FragmentSummaryStats {
|
||||
#[cfg(test)]
|
||||
#[allow(deprecated)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::time::Duration;
|
||||
@@ -4480,10 +4449,10 @@ mod tests {
|
||||
Some(&"test_val2_update".to_string())
|
||||
);
|
||||
|
||||
let mut new_field_metadata = HashMap::<String, String>::new();
|
||||
new_field_metadata.insert("test_field_key1".into(), "test_field_val1".into());
|
||||
native_tbl
|
||||
.update_field_metadata(&[
|
||||
FieldMetadataUpdate::new("i").set("test_field_key1", "test_field_val1")
|
||||
])
|
||||
.replace_field_metadata(vec![(field.id as u32, new_field_metadata)])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@
|
||||
|
||||
use lance::dataset::{ColumnAlteration, NewColumnTransform};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use super::NativeTable;
|
||||
use crate::Result;
|
||||
@@ -45,52 +44,6 @@ pub struct DropColumnsResult {
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
/// A single field's metadata update, addressed by dot-path.
|
||||
///
|
||||
/// Merges into the field's existing metadata by default. Use [`Self::remove`] to
|
||||
/// delete a key, or [`Self::replace`] to swap the field's entire metadata map.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
|
||||
pub struct FieldMetadataUpdate {
|
||||
/// Dot-separated path to the field (e.g. `"embedding"` or `"address.zip"`).
|
||||
pub path: String,
|
||||
/// Keys to set (`Some`) or delete (`None`).
|
||||
pub metadata: HashMap<String, Option<String>>,
|
||||
/// If `true`, replace the field's entire metadata map instead of merging.
|
||||
pub replace: bool,
|
||||
}
|
||||
|
||||
impl FieldMetadataUpdate {
|
||||
pub fn new(path: impl Into<String>) -> Self {
|
||||
Self {
|
||||
path: path.into(),
|
||||
metadata: HashMap::new(),
|
||||
replace: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
|
||||
self.metadata.insert(key.into(), Some(value.into()));
|
||||
self
|
||||
}
|
||||
|
||||
pub fn remove(mut self, key: impl Into<String>) -> Self {
|
||||
self.metadata.insert(key.into(), None);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn replace(mut self) -> Self {
|
||||
self.replace = true;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct UpdateFieldMetadataResult {
|
||||
/// The commit version associated with the operation.
|
||||
#[serde(default)]
|
||||
pub version: u64,
|
||||
}
|
||||
|
||||
/// Internal implementation of the add columns logic.
|
||||
///
|
||||
/// Adds new columns to the table using the provided transforms.
|
||||
@@ -137,32 +90,6 @@ pub(crate) async fn execute_drop_columns(
|
||||
Ok(DropColumnsResult { version })
|
||||
}
|
||||
|
||||
/// Internal implementation of the update field metadata logic.
|
||||
///
|
||||
/// Merges or replaces per-field metadata, addressing fields by dot-path.
|
||||
pub(crate) async fn execute_update_field_metadata(
|
||||
table: &NativeTable,
|
||||
updates: &[FieldMetadataUpdate],
|
||||
) -> Result<UpdateFieldMetadataResult> {
|
||||
table.dataset.ensure_mutable()?;
|
||||
let mut dataset = (*table.dataset.get().await?).clone();
|
||||
|
||||
let mut builder = dataset.update_field_metadata();
|
||||
for update in updates {
|
||||
let entries = update.metadata.iter().map(|(k, v)| (k.clone(), v.clone()));
|
||||
builder = if update.replace {
|
||||
builder.replace(&update.path, entries)?
|
||||
} else {
|
||||
builder.update(&update.path, entries)?
|
||||
};
|
||||
}
|
||||
builder.await?;
|
||||
|
||||
let version = dataset.version().version;
|
||||
table.dataset.update(dataset);
|
||||
Ok(UpdateFieldMetadataResult { version })
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use arrow_array::{Int32Array, StringArray, record_batch};
|
||||
@@ -170,7 +97,6 @@ mod tests {
|
||||
use futures::TryStreamExt;
|
||||
use lance::dataset::ColumnAlteration;
|
||||
|
||||
use super::FieldMetadataUpdate;
|
||||
use crate::connect;
|
||||
use crate::query::{ExecutableQuery, QueryBase, Select};
|
||||
use crate::table::NewColumnTransform;
|
||||
@@ -684,46 +610,4 @@ mod tests {
|
||||
let v4 = table.version().await.unwrap();
|
||||
assert_eq!(drop_result.version, v4);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_update_field_metadata() {
|
||||
let conn = connect("memory://").execute().await.unwrap();
|
||||
let batch = record_batch!(
|
||||
("id", Int32, [1, 2, 3]),
|
||||
("category", Utf8, ["A", "B", "C"])
|
||||
)
|
||||
.unwrap();
|
||||
let table = conn
|
||||
.create_table("test_update_field_metadata", batch)
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Set metadata on a field.
|
||||
table
|
||||
.update_field_metadata(&[FieldMetadataUpdate::new("category")
|
||||
.set("unit", "label")
|
||||
.set("pii", "false")])
|
||||
.await
|
||||
.unwrap();
|
||||
let schema = table.schema().await.unwrap();
|
||||
let field = schema.field_with_name("category").unwrap();
|
||||
assert_eq!(
|
||||
field.metadata().get("unit").map(String::as_str),
|
||||
Some("label")
|
||||
);
|
||||
|
||||
// Merge: add a key, delete one, keep the rest.
|
||||
table
|
||||
.update_field_metadata(&[FieldMetadataUpdate::new("category")
|
||||
.set("source", "import")
|
||||
.remove("pii")])
|
||||
.await
|
||||
.unwrap();
|
||||
let schema = table.schema().await.unwrap();
|
||||
let md = schema.field_with_name("category").unwrap().metadata();
|
||||
assert_eq!(md.get("unit").map(String::as_str), Some("label")); // preserved
|
||||
assert_eq!(md.get("source").map(String::as_str), Some("import")); // added
|
||||
assert!(!md.contains_key("pii")); // deleted
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user