Compare commits

..

1 Commits

Author SHA1 Message Date
Brendan Clement
b5e521c4a1 docs: add cross-SDK parity guidance for code review 2026-05-29 12:48:00 -07:00
51 changed files with 440 additions and 2565 deletions

View File

@@ -1,7 +0,0 @@
# Agent Skills
This directory contains repo-scoped code agent skills for the LanceDB project.
Each skill is a folder that contains a required `SKILL.md` and optional bundled resources.
Codex discovers skills from `.agents/skills` in the current working directory and parent directories.

View File

@@ -1,98 +0,0 @@
---
name: lancedb-update-lance-dependency
description: Update LanceDB to a specific Lance release or tag. Use when bumping Lance dependencies in the lancedb repository, including Rust workspace Lance crates, Java lance-core, validation, branch creation, commit, push, and PR creation when requested.
---
# LanceDB Update Lance Dependency
## Scope
Use this skill in the `lancedb/lancedb` repository when updating the Lance dependency to a specific Lance version or tag.
Inputs can be a version (`7.2.0-beta.1`), a tag (`v7.2.0-beta.1`), a tag ref (`refs/tags/v7.2.0-beta.1`), or `latest`.
## Workflow
1. Confirm the worktree status with `git status --short`.
2. Resolve the target Lance version:
- If the input is `latest`, empty, or omitted, run:
```bash
python3 ci/check_lance_release.py
```
Parse the JSON output. If `needs_update` is not `true`, stop without creating a PR. Otherwise use `latest_tag`.
- If the input is explicit, use it directly.
3. Compute update metadata without changing files:
```bash
python3 ci/update_lance_dependency.py "$TAG_OR_VERSION" --metadata-only
```
Before making changes, check for an existing open PR with the emitted `pr_title`:
```bash
gh pr list --search "\"$PR_TITLE\" in:title" --state open --limit 1 --json number,url,title
```
If a matching open PR exists, stop and report it instead of creating a duplicate.
4. Run the deterministic update entrypoint:
```bash
python3 ci/update_lance_dependency.py "$TAG_OR_VERSION"
```
This updates the Rust workspace Lance dependencies through `ci/set_lance_version.py`, updates `java/pom.xml`, refreshes Cargo metadata, and prints JSON metadata containing `branch_name`, `commit_message`, and `pr_title`.
5. Run validation:
```bash
cargo clippy --quiet --workspace --tests --all-features -- -D warnings
cargo fmt --all --quiet
```
Fix real diagnostics and rerun clippy until it succeeds. Do not skip warnings.
6. Inspect `git status --short` and `git diff` to ensure only the Lance dependency update and required compatibility fixes are present.
7. If the task only asks to prepare local changes, stop here and report the changed files and validation result.
8. If the task asks to publish the update, create a branch using the printed `branch_name`, stage all relevant files, and commit using the printed `commit_message`. Do not amend or rewrite existing commits.
9. Push to `origin`. Before creating the PR, check that the current token has push permission:
```bash
gh api repos/lancedb/lancedb --jq .permissions.push
```
If the remote branch already exists for the same generated branch name, delete the remote ref with `gh api -X DELETE repos/lancedb/lancedb/git/refs/heads/$BRANCH_NAME`, then push. Do not force-push.
10. Create a PR targeting `main` with the printed `pr_title`. If there is no PR template, keep the body to two or three concise sentences: state the Lance dependency bump, note any required compatibility fixes, and link the triggering Lance tag or release.
11. Read back the remote PR title after creation. If it is not a Conventional Commit title, fix it immediately.
12. When running in GitHub Actions after creating the LanceDB PR, trigger the Sophon dependency update:
```bash
gh workflow run codex-bump-lancedb-lance.yml \
--repo lancedb/sophon \
-f lance_ref="$LANCE_TAG" \
-f lancedb_ref="$BRANCH_NAME"
gh run list --repo lancedb/sophon --workflow codex-bump-lancedb-lance.yml --limit 1 --json databaseId,url,displayTitle
```
Use the emitted metadata `tag` value as `LANCE_TAG`. Do this only after a new LanceDB PR has been created. If the update was skipped because no update is needed or an open PR already exists, do not trigger Sophon.
## GitHub Actions
When this skill is used from GitHub Actions, `TAG`, `GH_TOKEN`, and `GITHUB_TOKEN` may already be set. Resolve `latest` first when `TAG` is empty. Once an explicit tag or version is known, use:
```bash
python3 ci/update_lance_dependency.py "$TAG" --github-output "$GITHUB_OUTPUT"
```
Then use the emitted `branch_name`, `commit_message`, and `pr_title` values for branch, commit, and PR creation.

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.30.1-beta.0"
current_version = "0.30.0-beta.1"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -4,16 +4,14 @@ on:
workflow_call:
inputs:
tag:
description: "Tag name from Lance. If omitted, the skill will use the latest Lance release that needs an update."
required: false
default: ""
description: "Tag name from Lance"
required: true
type: string
workflow_dispatch:
inputs:
tag:
description: "Tag name from Lance. Leave empty to use the latest Lance release that needs an update."
required: false
default: ""
description: "Tag name from Lance"
required: true
type: string
permissions:
@@ -27,7 +25,7 @@ jobs:
steps:
- name: Show inputs
run: |
echo "tag = ${{ inputs.tag || 'latest' }}"
echo "tag = ${{ inputs.tag }}"
- name: Checkout Repo LanceDB
uses: actions/checkout@v4
@@ -73,21 +71,65 @@ jobs:
OPENAI_API_KEY: ${{ secrets.CODEX_TOKEN }}
run: |
set -euo pipefail
TARGET_TAG="${TAG:-latest}"
VERSION="${TAG#refs/tags/}"
VERSION="${VERSION#v}"
BRANCH_NAME="codex/update-lance-${VERSION//[^a-zA-Z0-9]/-}"
# Use "chore" for beta/rc versions, "feat" for stable releases
if [[ "${VERSION}" == *beta* ]] || [[ "${VERSION}" == *rc* ]]; then
COMMIT_TYPE="chore"
else
COMMIT_TYPE="feat"
fi
cat <<EOF >/tmp/codex-prompt.txt
You are running inside the lancedb repository on a GitHub Actions runner.
You are running inside the lancedb repository on a GitHub Actions runner. Update the Lance dependency to version ${VERSION} and prepare a pull request for maintainers to review.
Use \$lancedb-update-lance-dependency with target "${TARGET_TAG}".
Follow these steps exactly:
1. Use script "ci/set_lance_version.py" to update Lance Rust dependencies. The script already refreshes Cargo metadata, so allow it to finish even if it takes time.
2. Update the Java lance-core dependency version in "java/pom.xml": change the "<lance-core.version>...</lance-core.version>" property to "${VERSION}".
3. Run "cargo clippy --workspace --tests --all-features -- -D warnings". If diagnostics appear, fix them yourself and rerun clippy until it exits cleanly. Do not skip any warnings.
4. After clippy succeeds, run "cargo fmt --all" to format the workspace.
5. Ensure the repository is clean except for intentional changes. Inspect "git status --short" and "git diff" to confirm the dependency update and any required fixes.
6. Create and switch to a new branch named "${BRANCH_NAME}" (replace any duplicated hyphens if necessary).
7. Stage all relevant files with "git add -A". Commit using the message "${COMMIT_TYPE}: update lance dependency to v${VERSION}".
8. Push the branch to origin. If the remote branch already exists, delete it first with "gh api -X DELETE repos/lancedb/lancedb/git/refs/heads/${BRANCH_NAME}" then push with "git push origin ${BRANCH_NAME}". Do NOT use "git push --force" or "git push -f".
9. env "GH_TOKEN" is available, use "gh" tools for github related operations like creating pull request.
10. Create a pull request targeting "main" with title "${COMMIT_TYPE}: update lance dependency to v${VERSION}". First, write the PR body to /tmp/pr-body.md using a heredoc (cat <<'EOF' > /tmp/pr-body.md). The body should summarize the dependency bump, clippy/fmt verification, and link the triggering tag (${TAG}). Then run "gh pr create --body-file /tmp/pr-body.md".
11. After creating the PR, display the PR URL, "git status --short", and a concise summary of the commands run and their results.
Constraints:
- Use env "GH_TOKEN" for GitHub operations.
- Do not merge the pull request.
- Do not force-push.
- Do not create a duplicate pull request if an open PR already exists for the target Lance version.
- If any command fails, diagnose and fix the root cause instead of aborting.
- After creating the PR, display the PR URL, "git status --short", and a concise summary of the commands run and their results.
- Use bash commands; avoid modifying GitHub workflow files other than through the scripted task above.
- Do not merge the PR.
- If any command fails, diagnose and fix the issue instead of aborting.
EOF
printenv OPENAI_API_KEY | codex login --with-api-key
codex --config shell_environment_policy.ignore_default_excludes=true exec --dangerously-bypass-approvals-and-sandbox "$(cat /tmp/codex-prompt.txt)"
- name: Trigger sophon dependency update
env:
TAG: ${{ inputs.tag }}
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
run: |
set -euo pipefail
VERSION="${TAG#refs/tags/}"
VERSION="${VERSION#v}"
LANCEDB_BRANCH="codex/update-lance-${VERSION//[^a-zA-Z0-9]/-}"
echo "Triggering sophon workflow with:"
echo " lance_ref: ${TAG#refs/tags/}"
echo " lancedb_ref: ${LANCEDB_BRANCH}"
gh workflow run codex-bump-lancedb-lance.yml \
--repo lancedb/sophon \
-f lance_ref="${TAG#refs/tags/}" \
-f lancedb_ref="${LANCEDB_BRANCH}"
- name: Show latest sophon workflow run
env:
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
run: |
set -euo pipefail
echo "Latest sophon workflow run:"
gh run list --repo lancedb/sophon --workflow codex-bump-lancedb-lance.yml --limit 1 --json databaseId,url,displayTitle

View File

@@ -0,0 +1,62 @@
name: Lance Release Timer
on:
schedule:
- cron: "*/10 * * * *"
workflow_dispatch:
permissions:
contents: read
actions: write
concurrency:
group: lance-release-timer
cancel-in-progress: false
jobs:
trigger-update:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Check for new Lance tag
id: check
env:
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
run: |
python3 ci/check_lance_release.py --github-output "$GITHUB_OUTPUT"
- name: Look for existing PR
if: steps.check.outputs.needs_update == 'true'
id: pr
env:
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
run: |
set -euo pipefail
TITLE="chore: update lance dependency to v${{ steps.check.outputs.latest_version }}"
COUNT=$(gh pr list --search "\"$TITLE\" in:title" --state open --limit 1 --json number --jq 'length')
if [ "$COUNT" -gt 0 ]; then
echo "Open PR already exists for $TITLE"
echo "pr_exists=true" >> "$GITHUB_OUTPUT"
else
echo "No existing PR for $TITLE"
echo "pr_exists=false" >> "$GITHUB_OUTPUT"
fi
- name: Trigger codex update workflow
if: steps.check.outputs.needs_update == 'true' && steps.pr.outputs.pr_exists != 'true'
env:
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
run: |
set -euo pipefail
TAG=${{ steps.check.outputs.latest_tag }}
gh workflow run codex-update-lance-dependency.yml -f tag=refs/tags/$TAG
- name: Show latest codex workflow run
if: steps.check.outputs.needs_update == 'true' && steps.pr.outputs.pr_exists != 'true'
env:
GH_TOKEN: ${{ secrets.ROBOT_TOKEN }}
run: |
set -euo pipefail
gh run list --workflow codex-update-lance-dependency.yml --limit 1 --json databaseId,url,displayTitle

346
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -13,20 +13,20 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=7.2.0-beta.3", default-features = false, "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=7.2.0-beta.3", default-features = false, "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=7.2.0-beta.3", default-features = false, "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=7.2.0-beta.3", "tag" = "v7.2.0-beta.3", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=7.2.0-beta.1", default-features = false, "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=7.2.0-beta.1", default-features = false, "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=7.2.0-beta.1", default-features = false, "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=7.2.0-beta.1", "tag" = "v7.2.0-beta.1", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "58.0.0", optional = false }

26
REVIEW.md Normal file
View File

@@ -0,0 +1,26 @@
# Code review guidelines
Repo-specific guidance for automated PR reviews.
## Cross-SDK parity
LanceDB exposes the same core (`rust/lancedb`) through Python, TypeScript (`nodejs`),
and Java bindings. Behavioral drift between SDKs is a recurring problem, so watch for
parity gaps when reviewing — but only flag real ones:
* If the change adds or modifies user-facing API or behavior in the shared core
(`rust/lancedb`), check whether each binding that should expose it (`python`,
`nodejs`) does. A core change with no corresponding binding update is worth a note.
* If the change adds or modifies a public API in one SDK but not the other, open the
sibling SDK's corresponding module and state whether an equivalent exists. If not,
note it as a possible parity gap and suggest a follow-up issue.
* For bug fixes, first read the sibling SDK's analogous code path to check whether the
same bug exists there. Only raise parity if it actually does. Do not ask to "port" a
fix for a bug that only ever existed in one binding.
* Stay silent on internal-only refactors, tests, docs, and changes with no cross-SDK
surface.
* Parity expectations apply to the Python and TypeScript (`nodejs`) SDKs. Java currently
implements only the remote table, not the local/embedded backend, so it is expected to
be partial — do not flag Java for missing local-only functionality.
* Keep parity feedback to a short, clearly-labeled note (e.g. "Possible SDK parity
gap: …"). It is advisory, not a merge blocker.

View File

@@ -1,126 +0,0 @@
#!/usr/bin/env python3
"""Prepare a Lance dependency update for LanceDB."""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
from pathlib import Path
from typing import Sequence
try:
from check_lance_release import parse_semver
except ModuleNotFoundError:
# Supports importing as ci.update_lance_dependency from tests or ad hoc checks.
from ci.check_lance_release import parse_semver # type: ignore
def normalize_version(raw: str) -> str:
value = raw.strip()
value = value.removeprefix("refs/tags/")
value = value.removeprefix("v")
try:
parse_semver(value)
except ValueError:
raise ValueError(f"Unsupported Lance version or tag: {raw}")
return value
def normalized_tag(version: str) -> str:
return f"v{version}"
def branch_name(version: str) -> str:
suffix = re.sub(r"[^a-zA-Z0-9]+", "-", version).strip("-")
suffix = re.sub(r"-+", "-", suffix)
return f"codex/update-lance-{suffix}"
def commit_type(version: str) -> str:
prerelease = version.split("-", maxsplit=1)[1] if "-" in version else ""
return "chore" if "beta" in prerelease or "rc" in prerelease else "feat"
def metadata_for(version: str) -> dict[str, str]:
kind = commit_type(version)
message = f"{kind}: update lance dependency to v{version}"
return {
"version": version,
"tag": normalized_tag(version),
"branch_name": branch_name(version),
"commit_type": kind,
"commit_message": message,
"pr_title": message,
}
def run_command(cmd: Sequence[str], *, cwd: Path) -> None:
subprocess.run(cmd, cwd=cwd, check=True)
def update_java_lance_core_version(repo_root: Path, version: str) -> None:
pom_path = repo_root / "java" / "pom.xml"
contents = pom_path.read_text(encoding="utf-8")
updated, count = re.subn(
r"(<lance-core\.version>)[^<]+(</lance-core\.version>)",
rf"\g<1>{version}\g<2>",
contents,
count=1,
)
if count != 1:
raise RuntimeError(
"Expected exactly one <lance-core.version> entry in java/pom.xml"
)
pom_path.write_text(updated, encoding="utf-8")
def write_github_outputs(path: str | None, payload: dict[str, str]) -> None:
if not path:
return
with open(path, "a", encoding="utf-8") as output:
for key, value in payload.items():
output.write(f"{key}={value}\n")
def main(argv: Sequence[str] | None = None) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"tag_or_version",
help="Lance tag or version, for example refs/tags/v7.2.0-beta.1 or 7.2.0",
)
parser.add_argument(
"--repo-root",
type=Path,
default=Path(__file__).resolve().parents[1],
help="Path to the lancedb repository root",
)
parser.add_argument(
"--github-output",
default=None,
help="Optional GitHub Actions output file to receive metadata fields",
)
parser.add_argument(
"--metadata-only",
action="store_true",
help="Only print derived metadata; do not modify dependency files",
)
args = parser.parse_args(argv)
repo_root = args.repo_root.resolve()
version = normalize_version(args.tag_or_version)
payload = metadata_for(version)
if not args.metadata_only:
run_command([sys.executable, "ci/set_lance_version.py", version], cwd=repo_root)
update_java_lance_core_version(repo_root, version)
write_github_outputs(args.github_output, payload)
print(json.dumps(payload, sort_keys=True))
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
<dependency>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-core</artifactId>
<version>0.30.1-beta.0</version>
<version>0.30.0-beta.1</version>
</dependency>
```

View File

@@ -1,43 +0,0 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / BranchContents
# Class: BranchContents
## Constructors
### new BranchContents()
```ts
new BranchContents(): BranchContents
```
#### Returns
[`BranchContents`](BranchContents.md)
## Properties
### manifestSize
```ts
manifestSize: number;
```
***
### parentBranch?
```ts
optional parentBranch: string;
```
***
### parentVersion
```ts
parentVersion: number;
```

View File

@@ -1,90 +0,0 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / Branches
# Class: Branches
Branch manager for a [Table](Table.md).
Unlike tags, `create` and `checkout` return a new [Table](Table.md) handle scoped
to the branch; writes on it do not affect `main`.
## Methods
### checkout()
```ts
checkout(name): Promise<Table>
```
Check out an existing branch and return a handle scoped to it.
#### Parameters
* **name**: `string`
#### Returns
`Promise`&lt;[`Table`](Table.md)&gt;
***
### create()
```ts
create(
name,
fromRef?,
fromVersion?): Promise<Table>
```
Create a branch and return a handle scoped to it.
#### Parameters
* **name**: `string`
Name of the new branch.
* **fromRef?**: `string`
Source branch to fork from. Defaults to `main`.
* **fromVersion?**: `number`
A specific version on `fromRef`. Defaults to latest.
#### Returns
`Promise`&lt;[`Table`](Table.md)&gt;
***
### delete()
```ts
delete(name): Promise<void>
```
Delete a branch.
#### Parameters
* **name**: `string`
#### Returns
`Promise`&lt;`void`&gt;
***
### list()
```ts
list(): Promise<Record<string, BranchContents>>
```
List all branches, mapping name to branch metadata.
#### Returns
`Promise`&lt;`Record`&lt;`string`, [`BranchContents`](BranchContents.md)&gt;&gt;

View File

@@ -110,23 +110,6 @@ containing the new version number of the table after altering the columns.
***
### branches()
```ts
abstract branches(): Promise<Branches>
```
Get the branch manager for this table.
Branches are isolated, writable lines of history forked from another
branch (or version). Writes on a branch do not affect `main`.
#### Returns
`Promise`&lt;[`Branches`](Branches.md)&gt;
***
### checkout()
```ts
@@ -1011,29 +994,6 @@ based on the row being updated (e.g. "my_col + 1")
***
### updateFieldMetadata()
```ts
abstract updateFieldMetadata(updates): Promise<UpdateFieldMetadataResult>
```
Update per-field (column) metadata.
#### Parameters
* **updates**: [`FieldMetadataUpdate`](../interfaces/FieldMetadataUpdate.md)[]
One or more per-field updates. Each
update's metadata is merged into the field's existing metadata by default;
a value of `null` deletes that key, and `replace: true` swaps the whole map.
#### Returns
`Promise`&lt;[`UpdateFieldMetadataResult`](../interfaces/UpdateFieldMetadataResult.md)&gt;
resolves to the new table version.
***
### vectorSearch()
```ts

View File

@@ -19,8 +19,6 @@
- [BooleanQuery](classes/BooleanQuery.md)
- [BoostQuery](classes/BoostQuery.md)
- [BranchContents](classes/BranchContents.md)
- [Branches](classes/Branches.md)
- [Connection](classes/Connection.md)
- [HeaderProvider](classes/HeaderProvider.md)
- [Index](classes/Index.md)
@@ -67,7 +65,6 @@
- [DropNamespaceOptions](interfaces/DropNamespaceOptions.md)
- [DropNamespaceResponse](interfaces/DropNamespaceResponse.md)
- [ExecutableQuery](interfaces/ExecutableQuery.md)
- [FieldMetadataUpdate](interfaces/FieldMetadataUpdate.md)
- [FragmentStatistics](interfaces/FragmentStatistics.md)
- [FragmentSummaryStats](interfaces/FragmentSummaryStats.md)
- [FtsOptions](interfaces/FtsOptions.md)
@@ -104,7 +101,6 @@
- [TimeoutConfig](interfaces/TimeoutConfig.md)
- [TlsConfig](interfaces/TlsConfig.md)
- [TokenResponse](interfaces/TokenResponse.md)
- [UpdateFieldMetadataResult](interfaces/UpdateFieldMetadataResult.md)
- [UpdateOptions](interfaces/UpdateOptions.md)
- [UpdateResult](interfaces/UpdateResult.md)
- [Version](interfaces/Version.md)

View File

@@ -1,41 +0,0 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / FieldMetadataUpdate
# Interface: FieldMetadataUpdate
A per-field metadata update, addressed by dot-path.
## Properties
### metadata
```ts
metadata: Record<string, null | string>;
```
Metadata key/value pairs. Merged into the field's existing metadata by
default; a value of `null` deletes that key.
***
### path
```ts
path: string;
```
Dot-separated path to the field. For a top-level column this is just its
name; for a nested field it's the path, e.g. "a.b.c".
***
### replace?
```ts
optional replace: boolean;
```
If true, replace the field's entire metadata map instead of merging.

View File

@@ -1,15 +0,0 @@
[**@lancedb/lancedb**](../README.md) • **Docs**
***
[@lancedb/lancedb](../globals.md) / UpdateFieldMetadataResult
# Interface: UpdateFieldMetadataResult
## Properties
### version
```ts
version: number;
```

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.30.1-beta.0</version>
<version>0.30.0-beta.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.30.1-beta.0</version>
<version>0.30.0-beta.1</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>LanceDB Java SDK Parent POM</description>

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.30.1-beta.0"
version = "0.30.0-beta.1"
publish = false
license.workspace = true
description.workspace = true

View File

@@ -85,39 +85,6 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
await expect(table.countRows()).resolves.toBe(3);
});
it("should support branches", async () => {
await table.add([{ id: 1 }]);
expect(await table.countRows()).toBe(1);
// fork an isolated, writable branch from main
const branch = await (await table.branches()).create("exp");
expect(await branch.countRows()).toBe(1);
await branch.add([{ id: 2 }]);
expect(await branch.countRows()).toBe(2);
// main is untouched by branch writes
expect(await table.countRows()).toBe(1);
// listed, with main (null) as the parent
const list = await (await table.branches()).list();
expect(Object.keys(list)).toContain("exp");
expect(list["exp"].parentBranch).toBeNull();
// fromRef="main" is equivalent to the default
await (await table.branches()).create("exp2", "main");
const list2 = await (await table.branches()).list();
expect(list2["exp2"].parentBranch).toBeNull();
// checkout returns a handle scoped to the branch's latest
const checkedOut = await (await table.branches()).checkout("exp");
expect(await checkedOut.countRows()).toBe(2);
// delete removes it
await (await table.branches()).delete("exp");
await (await table.branches()).delete("exp2");
const after = await (await table.branches()).list();
expect(Object.keys(after)).not.toContain("exp");
});
it("should show table stats", async () => {
await table.add([{ id: 1 }, { id: 2 }]);
await table.add([{ id: 1 }]);
@@ -1604,33 +1571,6 @@ describe("schema evolution", function () {
expect(await table.schema()).toEqual(expectedSchema3);
});
it("can update field metadata", async function () {
const con = await connect(tmpDir.name);
const table = await con.createTable("fm", [
{ id: 1, category: "a" },
{ id: 2, category: "b" },
]);
const res = await table.updateFieldMetadata([
{ path: "category", metadata: { unit: "label", pii: "false" } },
]);
expect(res).toHaveProperty("version");
expect(res.version).toBe(2);
let cat = (await table.schema()).fields.find((f) => f.name === "category");
expect(cat?.metadata.get("unit")).toBe("label");
expect(cat?.metadata.get("pii")).toBe("false");
// merge: add a key, delete one via null, keep the rest
await table.updateFieldMetadata([
{ path: "category", metadata: { source: "import", pii: null } },
]);
cat = (await table.schema()).fields.find((f) => f.name === "category");
expect(cat?.metadata.get("unit")).toBe("label"); // preserved
expect(cat?.metadata.get("source")).toBe("import"); // added
expect(cat?.metadata.has("pii")).toBe(false); // deleted
});
it("can cast to various types", async function () {
const con = await connect(tmpDir.name);

View File

@@ -38,12 +38,10 @@ export {
FragmentSummaryStats,
Tags,
TagContents,
BranchContents,
MergeResult,
AddResult,
AddColumnsResult,
AlterColumnsResult,
UpdateFieldMetadataResult,
DeleteResult,
DropColumnsResult,
UpdateResult,
@@ -112,7 +110,6 @@ export {
export {
Table,
Branches,
AddDataOptions,
UpdateOptions,
OptimizeOptions,
@@ -120,7 +117,6 @@ export {
WriteProgress,
LsmWriteSpec,
ColumnAlteration,
FieldMetadataUpdate,
} from "./table";
export {

View File

@@ -25,16 +25,13 @@ import {
AddColumnsSql,
AddResult,
AlterColumnsResult,
BranchContents,
DeleteResult,
DropColumnsResult,
IndexConfig,
IndexStatistics,
Branches as NativeBranches,
OptimizeStats,
TableStatistics,
Tags,
UpdateFieldMetadataResult,
UpdateResult,
Table as _NativeTable,
} from "./native";
@@ -511,18 +508,6 @@ export abstract class Table {
abstract alterColumns(
columnAlterations: ColumnAlteration[],
): Promise<AlterColumnsResult>;
/**
* Update per-field (column) metadata.
* @param {FieldMetadataUpdate[]} updates One or more per-field updates. Each
* update's metadata is merged into the field's existing metadata by default;
* a value of `null` deletes that key, and `replace: true` swaps the whole map.
* @returns {Promise<UpdateFieldMetadataResult>} resolves to the new table version.
*/
abstract updateFieldMetadata(
updates: FieldMetadataUpdate[],
): Promise<UpdateFieldMetadataResult>;
/**
* Drop one or more columns from the dataset
*
@@ -655,14 +640,6 @@ export abstract class Table {
*/
abstract tags(): Promise<Tags>;
/**
* Get the branch manager for this table.
*
* Branches are isolated, writable lines of history forked from another
* branch (or version). Writes on a branch do not affect `main`.
*/
abstract branches(): Promise<Branches>;
/**
* Restore the table to the currently checked out version
*
@@ -1060,12 +1037,6 @@ export class LocalTable extends Table {
return await this.inner.alterColumns(processedAlterations);
}
async updateFieldMetadata(
updates: FieldMetadataUpdate[],
): Promise<UpdateFieldMetadataResult> {
return await this.inner.updateFieldMetadata(updates);
}
async dropColumns(columnNames: string[]): Promise<DropColumnsResult> {
return await this.inner.dropColumns(columnNames);
}
@@ -1118,10 +1089,6 @@ export class LocalTable extends Table {
return await this.inner.tags();
}
async branches(): Promise<Branches> {
return new Branches(await this.inner.branches());
}
async optimize(options?: Partial<OptimizeOptions>): Promise<OptimizeStats> {
let cleanupOlderThanMs;
if (
@@ -1236,67 +1203,3 @@ export interface ColumnAlteration {
/** Set the new nullability. Note that a nullable column cannot be made non-nullable. */
nullable?: boolean;
}
/** A per-field metadata update, addressed by dot-path. */
export interface FieldMetadataUpdate {
/**
* Dot-separated path to the field. For a top-level column this is just its
* name; for a nested field it's the path, e.g. "a.b.c".
*/
path: string;
/**
* Metadata key/value pairs. Merged into the field's existing metadata by
* default; a value of `null` deletes that key.
*/
metadata: Record<string, string | null>;
/** If true, replace the field's entire metadata map instead of merging. */
replace?: boolean;
}
/**
* Branch manager for a {@link Table}.
*
* Unlike tags, `create` and `checkout` return a new {@link Table} handle scoped
* to the branch; writes on it do not affect `main`.
*/
export class Branches {
#inner: NativeBranches;
/**
* Construct a Branches manager. Internal use only.
* @hidden
*/
constructor(inner: NativeBranches) {
this.#inner = inner;
}
/** List all branches, mapping name to branch metadata. */
async list(): Promise<Record<string, BranchContents>> {
return await this.#inner.list();
}
/**
* Create a branch and return a handle scoped to it.
*
* @param name Name of the new branch.
* @param fromRef Source branch to fork from. Defaults to `main`.
* @param fromVersion A specific version on `fromRef`. Defaults to latest.
*/
async create(
name: string,
fromRef?: string,
fromVersion?: number,
): Promise<Table> {
return new LocalTable(await this.#inner.create(name, fromRef, fromVersion));
}
/** Check out an existing branch and return a handle scoped to it. */
async checkout(name: string): Promise<Table> {
return new LocalTable(await this.#inner.checkout(name));
}
/** Delete a branch. */
async delete(name: string): Promise<void> {
return await this.#inner.delete(name);
}
}

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.30.1-beta.0",
"version": "0.30.0-beta.1",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.30.1-beta.0",
"version": "0.30.0-beta.1",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.30.1-beta.0",
"version": "0.30.0-beta.1",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.30.1-beta.0",
"version": "0.30.0-beta.1",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.30.1-beta.0",
"version": "0.30.0-beta.1",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.30.1-beta.0",
"version": "0.30.0-beta.1",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.30.1-beta.0",
"version": "0.30.0-beta.1",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.30.1-beta.0",
"version": "0.30.0-beta.1",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.30.1-beta.0",
"version": "0.30.0-beta.1",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.30.1-beta.0",
"version": "0.30.0-beta.1",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -5,9 +5,8 @@ use std::collections::HashMap;
use lancedb::ipc::{ipc_file_to_batches, ipc_file_to_schema};
use lancedb::table::{
AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration,
FieldMetadataUpdate as LanceFieldMetadataUpdate, NewColumnTransform, OptimizeAction,
OptimizeOptions, Ref, Table as LanceDbTable,
AddDataMode, ColumnAlteration as LanceColumnAlteration, Duration, NewColumnTransform,
OptimizeAction, OptimizeOptions, Table as LanceDbTable,
};
use napi::bindgen_prelude::*;
use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
@@ -356,23 +355,6 @@ impl Table {
Ok(res.into())
}
#[napi(catch_unwind)]
pub async fn update_field_metadata(
&self,
updates: Vec<FieldMetadataUpdate>,
) -> napi::Result<UpdateFieldMetadataResult> {
let updates = updates
.into_iter()
.map(LanceFieldMetadataUpdate::from)
.collect::<Vec<_>>();
let res = self
.inner_ref()?
.update_field_metadata(&updates)
.await
.default_error()?;
Ok(res.into())
}
#[napi(catch_unwind)]
pub async fn drop_columns(&self, columns: Vec<String>) -> napi::Result<DropColumnsResult> {
let col_refs = columns.iter().map(String::as_str).collect::<Vec<_>>();
@@ -478,13 +460,6 @@ impl Table {
})
}
#[napi(catch_unwind)]
pub async fn branches(&self) -> napi::Result<Branches> {
Ok(Branches {
inner: self.inner_ref()?.clone(),
})
}
#[napi(catch_unwind)]
pub async fn optimize(
&self,
@@ -772,29 +747,6 @@ pub struct ColumnAlteration {
pub nullable: Option<bool>,
}
/// A per-field metadata update, addressed by dot-path. Merges into the field's
/// existing metadata by default; a `null` value deletes a key, and `replace`
/// swaps the field's entire metadata map.
#[napi(object)]
pub struct FieldMetadataUpdate {
/// Dot-separated path to the field (e.g. "embedding" or "a.b.c").
pub path: String,
/// Metadata keys to set; a `null` value deletes that key.
pub metadata: HashMap<String, Option<String>>,
/// If true, replace the field's entire metadata map instead of merging.
pub replace: Option<bool>,
}
impl From<FieldMetadataUpdate> for LanceFieldMetadataUpdate {
fn from(js: FieldMetadataUpdate) -> Self {
Self {
path: js.path,
metadata: js.metadata,
replace: js.replace.unwrap_or(false),
}
}
}
impl TryFrom<ColumnAlteration> for LanceColumnAlteration {
type Error = String;
fn try_from(js: ColumnAlteration) -> std::result::Result<Self, Self::Error> {
@@ -1035,19 +987,6 @@ impl From<lancedb::table::AlterColumnsResult> for AlterColumnsResult {
}
}
#[napi(object)]
pub struct UpdateFieldMetadataResult {
pub version: i64,
}
impl From<lancedb::table::UpdateFieldMetadataResult> for UpdateFieldMetadataResult {
fn from(value: lancedb::table::UpdateFieldMetadataResult) -> Self {
Self {
version: value.version as i64,
}
}
}
#[napi(object)]
pub struct DropColumnsResult {
pub version: i64,
@@ -1067,13 +1006,6 @@ pub struct TagContents {
pub manifest_size: i64,
}
#[napi]
pub struct BranchContents {
pub parent_branch: Option<String>,
pub parent_version: i64,
pub manifest_size: i64,
}
#[napi]
pub struct Tags {
inner: LanceDbTable,
@@ -1142,60 +1074,3 @@ impl Tags {
.default_error()
}
}
#[napi]
pub struct Branches {
inner: LanceDbTable,
}
#[napi]
impl Branches {
#[napi]
pub async fn list(&self) -> napi::Result<HashMap<String, BranchContents>> {
let branches = self.inner.list_branches().await.default_error()?;
let result = branches
.into_iter()
.map(|(k, v)| {
(
k,
BranchContents {
parent_branch: v.parent_branch,
parent_version: v.parent_version as i64,
manifest_size: v.manifest_size as i64,
},
)
})
.collect();
Ok(result)
}
#[napi]
pub async fn create(
&self,
name: String,
from_ref: Option<String>,
from_version: Option<i64>,
) -> napi::Result<Table> {
// "main" and None are two spellings of the root branch; normalize so
// from_ref = "main" behaves identically to the default.
let from_ref = from_ref.filter(|b| b != "main");
let from = Ref::Version(from_ref, from_version.map(|v| v as u64));
let table = self
.inner
.create_branch(&name, from)
.await
.default_error()?;
Ok(Table::new(table))
}
#[napi]
pub async fn checkout(&self, name: String) -> napi::Result<Table> {
let table = self.inner.checkout_branch(&name).await.default_error()?;
Ok(Table::new(table))
}
#[napi]
pub async fn delete(&self, name: String) -> napi::Result<()> {
self.inner.delete_branch(&name).await.default_error()
}
}

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.33.1-beta.0"
current_version = "0.33.0-beta.1"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.33.1-beta.0"
version = "0.33.0-beta.1"
publish = false
edition.workspace = true
description = "Python bindings for LanceDB"

View File

@@ -315,15 +315,6 @@ def deserialize_conn(
manifest_enabled=parsed.get("manifest_enabled", False),
namespace_client_properties=parsed.get("namespace_client_properties"),
)
elif connection_type == "remote":
return RemoteDBConnection(
parsed["db_url"],
parsed["api_key"],
parsed.get("region", "us-east-1"),
host_override=parsed.get("host_override"),
client_config=parsed.get("client_config"),
storage_options=storage_options,
)
else:
raise ValueError(f"Unknown connection_type: {connection_type}")

View File

@@ -208,9 +208,6 @@ class Table:
async def alter_columns(
self, columns: list[dict[str, Any]]
) -> AlterColumnsResult: ...
async def update_field_metadata(
self, updates: list[dict[str, Any]]
) -> UpdateFieldMetadataResult: ...
async def optimize(
self,
*,
@@ -226,8 +223,6 @@ class Table:
async def close_lsm_writers(self) -> None: ...
@property
def tags(self) -> Tags: ...
@property
def branches(self) -> Branches: ...
def query(self) -> Query: ...
def take_offsets(self, offsets: list[int]) -> TakeQuery: ...
def take_row_ids(self, row_ids: list[int]) -> TakeQuery: ...
@@ -240,17 +235,6 @@ class Tags:
async def delete(self, tag: str): ...
async def update(self, tag: str, version: int): ...
class Branches:
async def list(self) -> Dict[str, Any]: ...
async def create(
self,
name: str,
from_ref: Optional[str] = None,
from_version: Optional[int] = None,
) -> Table: ...
async def checkout(self, name: str) -> Table: ...
async def delete(self, name: str) -> None: ...
class IndexConfig:
name: str
index_type: str
@@ -476,9 +460,6 @@ class AddColumnsResult:
class AlterColumnsResult:
version: int
class UpdateFieldMetadataResult:
version: int
class DropColumnsResult:
version: int

View File

@@ -3,13 +3,12 @@
import copy
import json
import os
from deprecation import deprecated
import pyarrow as pa
from ._lancedb import async_permutation_builder, PermutationReader
from .table import LanceTable, Table
from .table import LanceTable
from .background_loop import LOOP
from .util import batch_to_tensor, batch_to_tensor_rows
from typing import Any, Callable, Iterator, Literal, Optional, TYPE_CHECKING, Union
@@ -355,49 +354,6 @@ class Transforms:
DEFAULT_BATCH_SIZE = 100
def _table_to_pickle_state(table: Table) -> dict[str, Any]:
from .remote.table import RemoteTable
if isinstance(table, RemoteTable):
return {
"kind": "remote",
"table": table,
}
if not isinstance(table, LanceTable):
raise ValueError(f"Cannot pickle table of type {type(table)!r}")
base_uri = table._conn.uri
if base_uri.startswith("memory://"):
return {
"kind": "memory",
"name": table.name,
"data": table.to_arrow(),
}
return {
"kind": "local",
"name": table.name,
"uri": base_uri,
"namespace": table._namespace_path,
"storage_options": table._conn.storage_options,
}
def _table_from_pickle_state(state: dict[str, Any]) -> Table:
from . import connect
kind = state["kind"]
if kind == "remote":
return state["table"]
if kind == "memory":
return connect("memory://").create_table(state["name"], state["data"])
if kind == "local":
db = connect(state["uri"], storage_options=state["storage_options"])
return db.open_table(state["name"], namespace_path=state["namespace"] or None)
raise ValueError(f"Unknown table pickle state kind: {kind}")
class Permutation:
"""
A Permutation is a view of a dataset that can be used as input to model training
@@ -413,15 +369,15 @@ class Permutation:
def __init__(
self,
base_table: Table,
permutation_table: Optional[Table],
base_table: LanceTable,
permutation_table: Optional[LanceTable],
split: int,
selection: dict[str, str],
batch_size: int,
transform_fn: Callable[pa.RecordBatch, Any],
offset: Optional[int] = None,
limit: Optional[int] = None,
connection_factory: Optional[Callable[[str], Table]] = None,
connection_factory: Optional[Callable[[str], LanceTable]] = None,
_reader: Optional[PermutationReader] = None,
):
"""
@@ -441,7 +397,6 @@ class Permutation:
if _reader is None:
_reader = LOOP.run(self._build_reader())
self.reader: PermutationReader = _reader
self._pid = os.getpid()
async def _build_reader(self) -> PermutationReader:
reader = await PermutationReader.from_tables(
@@ -473,25 +428,29 @@ class Permutation:
return new
def with_connection_factory(
self, connection_factory: Callable[[str], Table]
self, connection_factory: Callable[[str], LanceTable]
) -> "Permutation":
"""
Creates a new permutation that will use ``connection_factory`` to reopen
the base table when this permutation is unpickled in a worker process.
The factory is a callable that takes a single argument, the base table
name, and returns a LanceDB table. It must be picklable; the worker
The factory is a callable that takes a single argument the base table
name and returns a [LanceTable]. It must be picklable; the worker
will pickle it via standard ``pickle`` and call it to recover the base
table. Picklable callables in practice means top-level (module-level)
functions, ``functools.partial`` of such functions, or instances of
picklable classes implementing ``__call__``. Lambdas and closures over
local variables don't pickle with the default protocol.
A factory is optional for normal local and remote LanceDB connections:
if not set, ``__getstate__`` captures the table's own picklable reopen
state. Use a factory when that default state is not enough, for example
when credentials should be loaded from the worker environment instead
of being embedded in the pickle.
Setting a factory is necessary when the URI alone is not enough to
re-open the connection — most importantly for LanceDB Cloud (``db://``)
connections, where ``api_key`` and ``region`` aren't recoverable from
the connection object after construction.
For local file or cloud-storage paths the factory is optional: if not
set, ``__getstate__`` falls back to capturing
``(uri, storage_options, namespace_path)`` and re-opening via
``lancedb.connect(uri, storage_options=...)``.
Examples
--------
@@ -549,7 +508,7 @@ class Permutation:
return new
@classmethod
def identity(cls, table: Table) -> "Permutation":
def identity(cls, table: LanceTable) -> "Permutation":
"""
Creates an identity permutation for the given table.
"""
@@ -558,8 +517,8 @@ class Permutation:
@classmethod
def from_tables(
cls,
base_table: Table,
permutation_table: Optional[Table] = None,
base_table: LanceTable,
permutation_table: Optional[LanceTable] = None,
split: Optional[Union[str, int]] = None,
) -> "Permutation":
"""
@@ -635,10 +594,11 @@ class Permutation:
The base table is captured either via a user-supplied
``connection_factory`` (see [with_connection_factory]) or, as a
fallback, by the table's own picklable reopen state. The permutation
table is captured as a pyarrow Table (which pickles via Arrow IPC
natively). The reader is dropped from the wire format and rebuilt
lazily on first use.
fallback, by introspecting ``(uri, storage_options, namespace_path)``
on the connection. The permutation table — always an in-memory
LanceDB table — is captured as a pyarrow Table (which pickles via
Arrow IPC natively). The reader is dropped from the wire format;
``__setstate__`` rebuilds it from the restored tables.
"""
permutation_data: Optional[pa.Table] = None
if self.permutation_table is not None:
@@ -662,9 +622,39 @@ class Permutation:
# namespace from the existing connection.
return common
# URI-introspection fallback: only viable for native (OSS) connections
# where (uri, storage_options) is enough to reopen. Remote / cloud
# connections don't expose recoverable api_key / region — those users
# must call with_connection_factory().
try:
base_uri = self.base_table._conn.uri
storage_options = self.base_table._conn.storage_options
except AttributeError as e:
raise ValueError(
"Cannot pickle this Permutation: the base table's connection "
"does not expose a uri/storage_options, which usually means it "
"is a remote (LanceDB Cloud) connection. Call "
"Permutation.with_connection_factory(...) first to provide a "
"picklable callable that re-opens the base table from a worker "
"process."
) from e
if base_uri.startswith("memory://"):
# In-memory base tables don't exist in any worker process by
# default, so dump the entire base table into the pickle. This
# can be expensive for large datasets — users with large
# in-memory base tables should either persist them or set a
# connection_factory.
return {
**common,
"base_table_data": self.base_table.to_arrow(),
}
return {
**common,
"base_table_state": _table_to_pickle_state(self.base_table),
"base_table_uri": base_uri,
"base_table_namespace": self.base_table._namespace_path,
"base_table_storage_options": storage_options,
}
def __setstate__(self, state: dict[str, Any]) -> None:
@@ -673,8 +663,6 @@ class Permutation:
connection_factory = state["connection_factory"]
if connection_factory is not None:
base_table = connection_factory(state["base_table_name"])
elif "base_table_state" in state:
base_table = _table_from_pickle_state(state["base_table_state"])
elif "base_table_data" in state:
# In-memory base table inlined into the pickle; rebuild the same
# way we rebuild the in-memory permutation table.
@@ -692,7 +680,7 @@ class Permutation:
namespace_path=state["base_table_namespace"] or None,
)
permutation_table: Optional[Table] = None
permutation_table: Optional[LanceTable] = None
if state["permutation_data"] is not None:
mem_db = connect("memory://")
permutation_table = mem_db.create_table(
@@ -708,28 +696,10 @@ class Permutation:
self.offset = state["offset"]
self.limit = state["limit"]
self.connection_factory = connection_factory
self.reader = None
self._pid = None
def _ensure_open(self) -> None:
pid = os.getpid()
if self.reader is not None and getattr(self, "_pid", None) == pid:
return
# The reader owns Rust-side table handles. Rebuild it after unpickle or
# fork even though the Python table wrappers reopen themselves.
if hasattr(self.base_table, "_ensure_open"):
self.base_table._ensure_open()
if self.permutation_table is not None and hasattr(
self.permutation_table, "_ensure_open"
):
self.permutation_table._ensure_open()
self.reader = LOOP.run(self._build_reader())
self._pid = pid
@property
def schema(self) -> pa.Schema:
self._ensure_open()
async def do_output_schema():
return await self.reader.output_schema(self.selection)
@@ -747,7 +717,6 @@ class Permutation:
"""
The number of rows in the permutation
"""
self._ensure_open()
return self.reader.count_rows()
@property
@@ -906,7 +875,6 @@ class Permutation:
If skip_last_batch is True, the last batch will be skipped if it is not a
multiple of batch_size.
"""
self._ensure_open()
async def get_iter():
return await self.reader.read(self.selection, batch_size=batch_size)
@@ -1008,7 +976,6 @@ class Permutation:
so `with_format` and `with_transform` affect this method in the same way
they affect iteration.
"""
self._ensure_open()
async def do_take_offsets():
return await self.reader.take_offsets(offsets, selection=self.selection)
@@ -1044,11 +1011,9 @@ class Permutation:
"""
Skip the first `skip` rows of the permutation
"""
self._ensure_open()
new = copy.copy(self)
new.offset = skip
new.reader = LOOP.run(new._build_reader())
new._pid = os.getpid()
return new
@deprecated(details="Use with_take instead")
@@ -1067,11 +1032,9 @@ class Permutation:
"""
Limit the permutation to `limit` rows (following any `skip`)
"""
self._ensure_open()
new = copy.copy(self)
new.limit = limit
new.reader = LOOP.run(new._build_reader())
new._pid = os.getpid()
return new
@deprecated(details="Use with_repeat instead")

View File

@@ -3,7 +3,6 @@
from datetime import timedelta
import json
import logging
from concurrent.futures import ThreadPoolExecutor
import sys
@@ -18,7 +17,7 @@ else:
# Remove this import to fix circular dependency
# from lancedb import connect_async
from lancedb.remote import ClientConfig, RetryConfig, TimeoutConfig, TlsConfig
from lancedb.remote import ClientConfig
import pyarrow as pa
from ..common import DATA
@@ -37,64 +36,6 @@ from ..table import Table
from ..util import validate_table_name
def _duration_seconds(value: Optional[timedelta]) -> Optional[float]:
return value.total_seconds() if value is not None else None
def _timeout_config_to_dict(
config: Optional[TimeoutConfig],
) -> Optional[dict[str, Any]]:
if config is None:
return None
return {
"timeout": _duration_seconds(config.timeout),
"connect_timeout": _duration_seconds(config.connect_timeout),
"read_timeout": _duration_seconds(config.read_timeout),
"pool_idle_timeout": _duration_seconds(config.pool_idle_timeout),
}
def _retry_config_to_dict(config: RetryConfig) -> dict[str, Any]:
return {
"retries": config.retries,
"connect_retries": config.connect_retries,
"read_retries": config.read_retries,
"backoff_factor": config.backoff_factor,
"backoff_jitter": config.backoff_jitter,
"statuses": config.statuses,
}
def _tls_config_to_dict(config: Optional[TlsConfig]) -> Optional[dict[str, Any]]:
if config is None:
return None
return {
"cert_file": config.cert_file,
"key_file": config.key_file,
"ssl_ca_cert": config.ssl_ca_cert,
"assert_hostname": config.assert_hostname,
}
def _client_config_to_dict(config: ClientConfig) -> dict[str, Any]:
if config.header_provider is not None:
raise ValueError(
"Cannot serialize a remote connection with a header_provider. "
"Use static api_key/extra_headers or provide a worker-side "
"connection factory instead."
)
return {
"user_agent": config.user_agent,
"retry_config": _retry_config_to_dict(config.retry_config),
"timeout_config": _timeout_config_to_dict(config.timeout_config),
"extra_headers": config.extra_headers,
"id_delimiter": config.id_delimiter,
"tls_config": _tls_config_to_dict(config.tls_config),
"header_provider": None,
"user_id": config.user_id,
}
class RemoteDBConnection(DBConnection):
"""A connection to a remote LanceDB database."""
@@ -148,11 +89,6 @@ class RemoteDBConnection(DBConnection):
parsed = urlparse(db_url)
if parsed.scheme != "db":
raise ValueError(f"Invalid scheme: {parsed.scheme}, only accepts db://")
self.db_url = db_url
self.api_key = api_key
self.region = region
self.host_override = host_override
self.storage_options = storage_options
self.db_name = parsed.netloc
self.client_config = client_config
@@ -175,20 +111,6 @@ class RemoteDBConnection(DBConnection):
def __repr__(self) -> str:
return f"RemoteConnect(name={self.db_name})"
@override
def serialize(self) -> str:
return json.dumps(
{
"connection_type": "remote",
"db_url": self.db_url,
"api_key": self.api_key,
"region": self.region,
"host_override": self.host_override,
"client_config": _client_config_to_dict(self.client_config),
"storage_options": self.storage_options,
}
)
@override
def list_namespaces(
self,
@@ -409,12 +331,7 @@ class RemoteDBConnection(DBConnection):
)
table = LOOP.run(self._conn.open_table(name, namespace_path=namespace_path))
return RemoteTable(
table,
self.db_name,
connection_state=self.serialize,
namespace_path=namespace_path,
)
return RemoteTable(table, self.db_name)
def clone_table(
self,
@@ -463,12 +380,7 @@ class RemoteDBConnection(DBConnection):
is_shallow=is_shallow,
)
)
return RemoteTable(
table,
self.db_name,
connection_state=self.serialize,
namespace_path=target_namespace_path,
)
return RemoteTable(table, self.db_name)
@override
def create_table(
@@ -613,12 +525,7 @@ class RemoteDBConnection(DBConnection):
fill_value=fill_value,
)
)
return RemoteTable(
table,
self.db_name,
connection_state=self.serialize,
namespace_path=namespace_path,
)
return RemoteTable(table, self.db_name)
@override
def drop_table(self, name: str, namespace_path: Optional[List[str]] = None):

View File

@@ -5,7 +5,6 @@ from datetime import timedelta
import deprecation
import logging
from functools import cached_property
import os
from typing import (
Any,
Callable,
@@ -25,7 +24,6 @@ from lancedb._lancedb import (
AddColumnsResult,
AddResult,
AlterColumnsResult,
UpdateFieldMetadataResult,
DeleteResult,
DropColumnsResult,
IndexConfig,
@@ -65,80 +63,14 @@ class RemoteTable(Table):
self,
table: AsyncTable,
db_name: str,
*,
connection_state: Optional[Union[str, Callable[[], str]]] = None,
namespace_path: Optional[List[str]] = None,
):
self._table_handle = table
self._name = table.name
self._table = table
self.db_name = db_name
self._connection_state = connection_state
self._namespace_path = list(namespace_path or [])
self._checkout_version: Optional[int] = None
self._pid = os.getpid()
def _serialized_connection_state(self) -> str:
if self._connection_state is None:
raise RuntimeError(
"Cannot reopen this remote table because it does not carry "
"serialized connection state"
)
if callable(self._connection_state):
self._connection_state = self._connection_state()
return self._connection_state
@property
def _table(self) -> AsyncTable:
self._ensure_open()
assert self._table_handle is not None
return self._table_handle
@_table.setter
def _table(self, table: AsyncTable) -> None:
self._table_handle = table
self._name = table.name
self._pid = os.getpid()
def _ensure_open(self) -> None:
pid = os.getpid()
if self._table_handle is not None and self._pid == pid:
return
# Pickle clears the handle; fork inherits a handle created in the
# parent process. In both cases reopen before touching the Rust client.
from lancedb import deserialize_conn
db = deserialize_conn(self._serialized_connection_state(), for_worker=True)
table = db.open_table(self._name, namespace_path=self._namespace_path)
if self._checkout_version is not None:
table.checkout(self._checkout_version)
self._table_handle = table._table
self.db_name = table.db_name
self._pid = pid
def __getstate__(self) -> dict:
return {
"connection_state": self._serialized_connection_state(),
"db_name": self.db_name,
"name": self.name,
"namespace_path": self._namespace_path,
"checkout_version": self._checkout_version,
}
def __setstate__(self, state: dict) -> None:
self._table_handle = None
self._name = state["name"]
self.db_name = state["db_name"]
self._connection_state = state["connection_state"]
self._namespace_path = state["namespace_path"]
self._checkout_version = state["checkout_version"]
self._pid = None
@property
def name(self) -> str:
"""The name of the table"""
return self._name
return self._table.name
def __repr__(self) -> str:
return f"RemoteTable({self.db_name}.{self.name})"
@@ -188,19 +120,13 @@ class RemoteTable(Table):
raise NotImplementedError("to_pandas() is not yet supported on LanceDB cloud.")
def checkout(self, version: Union[int, str]):
result = LOOP.run(self._table.checkout(version))
self._checkout_version = self.version
return result
return LOOP.run(self._table.checkout(version))
def checkout_latest(self):
result = LOOP.run(self._table.checkout_latest())
self._checkout_version = None
return result
return LOOP.run(self._table.checkout_latest())
def restore(self, version: Optional[Union[int, str]] = None):
result = LOOP.run(self._table.restore(version))
self._checkout_version = None
return result
return LOOP.run(self._table.restore(version))
def list_indices(self) -> Iterable[IndexConfig]:
"""List all the indices on the table"""
@@ -851,11 +777,6 @@ class RemoteTable(Table):
) -> AlterColumnsResult:
return LOOP.run(self._table.alter_columns(*alterations))
def update_field_metadata(
self, *updates: dict[str, Any]
) -> UpdateFieldMetadataResult:
return LOOP.run(self._table.update_field_metadata(*updates))
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
return LOOP.run(self._table.drop_columns(columns))

View File

@@ -154,7 +154,6 @@ if TYPE_CHECKING:
AddColumnsResult,
AddResult,
AlterColumnsResult,
UpdateFieldMetadataResult,
DeleteResult,
DropColumnsResult,
LsmWriteSpec,
@@ -758,15 +757,6 @@ class Table(ABC):
"""
raise NotImplementedError
@property
def branches(self) -> "Branches":
"""Branch management for the table.
Branches are isolated, writable lines of history forked from another
branch (or version). Writes on a branch do not affect ``main``.
"""
raise NotImplementedError
def __len__(self) -> int:
"""The number of rows in this Table"""
return self.count_rows(None)
@@ -1809,29 +1799,6 @@ class Table(ABC):
version: the new version number of the table after the alteration.
"""
@abstractmethod
def update_field_metadata(
self, *updates: dict[str, Any]
) -> UpdateFieldMetadataResult:
"""
Update per-field (column) metadata.
Parameters
----------
updates : dict
One or more dicts, each with:
- "path": str — dot-path to the field (e.g. "embedding" or "a.b.c").
- "metadata": dict[str, str | None] — keys to set; a value of ``None``
deletes that key.
- "replace": bool, optional — replace the field's whole metadata map
instead of merging (default False).
Returns
-------
UpdateFieldMetadataResult
version: the new table version after the update.
"""
@abstractmethod
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
"""
@@ -2176,15 +2143,6 @@ class LanceTable(Table):
"""
return Tags(self._table)
@property
def branches(self) -> "Branches":
"""Branch management for the table.
``create``/``checkout`` return a new table handle scoped to the branch;
writes on it do not affect ``main``.
"""
return Branches(self._table)
def checkout(self, version: Union[int, str]):
"""Checkout a version of the table. This is an in-place operation.
@@ -3625,11 +3583,6 @@ class LanceTable(Table):
) -> AlterColumnsResult:
return LOOP.run(self._table.alter_columns(*alterations))
def update_field_metadata(
self, *updates: dict[str, Any]
) -> UpdateFieldMetadataResult:
return LOOP.run(self._table.update_field_metadata(*updates))
def drop_columns(self, columns: Iterable[str]) -> DropColumnsResult:
return LOOP.run(self._table.drop_columns(columns))
@@ -3684,18 +3637,10 @@ class LanceTable(Table):
"""
LOOP.run(self._table.migrate_v2_manifest_paths())
@deprecation.deprecated(
deprecated_in="0.33.1",
current_version=__version__,
details="Use update_field_metadata() instead.",
)
def replace_field_metadata(self, field_name: str, new_metadata: Dict[str, str]):
"""
Replace the metadata of a field in the schema
.. deprecated:: 0.33.1
Use :func:`update_field_metadata` instead.
Parameters
----------
field_name: str
@@ -5289,13 +5234,6 @@ class AsyncTable:
"""
return await self._inner.alter_columns(alterations)
async def update_field_metadata(
self, *updates: dict[str, Any]
) -> UpdateFieldMetadataResult:
"""Update per-field metadata. See
[`Table.update_field_metadata`][lancedb.table.Table.update_field_metadata]."""
return await self._inner.update_field_metadata(updates)
async def drop_columns(self, columns: Iterable[str]):
"""
Drop columns from the table.
@@ -5460,15 +5398,6 @@ class AsyncTable:
"""
return AsyncTags(self._inner)
@property
def branches(self) -> AsyncBranches:
"""Branch management for the table.
Branches are isolated, writable lines of history forked from another
branch (or version). Writes on a branch do not affect ``main``.
"""
return AsyncBranches(self._inner)
async def optimize(
self,
*,
@@ -5589,20 +5518,12 @@ class AsyncTable:
"""
await self._inner.migrate_manifest_paths_v2()
@deprecation.deprecated(
deprecated_in="0.33.1",
current_version=__version__,
details="Use update_field_metadata() instead.",
)
async def replace_field_metadata(
self, field_name: str, new_metadata: dict[str, str]
):
"""
Replace the metadata of a field in the schema
.. deprecated:: 0.33.1
Use :func:`update_field_metadata` instead.
Parameters
----------
field_name: str
@@ -5804,50 +5725,6 @@ class Tags:
LOOP.run(self._table.tags.update(tag, version))
class Branches:
"""
Table branch manager.
"""
def __init__(self, table):
self._table = table
def list(self) -> Dict[str, Any]:
"""List all branches, mapping name to branch metadata."""
return LOOP.run(self._table.branches.list())
def create(
self,
name: str,
from_ref: Optional[str] = None,
from_version: Optional[int] = None,
) -> "LanceTable":
"""Create a branch and return a handle scoped to it.
Parameters
----------
name: str
Name of the new branch.
from_ref: str, optional
Source branch to fork from. Defaults to ``main``.
from_version: int, optional
A specific version on ``from_ref`` to fork from. Defaults to latest.
"""
async_table = LOOP.run(
self._table.branches.create(name, from_ref, from_version)
)
return LanceTable.from_inner(async_table._inner)
def checkout(self, name: str) -> "LanceTable":
"""Check out an existing branch and return a handle scoped to it."""
async_table = LOOP.run(self._table.branches.checkout(name))
return LanceTable.from_inner(async_table._inner)
def delete(self, name: str) -> None:
"""Delete a branch."""
LOOP.run(self._table.branches.delete(name))
class AsyncTags:
"""
Async table tag manager.
@@ -5915,47 +5792,3 @@ class AsyncTags:
The new table version to tag.
"""
await self._table.tags.update(tag, version)
class AsyncBranches:
"""Async table branch manager."""
def __init__(self, table):
self._table = table
async def list(self) -> Dict[str, Any]:
"""List all branches, mapping name to branch metadata."""
return await self._table.branches.list()
async def create(
self,
name: str,
from_ref: Optional[str] = None,
from_version: Optional[int] = None,
) -> "AsyncTable":
"""Create a branch and return a handle scoped to it.
Parameters
----------
name: str
Name of the new branch.
from_ref: str, optional
Source branch to fork from. Defaults to ``main``.
from_version: int, optional
A specific version on ``from_ref`` to fork from. Defaults to latest.
"""
# "main" and None are two spellings of the root branch in lance; normalize
# so from_ref="main" behaves identically to the default.
if from_ref == "main":
from_ref = None
inner = await self._table.branches.create(name, from_ref, from_version)
return AsyncTable(inner)
async def checkout(self, name: str) -> "AsyncTable":
"""Check out an existing branch and return a handle scoped to it."""
inner = await self._table.branches.checkout(name)
return AsyncTable(inner)
async def delete(self, name: str) -> None:
"""Delete a branch."""
await self._table.branches.delete(name)

View File

@@ -1,13 +1,12 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
import re
from concurrent.futures import ThreadPoolExecutor
import contextlib
from datetime import timedelta
import http.server
import json
import multiprocessing as mp
import pickle
import re
import sys
import threading
import time
@@ -172,155 +171,6 @@ def test_table_len_sync():
assert len(table) == 1
def test_remote_connection_serializes():
def handler(request):
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b'{"tables": []}')
with mock_lancedb_connection(handler) as db:
serialized = json.loads(db.serialize())
assert isinstance(serialized["client_config"], dict)
restored = lancedb.deserialize_conn(db.serialize())
assert restored.table_names() == []
def test_remote_table_is_picklable():
def handler(request):
request.close_connection = True
if request.path == "/v1/table/test/describe/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(
{
"version": 1,
"schema": {
"fields": [
{"name": "id", "type": {"type": "int64"}, "nullable": False}
]
},
}
)
request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/count_rows/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b"3")
else:
request.send_response(404)
request.end_headers()
with mock_lancedb_connection(handler) as db:
table = db.open_table("test")
restored = pickle.loads(pickle.dumps(table))
assert restored.count_rows() == 3
def test_remote_table_open_does_not_require_picklable_client_config():
from lancedb.remote import HeaderProvider
class LocalHeaderProvider(HeaderProvider):
def get_headers(self):
return {"X-Test-Header": "present"}
def handler(request):
request.close_connection = True
assert request.headers.get("X-Test-Header") == "present"
if request.path == "/v1/table/test/describe/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b'{"version": 1, "schema": {"fields": []}}')
elif request.path == "/v1/table/test/count_rows/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b"3")
else:
request.send_response(404)
request.end_headers()
with http.server.HTTPServer(
("localhost", 0), make_mock_http_handler(handler)
) as server:
port = server.server_address[1]
handle = threading.Thread(target=server.serve_forever)
handle.start()
try:
db = lancedb.connect(
"db://dev",
api_key="fake",
host_override=f"http://localhost:{port}",
client_config={
"retry_config": {"retries": 0},
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
"header_provider": LocalHeaderProvider(),
},
)
table = db.open_table("test")
assert table.count_rows() == 3
with pytest.raises(ValueError, match="header_provider"):
pickle.dumps(table)
finally:
server.shutdown()
handle.join()
def test_remote_permutation_is_picklable():
from lancedb.permutation import Permutation
rows = list(range(10))
def handler(request):
request.close_connection = True
if request.path == "/v1/table/test/describe/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
payload = json.dumps(
{
"version": 1,
"schema": {
"fields": [
{"name": "a", "type": {"type": "int64"}, "nullable": False}
]
},
}
)
request.wfile.write(payload.encode())
elif request.path == "/v1/table/test/count_rows/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(str(len(rows)).encode())
elif request.path == "/v1/table/test/query/":
content_len = int(request.headers.get("Content-Length"))
body = json.loads(request.rfile.read(content_len))
if "filter" in body:
match = re.search(r"_rowoffset in \((.*?)\)", body["filter"])
offsets = [int(offset.strip()) for offset in match.group(1).split(",")]
else:
offsets = rows
table = pa.table({"a": [rows[offset] for offset in offsets]})
request.send_response(200)
request.send_header("Content-Type", "application/vnd.apache.arrow.file")
request.end_headers()
with pa.ipc.new_file(request.wfile, schema=table.schema) as writer:
writer.write_table(table)
else:
request.send_response(404)
request.end_headers()
with mock_lancedb_connection(handler) as db:
permutation = Permutation.identity(db.open_table("test"))
restored = pickle.loads(pickle.dumps(permutation))
assert restored.__getitems__([0, 2, 4]) == [{"a": 0}, {"a": 2}, {"a": 4}]
def test_create_table_exist_ok():
def handler(request):
if request.path == "/v1/table/test/create/?mode=exist_ok":
@@ -1550,10 +1400,6 @@ def _remote_fork_child(port: int, queue) -> None:
queue.put(db.table_names())
def _remote_table_fork_child(table, queue) -> None:
queue.put(table.count_rows())
@pytest.mark.skipif(
sys.platform != "linux",
reason=(
@@ -1616,65 +1462,3 @@ def test_remote_connection_after_fork():
finally:
server.shutdown()
server_thread.join()
@pytest.mark.skipif(
sys.platform != "linux",
reason=(
"fork() is unavailable on Windows and unsafe on macOS "
"(Apple frameworks/TLS are not fork-safe)"
),
)
def test_inherited_remote_table_reopens_after_fork():
def handler(request):
if request.path == "/v1/table/test/describe/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b'{"version": 1, "schema": {"fields": []}}')
elif request.path == "/v1/table/test/count_rows/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b"7")
else:
request.send_response(404)
request.end_headers()
server = http.server.HTTPServer(("localhost", 0), make_mock_http_handler(handler))
port = server.server_address[1]
server_thread = threading.Thread(target=server.serve_forever)
server_thread.start()
try:
db = lancedb.connect(
"db://dev",
api_key="fake",
host_override=f"http://localhost:{port}",
client_config={
"retry_config": {"retries": 0},
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
},
)
table = db.open_table("test")
assert table.count_rows() == 7
ctx = mp.get_context("fork")
queue = ctx.Queue()
proc = ctx.Process(target=_remote_table_fork_child, args=(table, queue))
proc.start()
proc.join(timeout=15)
if proc.is_alive():
proc.terminate()
proc.join(timeout=5)
if proc.is_alive():
proc.kill()
proc.join()
pytest.fail("Remote table hung after fork")
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
assert not queue.empty(), "child produced no result"
assert queue.get() == 7
finally:
server.shutdown()
server_thread.join()

View File

@@ -903,79 +903,6 @@ async def test_async_tags(mem_db_async: AsyncConnection):
)
def test_branches(tmp_path):
db = lancedb.connect(tmp_path)
table = db.create_table(
"test",
data=[
{"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
{"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
],
)
assert table.count_rows() == 2
# fork an isolated, writable branch from main
branch = table.branches.create("exp")
assert branch.count_rows() == 2
branch.add(data=[{"vector": [10.0, 11.0], "item": "baz", "price": 30.0}])
# writes on the branch do not touch main
assert branch.count_rows() == 3
assert table.count_rows() == 2
# the branch is listed, with main (None) as its parent
branches = table.branches.list()
assert "exp" in branches
assert branches["exp"]["parent_branch"] is None
# from_ref="main" is equivalent to the default
table.branches.create("exp2", from_ref="main")
assert table.branches.list()["exp2"]["parent_branch"] is None
# checkout returns a handle scoped to the branch's latest
checked_out = table.branches.checkout("exp")
assert checked_out.count_rows() == 3
# delete removes it
table.branches.delete("exp")
table.branches.delete("exp2")
assert "exp" not in table.branches.list()
@pytest.mark.asyncio
async def test_async_branches(tmp_path):
db = await lancedb.connect_async(tmp_path)
table = await db.create_table(
"test",
data=[
{"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
{"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
],
)
assert await table.count_rows() == 2
branch = await table.branches.create("exp")
assert await branch.count_rows() == 2
await branch.add(data=[{"vector": [10.0, 11.0], "item": "baz", "price": 30.0}])
assert await branch.count_rows() == 3
assert await table.count_rows() == 2
branches = await table.branches.list()
assert "exp" in branches
assert branches["exp"]["parent_branch"] is None
await table.branches.create("exp2", from_ref="main")
assert (await table.branches.list())["exp2"]["parent_branch"] is None
checked_out = await table.branches.checkout("exp")
assert await checked_out.count_rows() == 3
await table.branches.delete("exp")
await table.branches.delete("exp2")
assert "exp" not in await table.branches.list()
@patch("lancedb.table.AsyncTable.create_index")
def test_create_index_method(mock_create_index, mem_db: DBConnection):
table = mem_db.create_table(
@@ -2545,30 +2472,6 @@ def test_alter_columns(mem_db: DBConnection):
assert table.to_arrow().column_names == ["new_id"]
def test_update_field_metadata(mem_db: DBConnection):
data = pa.table({"id": [0, 1], "category": ["a", "b"]})
table = mem_db.create_table("my_table", data=data)
res = table.update_field_metadata(
{"path": "category", "metadata": {"unit": "label", "pii": "false"}}
)
assert res.version == 2
# Arrow field metadata is bytes-keyed
assert table.schema.field("category").metadata == {
b"unit": b"label",
b"pii": b"false",
}
# merge: add a key, delete one via None, keep the rest
table.update_field_metadata(
{"path": "category", "metadata": {"source": "import", "pii": None}}
)
assert table.schema.field("category").metadata == {
b"unit": b"label",
b"source": b"import",
}
@pytest.mark.asyncio
async def test_alter_columns_async(mem_db_async: AsyncConnection):
data = pa.table({"id": [0, 1]})

View File

@@ -1,15 +1,10 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
import contextlib
import functools
import http.server
import json
import multiprocessing as mp
import pickle
import re
import sys
import threading
import lancedb
import pyarrow as pa
@@ -20,107 +15,6 @@ from lancedb.util import tbl_to_tensor
torch = pytest.importorskip("torch")
REMOTE_ROWS = list(range(100))
def _make_mock_http_handler(handler):
class MockLanceDBHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
handler(self)
def do_POST(self):
handler(self)
return MockLanceDBHandler
def _remote_schema_payload():
return {
"version": 1,
"schema": {
"fields": [
{"name": "a", "type": {"type": "int64"}, "nullable": False},
]
},
}
def _offsets_from_filter(filter_sql: str | None) -> list[int]:
if filter_sql is None:
return REMOTE_ROWS
match = re.search(r"_rowoffset in \((.*?)\)", filter_sql)
if match is None:
return REMOTE_ROWS
raw_offsets = match.group(1).strip()
if raw_offsets == "":
return []
return [int(offset.strip()) for offset in raw_offsets.split(",")]
def _remote_dataset_handler(request):
request.close_connection = True
if request.path == "/v1/table/test/describe/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(json.dumps(_remote_schema_payload()).encode())
elif request.path == "/v1/table/test/count_rows/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(str(len(REMOTE_ROWS)).encode())
elif request.path == "/v1/table/test/query/":
content_len = int(request.headers.get("Content-Length"))
body = json.loads(request.rfile.read(content_len))
offsets = _offsets_from_filter(body.get("filter"))
requested_columns = body.get("columns") or ["a"]
if isinstance(requested_columns, dict):
requested_columns = list(requested_columns)
data = {}
for column in requested_columns:
if column == "a":
data[column] = [REMOTE_ROWS[offset] for offset in offsets]
elif column == "_rowoffset":
data[column] = offsets
elif column == "_rowid":
data[column] = offsets
table = pa.table(data)
request.send_response(200)
request.send_header("Content-Type", "application/vnd.apache.arrow.file")
request.end_headers()
with pa.ipc.new_file(request.wfile, schema=table.schema) as writer:
writer.write_table(table)
else:
request.send_response(404)
request.end_headers()
@contextlib.contextmanager
def _remote_dataset_table():
with http.server.ThreadingHTTPServer(
("localhost", 0), _make_mock_http_handler(_remote_dataset_handler)
) as server:
port = server.server_address[1]
handle = threading.Thread(target=server.serve_forever)
handle.start()
try:
db = lancedb.connect(
"db://dev",
api_key="fake",
host_override=f"http://localhost:{port}",
client_config={
"retry_config": {"retries": 0},
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
},
)
yield db.open_table("test")
finally:
server.shutdown()
handle.join()
def _open_native_table(uri: str, table_name: str):
"""Top-level connection factory used by the explicit-factory pickle test.
@@ -213,39 +107,6 @@ def test_permutation_dataloader_multiprocessing(tmp_db):
assert seen == 1000
def test_remote_table_dataloader_multiprocessing():
with _remote_dataset_table() as table:
dataloader = torch.utils.data.DataLoader(
table,
collate_fn=tbl_to_tensor,
batch_size=10,
num_workers=2,
multiprocessing_context="spawn",
)
seen = 0
for batch in dataloader:
assert batch.size(0) == 1
assert batch.size(1) == 10
seen += batch.size(1)
assert seen == len(REMOTE_ROWS)
def test_remote_permutation_dataloader_multiprocessing():
with _remote_dataset_table() as table:
permutation = Permutation.identity(table)
dataloader = torch.utils.data.DataLoader(
permutation,
batch_size=10,
num_workers=2,
multiprocessing_context="spawn",
)
seen = 0
for batch in dataloader:
assert batch["a"].size(0) == 10
seen += batch["a"].size(0)
assert seen == len(REMOTE_ROWS)
def test_permutation_pickle_with_connection_factory(tmp_path):
"""When the user provides a connection_factory, pickling should round-trip
through that factory rather than introspecting the connection URI. Useful
@@ -310,35 +171,6 @@ def _multiworker_dataloader_target(db_uri: str, result_queue):
result_queue.put(count)
def _remote_multiworker_dataloader_target(port: int, result_queue):
import lancedb
from lancedb.permutation import Permutation
db = lancedb.connect(
"db://dev",
api_key="fake",
host_override=f"http://localhost:{port}",
client_config={
"retry_config": {"retries": 0},
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
},
)
table = db.open_table("test")
permutation = Permutation.identity(table)
dataloader = torch.utils.data.DataLoader(
permutation,
batch_size=10,
num_workers=2,
multiprocessing_context="fork",
)
count = 0
for batch in dataloader:
assert batch["a"].size(0) == 10
count += 1
result_queue.put(count)
@pytest.mark.skipif(
sys.platform != "linux",
reason=(
@@ -376,46 +208,3 @@ def test_permutation_dataloader_fork_workers(tmp_path):
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
assert not queue.empty(), "child produced no batches"
assert queue.get() == 100
@pytest.mark.skipif(
sys.platform != "linux",
reason=(
"fork() is unavailable on Windows and unsafe on macOS "
"(Apple frameworks/TLS are not fork-safe)"
),
)
def test_remote_permutation_dataloader_fork_workers():
with http.server.ThreadingHTTPServer(
("localhost", 0), _make_mock_http_handler(_remote_dataset_handler)
) as server:
port = server.server_address[1]
handle = threading.Thread(target=server.serve_forever)
handle.start()
try:
ctx = mp.get_context("spawn")
queue = ctx.Queue()
proc = ctx.Process(
target=_remote_multiworker_dataloader_target,
args=(port, queue),
)
proc.start()
proc.join(timeout=30)
if proc.is_alive():
proc.terminate()
proc.join(timeout=5)
if proc.is_alive():
proc.kill()
proc.join()
pytest.fail(
"Remote permutation hung when iterated in a fork-based "
"DataLoader worker"
)
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
assert not queue.empty(), "child produced no batches"
assert queue.get() == 10
finally:
server.shutdown()
handle.join()

View File

@@ -16,7 +16,7 @@ use query::{FTSQuery, HybridQuery, Query, VectorQuery};
use session::Session;
use table::{
AddColumnsResult, AddResult, AlterColumnsResult, DeleteResult, DropColumnsResult, LsmWriteSpec,
MergeResult, Table, UpdateFieldMetadataResult, UpdateResult,
MergeResult, Table, UpdateResult,
};
pub mod arrow;
@@ -50,7 +50,6 @@ pub fn _lancedb(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<RecordBatchStream>()?;
m.add_class::<AddColumnsResult>()?;
m.add_class::<AlterColumnsResult>()?;
m.add_class::<UpdateFieldMetadataResult>()?;
m.add_class::<AddResult>()?;
m.add_class::<MergeResult>()?;
m.add_class::<LsmWriteSpec>()?;

View File

@@ -16,12 +16,12 @@ use arrow::{
pyarrow::{FromPyArrow, PyArrowType, ToPyArrow},
};
use lancedb::table::{
AddDataMode, ColumnAlteration, Duration, FieldMetadataUpdate, NewColumnTransform,
OptimizeAction, OptimizeOptions, Ref, Table as LanceDbTable,
AddDataMode, ColumnAlteration, Duration, NewColumnTransform, OptimizeAction, OptimizeOptions,
Table as LanceDbTable,
};
use pyo3::{
Bound, FromPyObject, Py, PyAny, PyRef, PyResult, Python,
exceptions::{PyRuntimeError, PyValueError},
exceptions::{PyKeyError, PyRuntimeError, PyValueError},
pyclass, pymethods,
types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods},
};
@@ -357,27 +357,6 @@ impl From<lancedb::table::AlterColumnsResult> for AlterColumnsResult {
}
}
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct UpdateFieldMetadataResult {
pub version: u64,
}
#[pymethods]
impl UpdateFieldMetadataResult {
pub fn __repr__(&self) -> String {
format!("UpdateFieldMetadataResult(version={})", self.version)
}
}
impl From<lancedb::table::UpdateFieldMetadataResult> for UpdateFieldMetadataResult {
fn from(result: lancedb::table::UpdateFieldMetadataResult) -> Self {
Self {
version: result.version,
}
}
}
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct DropColumnsResult {
@@ -864,11 +843,6 @@ impl Table {
Ok(Tags::new(self.inner_ref()?.clone()))
}
#[getter]
pub fn branches(&self) -> PyResult<Branches> {
Ok(Branches::new(self.inner_ref()?.clone()))
}
#[pyo3(signature = (offsets))]
pub fn take_offsets(self_: PyRef<'_, Self>, offsets: Vec<u64>) -> PyResult<TakeQuery> {
Ok(TakeQuery::new(
@@ -1128,57 +1102,31 @@ impl Table {
field_name: String,
metadata: &Bound<'_, PyDict>,
) -> PyResult<Bound<'a, PyAny>> {
// Deprecated: forwards to the update_field_metadata path (replace mode).
let mut update = FieldMetadataUpdate::new(field_name).replace();
for (key, value) in metadata.into_iter() {
update = update.set(key.extract::<String>()?, value.extract::<String>()?);
let mut new_metadata = HashMap::<String, String>::new();
for (column_name, value) in metadata.into_iter() {
let key: String = column_name.extract()?;
let value: String = value.extract()?;
new_metadata.insert(key, value);
}
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.update_field_metadata(&[update]).await.infer_error()?;
let native_tbl = inner
.as_native()
.ok_or_else(|| PyValueError::new_err("This cannot be run on a remote table"))?;
let schema = native_tbl.manifest().await.infer_error()?.schema;
let field = schema
.field(&field_name)
.ok_or_else(|| PyKeyError::new_err(format!("Field {} not found", field_name)))?;
native_tbl
.replace_field_metadata(vec![(field.id as u32, new_metadata)])
.await
.infer_error()?;
Ok(())
})
}
pub fn update_field_metadata<'a>(
self_: PyRef<'a, Self>,
updates: Vec<Bound<PyDict>>,
) -> PyResult<Bound<'a, PyAny>> {
let updates = updates
.iter()
.map(|update| {
let path: String = update
.get_item("path")?
.ok_or_else(|| PyValueError::new_err("Missing path"))?
.extract()?;
let mut field_update = FieldMetadataUpdate::new(path);
if let Some(metadata) = update.get_item("metadata")? {
let metadata_dict = metadata.cast::<PyDict>()?;
for (key, value) in metadata_dict.iter() {
let key: String = key.extract()?;
if value.is_none() {
field_update = field_update.remove(key);
} else {
field_update = field_update.set(key, value.extract::<String>()?);
}
}
}
if let Some(replace) = update.get_item("replace")?
&& replace.extract::<bool>()?
{
field_update = field_update.replace();
}
Ok(field_update)
})
.collect::<PyResult<Vec<_>>>()?;
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let result = inner.update_field_metadata(&updates).await.infer_error()?;
Ok(UpdateFieldMetadataResult::from(result))
})
}
}
#[derive(FromPyObject)]
@@ -1270,66 +1218,3 @@ impl Tags {
})
}
}
#[pyclass]
pub struct Branches {
inner: LanceDbTable,
}
impl Branches {
pub fn new(table: LanceDbTable) -> Self {
Self { inner: table }
}
}
#[pymethods]
impl Branches {
pub fn list(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let res = inner.list_branches().await.infer_error()?;
Python::attach(|py| {
let py_dict = PyDict::new(py);
for (name, contents) in res {
let value = PyDict::new(py);
value.set_item("parent_branch", contents.parent_branch)?;
value.set_item("parent_version", contents.parent_version)?;
value.set_item("manifest_size", contents.manifest_size)?;
py_dict.set_item(name, value)?;
}
Ok(py_dict.unbind())
})
})
}
#[pyo3(signature = (name, from_ref=None, from_version=None))]
pub fn create(
self_: PyRef<'_, Self>,
name: String,
from_ref: Option<String>,
from_version: Option<u64>,
) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let from = Ref::Version(from_ref, from_version);
let table = inner.create_branch(&name, from).await.infer_error()?;
Ok(Table::new(table))
})
}
pub fn checkout(self_: PyRef<'_, Self>, name: String) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
let table = inner.checkout_branch(&name).await.infer_error()?;
Ok(Table::new(table))
})
}
pub fn delete(self_: PyRef<'_, Self>, name: String) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
inner.delete_branch(&name).await.infer_error()?;
Ok(())
})
}
}

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.30.1-beta.0"
version = "0.30.0-beta.1"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true

View File

@@ -18,13 +18,13 @@ use crate::index::waiter::wait_for_index;
use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest};
use crate::table::AddColumnsResult;
use crate::table::AddResult;
use crate::table::AlterColumnsResult;
use crate::table::DeleteResult;
use crate::table::DropColumnsResult;
use crate::table::MergeResult;
use crate::table::Tags;
use crate::table::UpdateResult;
use crate::table::query::create_multi_vector_plan;
use crate::table::{AlterColumnsResult, FieldMetadataUpdate, UpdateFieldMetadataResult};
use crate::table::{AnyQuery, Filter, Predicate, PreprocessingOutput, TableStatistics};
use crate::utils::background_cache::BackgroundCache;
use crate::utils::{
@@ -1383,38 +1383,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
.map_err(unwrap_shared_error)
}
async fn create_branch(
&self,
_name: &str,
_from: lance::dataset::refs::Ref,
) -> Result<Arc<dyn BaseTable>> {
Err(Error::NotSupported {
message: "branching is not yet supported on remote tables".into(),
})
}
async fn checkout_branch(&self, _name: &str) -> Result<Arc<dyn BaseTable>> {
Err(Error::NotSupported {
message: "branching is not yet supported on remote tables".into(),
})
}
async fn list_branches(&self) -> Result<HashMap<String, lance::dataset::refs::BranchContents>> {
Err(Error::NotSupported {
message: "branching is not yet supported on remote tables".into(),
})
}
async fn delete_branch(&self, _name: &str) -> Result<()> {
Err(Error::NotSupported {
message: "branching is not yet supported on remote tables".into(),
})
}
fn current_branch(&self) -> Option<String> {
None
}
async fn count_rows(&self, filter: Option<Filter>) -> Result<usize> {
let mut request = self.post_read(&format!("/v1/table/{}/count_rows/", self.identifier));
@@ -2000,35 +1968,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
Ok(result)
}
async fn update_field_metadata(
&self,
updates: &[FieldMetadataUpdate],
) -> Result<UpdateFieldMetadataResult> {
self.check_mutable().await?;
let body = serde_json::json!({ "updates": updates });
let request = self
.client
.post(&format!(
"/v1/table/{}/update_field_metadata/",
self.identifier
))
.json(&body);
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?;
let result: UpdateFieldMetadataResult =
serde_json::from_str(&body).map_err(|e| Error::Http {
source: format!("Failed to parse update_field_metadata response: {}", e).into(),
request_id,
status_code: None,
})?;
self.invalidate_schema_cache();
self.track_write_version(result.version);
Ok(result)
}
async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
self.check_mutable().await?;
let body = serde_json::json!({ "columns": columns });
@@ -2322,7 +2261,6 @@ mod tests {
use crate::remote::client::{ClientConfig, RetryConfig};
use crate::table::AddDataMode;
use crate::table::FieldMetadataUpdate;
use arrow::{array::AsArray, compute::concat_batches, datatypes::Int32Type};
use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator, record_batch};
@@ -6522,25 +6460,4 @@ mod tests {
assert!(!headers.contains_key("x-lancedb-min-version"));
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
}
#[tokio::test]
async fn test_update_field_metadata() {
let table = Table::new_with_handler("my_table", |request| {
assert_eq!(request.method(), "POST");
assert_eq!(
request.url().path(),
"/v1/table/my_table/update_field_metadata/"
);
http::Response::builder()
.status(200)
.body(r#"{"version": 7, "fields": {"category": {"unit": "label"}}}"#)
.unwrap()
});
let result = table
.update_field_metadata(&[FieldMetadataUpdate::new("category").set("unit", "label")])
.await
.unwrap();
assert_eq!(result.version, 7);
}
}

View File

@@ -86,15 +86,12 @@ pub use add_data::{AddDataBuilder, AddDataMode, AddResult, NaNVectorBehavior};
pub use chrono::Duration;
pub use delete::DeleteResult;
use futures::future::join_all;
pub use lance::dataset::refs::{BranchContents, Ref, TagContents, Tags as LanceTags};
pub use lance::dataset::refs::{TagContents, Tags as LanceTags};
pub use lance::dataset::scanner::DatasetRecordBatchStream;
use lance::dataset::statistics::DatasetStatisticsExt;
pub use lance_index::optimize::OptimizeOptions;
pub use optimize::{CompactionOptions, OptimizeAction, OptimizeStats};
pub use schema_evolution::{
AddColumnsResult, AlterColumnsResult, DropColumnsResult, FieldMetadataUpdate,
UpdateFieldMetadataResult,
};
pub use schema_evolution::{AddColumnsResult, AlterColumnsResult, DropColumnsResult};
use serde_with::skip_serializing_none;
pub use update::{UpdateBuilder, UpdateResult};
@@ -625,20 +622,6 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
async fn restore(&self) -> Result<()>;
/// List the versions of the table.
async fn list_versions(&self) -> Result<Vec<Version>>;
/// Create a new branch from `from` and return a handle scoped to it.
async fn create_branch(
&self,
name: &str,
from: lance::dataset::refs::Ref,
) -> Result<Arc<dyn BaseTable>>;
/// Check out an existing branch and return a handle scoped to it.
async fn checkout_branch(&self, name: &str) -> Result<Arc<dyn BaseTable>>;
/// List the branches of the table.
async fn list_branches(&self) -> Result<HashMap<String, BranchContents>>;
/// Delete a branch.
async fn delete_branch(&self, name: &str) -> Result<()>;
/// The branch this handle is scoped to, or `None` for `main`.
fn current_branch(&self) -> Option<String>;
/// Get the table definition.
async fn table_definition(&self) -> Result<TableDefinition>;
/// Get the table URI (storage location)
@@ -677,19 +660,6 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
message: "create_insert_exec not implemented".to_string(),
})
}
/// Update per-field metadata. Merges into existing metadata by default;
/// [`FieldMetadataUpdate::remove`] deletes a key and
/// [`FieldMetadataUpdate::replace`] swaps the field's whole map.
///
/// The default returns `NotSupported`; Lance-backed and remote tables override it.
async fn update_field_metadata(
&self,
_updates: &[FieldMetadataUpdate],
) -> Result<UpdateFieldMetadataResult> {
Err(Error::NotSupported {
message: "update_field_metadata is not supported on this table type".into(),
})
}
}
/// A Table is a collection of strong typed Rows.
@@ -1370,14 +1340,6 @@ impl Table {
self.inner.alter_columns(alterations).await
}
/// Update per-field metadata (merges by default).
pub async fn update_field_metadata(
&self,
updates: &[FieldMetadataUpdate],
) -> Result<UpdateFieldMetadataResult> {
self.inner.update_field_metadata(updates).await
}
/// Remove columns from the table.
pub async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
self.inner.drop_columns(columns).await
@@ -1639,46 +1601,6 @@ impl Table {
self.inner.tags().await
}
/// Create a new branch from `from` (a version, tag, or branch) and return
/// a writable, isolated handle scoped to it. `self` is unaffected.
pub async fn create_branch(
&self,
name: &str,
from: impl Into<lance::dataset::refs::Ref>,
) -> Result<Self> {
let inner = self.inner.create_branch(name, from.into()).await?;
Ok(Self {
inner,
database: self.database.clone(),
embedding_registry: self.embedding_registry.clone(),
})
}
/// Check out an existing branch and return a handle scoped to it.
pub async fn checkout_branch(&self, name: &str) -> Result<Self> {
let inner = self.inner.checkout_branch(name).await?;
Ok(Self {
inner,
database: self.database.clone(),
embedding_registry: self.embedding_registry.clone(),
})
}
/// List the branches of the table.
pub async fn list_branches(&self) -> Result<HashMap<String, BranchContents>> {
self.inner.list_branches().await
}
/// Delete a branch.
pub async fn delete_branch(&self, name: &str) -> Result<()> {
self.inner.delete_branch(name).await
}
/// The branch this handle is scoped to, or `None` for `main`.
pub fn current_branch(&self) -> Option<String> {
self.inner.current_branch()
}
/// Retrieve statistics on the table
pub async fn stats(&self) -> Result<TableStatistics> {
self.inner.stats().await
@@ -1915,21 +1837,6 @@ impl NativeTable {
self
}
/// Build a sibling `NativeTable` with the same identity but a different
/// (independent) dataset wrapper — used to hand out branch-scoped handles.
fn with_dataset(&self, dataset: dataset::DatasetConsistencyWrapper) -> Self {
Self {
name: self.name.clone(),
namespace: self.namespace.clone(),
id: self.id.clone(),
uri: self.uri.clone(),
dataset,
read_consistency_interval: self.read_consistency_interval,
namespace_client: self.namespace_client.clone(),
pushdown_operations: self.pushdown_operations.clone(),
}
}
/// Opens an existing Table using a namespace client.
///
/// This method uses `DatasetBuilder::from_namespace` to open the table, which
@@ -2673,7 +2580,6 @@ impl NativeTable {
/// field id and the second element is a hashmap of metadata key-value
/// pairs.
///
#[deprecated(since = "0.33.1", note = "Use `update_field_metadata` instead")]
pub async fn replace_field_metadata(
&self,
new_values: impl IntoIterator<Item = (u32, HashMap<String, String>)>,
@@ -2721,43 +2627,6 @@ impl BaseTable for NativeTable {
self.dataset.reload().await
}
async fn create_branch(
&self,
name: &str,
from: lance::dataset::refs::Ref,
) -> Result<Arc<dyn BaseTable>> {
let mut ds = (*self.dataset.get().await?).clone();
let branch_ds = ds.create_branch(name, from, None).await?;
let dataset = dataset::DatasetConsistencyWrapper::new_latest(
branch_ds,
self.read_consistency_interval,
);
Ok(Arc::new(self.with_dataset(dataset)))
}
async fn checkout_branch(&self, name: &str) -> Result<Arc<dyn BaseTable>> {
let branch_ds = self.dataset.get().await?.checkout_branch(name).await?;
let dataset = dataset::DatasetConsistencyWrapper::new_latest(
branch_ds,
self.read_consistency_interval,
);
Ok(Arc::new(self.with_dataset(dataset)))
}
async fn list_branches(&self) -> Result<HashMap<String, BranchContents>> {
Ok(self.dataset.get().await?.list_branches().await?)
}
async fn delete_branch(&self, name: &str) -> Result<()> {
let mut ds = (*self.dataset.get().await?).clone();
ds.delete_branch(name).await?;
Ok(())
}
fn current_branch(&self) -> Option<String> {
self.dataset.current_branch()
}
async fn list_versions(&self) -> Result<Vec<Version>> {
Ok(self.dataset.get().await?.versions().await?)
}
@@ -3017,13 +2886,6 @@ impl BaseTable for NativeTable {
schema_evolution::execute_alter_columns(self, alterations).await
}
async fn update_field_metadata(
&self,
updates: &[FieldMetadataUpdate],
) -> Result<UpdateFieldMetadataResult> {
schema_evolution::execute_update_field_metadata(self, updates).await
}
async fn drop_columns(&self, columns: &[&str]) -> Result<DropColumnsResult> {
schema_evolution::execute_drop_columns(self, columns).await
}
@@ -3274,6 +3136,7 @@ pub struct FragmentSummaryStats {
#[cfg(test)]
#[allow(deprecated)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
@@ -3484,56 +3347,6 @@ mod tests {
assert_eq!(table.version().await.unwrap(), 4);
}
#[tokio::test]
async fn test_branches() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
// main: one row at v1
let table = conn
.create_table("my_table", some_sample_data())
.execute()
.await
.unwrap();
assert_eq!(table.count_rows(None).await.unwrap(), 1);
assert_eq!(table.current_branch(), None);
let main_version = table.version().await.unwrap();
// branch off main's current version; it starts with main's data
let branch = table.create_branch("exp", main_version).await.unwrap();
assert_eq!(branch.current_branch().as_deref(), Some("exp"));
assert_eq!(branch.count_rows(None).await.unwrap(), 1);
// writes on the branch are isolated from main
branch.add(some_sample_data()).execute().await.unwrap();
assert_eq!(branch.count_rows(None).await.unwrap(), 2);
assert_eq!(
table.count_rows(None).await.unwrap(),
1,
"main must be untouched by branch writes"
);
// the branch shows up in the listing
let branches = table.list_branches().await.unwrap();
assert!(branches.contains_key("exp"));
// checking out the branch from the main handle sees the branch's latest data
let checked_out = table.checkout_branch("exp").await.unwrap();
assert_eq!(checked_out.current_branch().as_deref(), Some("exp"));
assert_eq!(checked_out.count_rows(None).await.unwrap(), 2);
// delete removes it from the listing
table.delete_branch("exp").await.unwrap();
let branches = table.list_branches().await.unwrap();
assert!(!branches.contains_key("exp"));
}
#[tokio::test]
async fn test_create_index() {
use arrow_array::RecordBatch;
@@ -4636,10 +4449,10 @@ mod tests {
Some(&"test_val2_update".to_string())
);
let mut new_field_metadata = HashMap::<String, String>::new();
new_field_metadata.insert("test_field_key1".into(), "test_field_val1".into());
native_tbl
.update_field_metadata(&[
FieldMetadataUpdate::new("i").set("test_field_key1", "test_field_val1")
])
.replace_field_metadata(vec![(field.id as u32, new_field_metadata)])
.await
.unwrap();

View File

@@ -870,8 +870,10 @@ mod tests {
.await
.unwrap();
// Should return empty or nearly empty result
assert!(result[0].num_rows() <= 1);
assert_eq!(
result.iter().map(|batch| batch.num_rows()).sum::<usize>(),
0
);
}
#[tokio::test]

View File

@@ -144,19 +144,8 @@ impl DatasetConsistencyWrapper {
}
/// Checkout a branch and track its HEAD for new versions.
pub async fn as_branch(&self, branch: impl Into<String>) -> Result<()> {
let branch = branch.into();
let dataset = { self.state.lock()?.dataset.clone() };
let new_dataset = dataset.checkout_branch(&branch).await?;
let mut state = self.state.lock()?;
state.dataset = Arc::new(new_dataset);
state.pinned_version = None;
drop(state);
if let ConsistencyMode::Eventual(bg_cache) = &self.consistency {
bg_cache.invalidate();
}
Ok(())
pub async fn as_branch(&self, _branch: impl Into<String>) -> Result<()> {
todo!("Branch support not yet implemented")
}
/// Check that the dataset is in a mutable mode (Latest).
@@ -172,17 +161,6 @@ impl DatasetConsistencyWrapper {
}
}
/// The branch this wrapper is currently tracking, or `None` for `main`.
pub fn current_branch(&self) -> Option<String> {
self.state
.lock()
.unwrap_or_else(|e| e.into_inner())
.dataset
.manifest()
.branch
.clone()
}
/// Returns the version, if in time travel mode, or None otherwise.
pub fn time_travel_version(&self) -> Option<u64> {
self.state
@@ -759,31 +737,4 @@ mod tests {
let result = wrapper.reload().await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_as_branch_is_writable_and_tracked() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
// v1 on main, then shallow-clone a branch off it
let mut ds = create_test_dataset(uri).await;
let v1 = ds.version().version;
ds.create_branch("exp", v1, None).await.unwrap();
// wrapper starts on main: latest, writable, no branch
let wrapper = DatasetConsistencyWrapper::new_latest(ds, None);
assert_eq!(wrapper.current_branch(), None);
// switch to the branch
wrapper.as_branch("exp").await.unwrap();
assert_eq!(wrapper.current_branch().as_deref(), Some("exp"));
// a branch is writable (unlike a pinned/time-travel checkout)
wrapper.ensure_mutable().unwrap();
assert_eq!(wrapper.time_travel_version(), None);
// get() returns the branch dataset
let on_branch = wrapper.get().await.unwrap();
assert_eq!(on_branch.manifest().branch.as_deref(), Some("exp"));
}
}

View File

@@ -10,7 +10,6 @@
use lance::dataset::{ColumnAlteration, NewColumnTransform};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use super::NativeTable;
use crate::Result;
@@ -45,52 +44,6 @@ pub struct DropColumnsResult {
pub version: u64,
}
/// A single field's metadata update, addressed by dot-path.
///
/// Merges into the field's existing metadata by default. Use [`Self::remove`] to
/// delete a key, or [`Self::replace`] to swap the field's entire metadata map.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize)]
pub struct FieldMetadataUpdate {
/// Dot-separated path to the field (e.g. `"embedding"` or `"address.zip"`).
pub path: String,
/// Keys to set (`Some`) or delete (`None`).
pub metadata: HashMap<String, Option<String>>,
/// If `true`, replace the field's entire metadata map instead of merging.
pub replace: bool,
}
impl FieldMetadataUpdate {
pub fn new(path: impl Into<String>) -> Self {
Self {
path: path.into(),
metadata: HashMap::new(),
replace: false,
}
}
pub fn set(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.metadata.insert(key.into(), Some(value.into()));
self
}
pub fn remove(mut self, key: impl Into<String>) -> Self {
self.metadata.insert(key.into(), None);
self
}
pub fn replace(mut self) -> Self {
self.replace = true;
self
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct UpdateFieldMetadataResult {
/// The commit version associated with the operation.
#[serde(default)]
pub version: u64,
}
/// Internal implementation of the add columns logic.
///
/// Adds new columns to the table using the provided transforms.
@@ -137,32 +90,6 @@ pub(crate) async fn execute_drop_columns(
Ok(DropColumnsResult { version })
}
/// Internal implementation of the update field metadata logic.
///
/// Merges or replaces per-field metadata, addressing fields by dot-path.
pub(crate) async fn execute_update_field_metadata(
table: &NativeTable,
updates: &[FieldMetadataUpdate],
) -> Result<UpdateFieldMetadataResult> {
table.dataset.ensure_mutable()?;
let mut dataset = (*table.dataset.get().await?).clone();
let mut builder = dataset.update_field_metadata();
for update in updates {
let entries = update.metadata.iter().map(|(k, v)| (k.clone(), v.clone()));
builder = if update.replace {
builder.replace(&update.path, entries)?
} else {
builder.update(&update.path, entries)?
};
}
builder.await?;
let version = dataset.version().version;
table.dataset.update(dataset);
Ok(UpdateFieldMetadataResult { version })
}
#[cfg(test)]
mod tests {
use arrow_array::{Int32Array, StringArray, record_batch};
@@ -170,7 +97,6 @@ mod tests {
use futures::TryStreamExt;
use lance::dataset::ColumnAlteration;
use super::FieldMetadataUpdate;
use crate::connect;
use crate::query::{ExecutableQuery, QueryBase, Select};
use crate::table::NewColumnTransform;
@@ -684,46 +610,4 @@ mod tests {
let v4 = table.version().await.unwrap();
assert_eq!(drop_result.version, v4);
}
#[tokio::test]
async fn test_update_field_metadata() {
let conn = connect("memory://").execute().await.unwrap();
let batch = record_batch!(
("id", Int32, [1, 2, 3]),
("category", Utf8, ["A", "B", "C"])
)
.unwrap();
let table = conn
.create_table("test_update_field_metadata", batch)
.execute()
.await
.unwrap();
// Set metadata on a field.
table
.update_field_metadata(&[FieldMetadataUpdate::new("category")
.set("unit", "label")
.set("pii", "false")])
.await
.unwrap();
let schema = table.schema().await.unwrap();
let field = schema.field_with_name("category").unwrap();
assert_eq!(
field.metadata().get("unit").map(String::as_str),
Some("label")
);
// Merge: add a key, delete one, keep the rest.
table
.update_field_metadata(&[FieldMetadataUpdate::new("category")
.set("source", "import")
.remove("pii")])
.await
.unwrap();
let schema = table.schema().await.unwrap();
let md = schema.field_with_name("category").unwrap().metadata();
assert_eq!(md.get("unit").map(String::as_str), Some("label")); // preserved
assert_eq!(md.get("source").map(String::as_str), Some("import")); // added
assert!(!md.contains_key("pii")); // deleted
}
}