Compare commits

..

17 Commits

Author SHA1 Message Date
lancedb automation
6586ea2704 chore: update lance dependency to v7.0.0-beta.5 2026-05-06 17:59:54 +00:00
LanceDB Robot
47a34f5cca chore: update lance dependency to v7.0.0-beta.4 (#3348)
## Summary
- Update Lance Rust dependencies to `v7.0.0-beta.4` using
`ci/set_lance_version.py`.
- Update the Java `lance-core` dependency property to `7.0.0-beta.4`.
- Align LanceDB with dependency updates required by Lance 7, including
`object_store` 0.13 API compatibility.

Triggering tag:
https://github.com/lance-format/lance/releases/tag/v7.0.0-beta.4

## Verification
- `cargo clippy --workspace --tests --all-features -- -D warnings`
- `cargo fmt --all`
2026-05-05 18:36:39 -07:00
Weston Pace
a17c241e86 feat(python): make Permutation fork-safe for PyTorch DataLoader workers (#3339)
## Summary

PyTorch's `DataLoader` uses fork-based multiprocessing by default on
Linux, but threads do not survive `fork()`. LanceDB's Python bindings
drive async work through two threaded layers, both of which become inert
in a forked child:

- `BackgroundEventLoop` runs an asyncio loop on a Python
`threading.Thread`.
- `pyo3-async-runtimes::tokio` holds a global multi-threaded tokio
runtime whose worker threads also die on fork — and its runtime lives in
a `OnceLock` that cannot be replaced after first use.

As a result, any `Permutation` (or other async API) used inside a
fork-based `DataLoader` worker hangs indefinitely. This PR makes both
layers fork-safe so `Permutation` works as a `torch.utils.data.Dataset`
with `num_workers > 0`.

## Approach

### Rust — new `python/src/runtime.rs`

Mirrors the pattern used in [Lance's Python
bindings](456198cd6f/python/src/lib.rs (L139)),
adapted for the async-bridge use case.

- `LanceRuntime` implements `pyo3_async_runtimes::generic::Runtime +
ContextExt`, backed by an `AtomicPtr<tokio::runtime::Runtime>` we own
(sidestepping `pyo3-async-runtimes`'s frozen `OnceLock` global).
- A `pthread_atfork(after_in_child)` handler nulls the pointer; the next
`spawn` rebuilds the runtime in the child. The previous runtime is
intentionally **leaked** — calling `Drop` would try to join now-dead
worker threads and hang.
- `runtime::future_into_py` is a drop-in for
`pyo3_async_runtimes::tokio::future_into_py`. All ~80 call sites in
`arrow.rs` / `connection.rs` / `permutation.rs` / `query.rs` /
`table.rs` are updated to route through it.
- `python/Cargo.toml` adds `libc = "0.2"` and the tokio
`rt-multi-thread` feature.

### Python — `lancedb/background_loop.py`

- Refactors `BackgroundEventLoop.__init__` to a reusable `_start()`
method.
- An `os.register_at_fork(after_in_child=…)` hook calls `LOOP._start()`
to give the singleton a fresh asyncio loop and thread **in place**. This
matters because the rest of the codebase imports `LOOP` via `from
.background_loop import LOOP` — rebinding the module attribute would
leave those references holding the dead loop.

### Python — `lancedb/__init__.py`

Removes the `__warn_on_fork` pre-fork warning (and the now-unused
`import warnings`). Fork is supported.

## Test plan

- [x] New `test_permutation_dataloader_fork_workers` in
`python/tests/test_torch.py`: runs a `Permutation` through
`torch.utils.data.DataLoader(num_workers=2,
multiprocessing_context="fork")` inside a spawn-isolated child with a
30s hang detector. **Pre-fix**: timed out at 36s. **Post-fix**: passes
in ~3.6s.
- [x] New `test_remote_connection_after_fork` in
`python/tests/test_remote_db.py`: forks a child that creates a fresh
`lancedb.connect(...)` against a mock HTTP server and calls
`table_names()`; passes in <1s, validates the runtime reset is
sufficient for fresh remote clients.
- [x] All 62 tests in `test_torch.py` + `test_permutation.py` pass.
- [x] All 35 tests in `test_remote_db.py` pass.
- [x] `test_table.py` (87) + `test_db.py` + `test_query.py` (157, minus
one unrelated `sentence_transformers` import skip) — 244 passing.
- [x] `cargo clippy -p lancedb-python --tests` clean.
- [x] `cargo fmt`, `ruff check`, `ruff format` all clean.

## Known limitation (follow-up)

This PR makes a **freshly-built** `lancedb.connect(...)` work in a
forked child. An **inherited** `Connection` from the parent still
carries an inherited `reqwest::Client` whose hyper connection pool
references socket FDs and TCP/TLS state shared with the parent — using
it from the child after fork is unsafe (especially with HTTP/1.1
keep-alive). The recommended pattern for fork-based `DataLoader` workers
that hit a remote DB is to construct a new connection inside the worker.
Auto-clearing inherited HTTP client pools on fork would require tracking
live `Connection` instances in `lancedb` core and is left for a
follow-up PR.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 13:44:10 -07:00
Weston Pace
1fc23e5473 fix(python): make Permutation picklable for PyTorch multiprocessing (#3335)
## Summary

When pytorch is used with multiprocessing and the mp mode is spawn then
the Permutation needs to be pickled. It could not be pickled because
`Table` and `Connection` are not serializable. This PR adds pickle
support to Permutation without adding general pickle support to `Table`
or `Connection`. To add general support we probably need to start by
adding serialization in the namespace client.

In the meantime this PR enable pickling by adding special cases for:

 * In-memory tables (just serialize as Arrow IPC)
 * Native tables (serialize the URI)

If a user is not using one of the above cases (e.g. using a remote
connection) then they will need to provide a connection factory that can
be pickled.

## Breaking change

`PermutationBuilder.persist(...)` is removed from the Python bindings;
the permutation table is now always in-memory. The underlying Rust
`PermutationBuilder::persist` API is untouched and can be re-exposed
later if needed. It probably won't make sense to do that until we have a
way to serialize `Table` and `Connection`.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 21:37:58 -07:00
qingfeng-occ
87b831bcae fix(node): remove redundant postbuild:release script to fix build failure (#3285)
The `build:release` command already outputs the `*.node` files directly
to the `dist/` directory via the `--output-dir dist` flag.

Therefore, the `postbuild:release` script, which attempts to copy
`*.node` files from the `lancedb/` source directory, fails with a "no
such file or directory" error because the source files do not exist
there.

This commit removes the redundant `postbuild:release` script to resolve
the build failure.

fix #3284

Signed-off-by: qingfeng-occ <qing.feng@zte.com.cn>
2026-05-04 09:37:18 -07:00
Nitesh Yadav
59db036118 fix(python): add missing space in hybrid query error message (#3340)
Hi, the hybrid query error message looks like it can use a space, just
added it.

```python
def _validate_query(self, query, vector=None, text=None):
    if query is not None and (vector is not None or text is not None):
        raise ValueError(
            "You can either provide a string query in search() method"
            "or set `vector()` and `text()` explicitly for hybrid search."
            "But not both."
        )
```
2026-05-02 15:51:00 -07:00
Lance Release
c091243d5b Bump version: 0.28.0-beta.10 → 0.28.0-beta.11 2026-04-29 17:53:49 +00:00
Lance Release
a2aea7b4e5 Bump version: 0.31.0-beta.10 → 0.31.0-beta.11 2026-04-29 17:53:22 +00:00
LanceDB Robot
4a5341edb1 chore: update lance dependency to v6.0.0-beta.7 (#3334)
## Summary
- Update Lance Rust dependencies to `6.0.0-beta.7` using
`ci/set_lance_version.py`.
- Update Java `lance-core.version` to `6.0.0-beta.7`.
- Align Arrow/DataFusion/PyO3 dependency versions and apply required
compatibility fixes for the Lance upgrade.

Triggering tag:
[v6.0.0-beta.7](https://github.com/lance-format/lance/releases/tag/v6.0.0-beta.7)

## Verification
- `cargo clippy --workspace --tests --all-features -- -D warnings`
- `cargo fmt --all`
2026-04-29 10:52:25 -07:00
Jack Ye
25dfe2cfd4 feat: add manifest-enabled directory namespace mode (#3332)
Adds manifest_enabled for local/native connections so directory
namespace manifests can be the source of truth, including migration from
directory listing and Azure credential vending feature wiring. Also
exposes the option through Rust, Python, and Node bindings with focused
validation.
2026-04-29 09:22:06 -07:00
Lance Release
4dcd7f4314 Bump version: 0.28.0-beta.9 → 0.28.0-beta.10 2026-04-28 13:29:26 +00:00
Lance Release
2e36cd9dad Bump version: 0.31.0-beta.9 → 0.31.0-beta.10 2026-04-28 13:29:00 +00:00
Weston Pace
f31e27768a fix: address RUSTSEC-2026-0104 cargo-deny advisory (#3326)
## Summary

- Update `rustls-webpki` 0.103.10 → 0.103.13 to fix RUSTSEC-2026-0104
(reachable panic in CRL parsing)
- Add advisory ignore for the legacy `rustls-webpki` 0.101.7 copy pinned
to the aws-smithy/rustls 0.21 chain (same chain already exempted for
RUSTSEC-2026-0098/0099)

Fixes the `deny` CI job failure seen in #3325.

## Test plan

- [x] `cargo deny check advisories` passes locally

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 17:56:10 -07:00
LanceDB Robot
b84150a53e chore: update lance dependency to v6.0.0-beta.4 (#3325)
## Summary

- Updates Lance Rust dependencies to `6.0.0-beta.4` using
`ci/set_lance_version.py`.
- Updates the Java `lance-core.version` property to `6.0.0-beta.4`.
- Triggering Lance tag:
https://github.com/lance-format/lance/releases/tag/v6.0.0-beta.4

## Verification

- `cargo clippy --workspace --tests --all-features -- -D warnings`
- `cargo fmt --all`
2026-04-27 15:13:07 -07:00
Will Jones
d135c18db6 ci: add cargo-deny configuration and CI check (#3307)
Adds a `deny.toml` at the workspace root and a `deny` CI job that runs
`cargo deny check` on every PR. Catches yanked crates, license drift,
banned or wildcard dependencies, unapproved sources, and new RUSTSEC
advisories.

As part of wiring this up:

- Updated `aws-lc-rs` 1.13.0 → 1.16.3 / `aws-lc-sys` 0.28.0 → 0.40.0 to
  clear four 2026 AWS-LC advisories (timing side-channel, PKCS7 bypass,
  CRL scope). Removed the `=0.28.0` workaround pin; the original build
  failure no longer reproduces.
- Updated `bytes`, `zlib-rs`, `rand`, `rustls-webpki`, `lz4_flex` to
  clear their current advisories.
- Marked `lancedb-nodejs` and `lancedb-python` as `publish = false` and
  pinned `lzma-sys` from `*` to `0.1` so `bans.wildcards = "deny"` can
  be enforced.

10 remaining advisories have no safe upgrade available (transitive via
opendal, lance, datafusion, async-openai, aws-sdk on the legacy rustls
0.21 chain). Each is ignored in `deny.toml` with a per-entry rationale
and a link to the RUSTSEC advisory. New advisories still fail CI.

Fixes #3297

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 20:53:15 -07:00
Will Jones
ef399de092 ci: switch PyPI publish to OIDC trusted publishing (#3302)
## Summary

- Replaces `LANCEDB_PYPI_API_TOKEN` (long-lived token) with OIDC trusted
publishing via `pypa/gh-action-pypi-publish`
- Adds `id-token: write` permission to linux/mac/windows jobs
- Removes `twine`-based upload and the `pypi_token` input from
`upload_wheel` composite action
- Enables PEP 740 Sigstore attestations on published wheels as a bonus

After merging, rotate/revoke the `LANCEDB_PYPI_API_TOKEN` secret.

Closes #3294

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-24 20:53:06 -07:00
Will Jones
0d767abd0e ci: add Dependabot config for shipped Rust binaries (#3300)
Adds `.github/dependabot.yml` enabling weekly cargo update PRs for the
root workspace, which produces the Rust binaries we ship: the Node.js
and Python native extensions. The `rust/lancedb` library crate shares
the same lockfile — its consumers pick versions themselves, but bumping
transitive deps here keeps the shipped binaries current.

Also removes the misleading `exclude = ["python"]` line from the root
`Cargo.toml`: `python` is listed in `members`, and `cargo metadata`
confirms it's a workspace member, so the exclude was dead code that
implied the opposite.

Minor/patch updates are grouped to reduce PR noise.

Part of #3292. Only covers the cargo ecosystem; pip, npm, and
github-actions can follow.
2026-04-24 20:52:54 -07:00
58 changed files with 2966 additions and 689 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.28.0-beta.9"
current_version = "0.28.0-beta.11"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

18
.github/dependabot.yml vendored Normal file
View File

@@ -0,0 +1,18 @@
version: 2
# Scope: the root Cargo workspace, which produces the Rust binaries we
# ship to users (the Node.js and Python native extensions). The
# `rust/lancedb` library crate shares the same lockfile; its consumers
# pick their own dependency versions, but bumping transitive deps here
# keeps the binaries we ship current.
updates:
- package-ecosystem: cargo
directory: /
schedule:
interval: weekly
open-pull-requests-limit: 10
groups:
rust-minor-patch:
update-types:
- minor
- patch

View File

@@ -21,6 +21,9 @@ jobs:
linux:
name: Python ${{ matrix.config.platform }} manylinux${{ matrix.config.manylinux }}
timeout-minutes: 60
permissions:
id-token: write
contents: read
strategy:
matrix:
config:
@@ -60,10 +63,12 @@ jobs:
- uses: ./.github/workflows/upload_wheel
if: startsWith(github.ref, 'refs/tags/python-v')
with:
pypi_token: ${{ secrets.LANCEDB_PYPI_API_TOKEN }}
fury_token: ${{ secrets.FURY_TOKEN }}
mac:
timeout-minutes: 90
permissions:
id-token: write
contents: read
runs-on: ${{ matrix.config.runner }}
strategy:
matrix:
@@ -88,10 +93,12 @@ jobs:
- uses: ./.github/workflows/upload_wheel
if: startsWith(github.ref, 'refs/tags/python-v')
with:
pypi_token: ${{ secrets.LANCEDB_PYPI_API_TOKEN }}
fury_token: ${{ secrets.FURY_TOKEN }}
windows:
timeout-minutes: 60
permissions:
id-token: write
contents: read
runs-on: windows-latest
steps:
- uses: actions/checkout@v4
@@ -110,7 +117,6 @@ jobs:
- uses: ./.github/workflows/upload_wheel
if: startsWith(github.ref, 'refs/tags/python-v')
with:
pypi_token: ${{ secrets.LANCEDB_PYPI_API_TOKEN }}
fury_token: ${{ secrets.FURY_TOKEN }}
gh-release:
if: startsWith(github.ref, 'refs/tags/python-v')

View File

@@ -9,7 +9,10 @@ on:
- Cargo.toml
- Cargo.lock
- rust-toolchain.toml
- deny.toml
- rust/**
- nodejs/Cargo.toml
- python/Cargo.toml
- .github/workflows/rust.yml
permissions:
@@ -56,6 +59,17 @@ jobs:
- name: Run clippy (without remote feature)
run: cargo clippy --profile ci --workspace --tests -- -D warnings
deny:
# Supply-chain checks: advisories, licenses, banned crates, and source
# restrictions. Configuration lives in `deny.toml` at the workspace root.
timeout-minutes: 10
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v4
- uses: EmbarkStudios/cargo-deny-action@v2
with:
command: check advisories bans licenses sources
build-no-lock:
runs-on: ubuntu-24.04
timeout-minutes: 30

View File

@@ -2,9 +2,6 @@ name: upload-wheel
description: "Upload wheels to Pypi"
inputs:
pypi_token:
required: true
description: "release token for the repo"
fury_token:
required: true
description: "release token for the fury repo"
@@ -12,12 +9,6 @@ inputs:
runs:
using: "composite"
steps:
- name: Install dependencies
shell: bash
run: |
python -m pip install --upgrade pip
pip install twine
python3 -m pip install --upgrade pkginfo
- name: Choose repo
shell: bash
id: choose_repo
@@ -27,19 +18,17 @@ runs:
else
echo "repo=pypi" >> $GITHUB_OUTPUT
fi
- name: Publish to PyPI
- name: Publish to Fury
if: steps.choose_repo.outputs.repo == 'fury'
shell: bash
env:
FURY_TOKEN: ${{ inputs.fury_token }}
PYPI_TOKEN: ${{ inputs.pypi_token }}
run: |
if [[ ${{ steps.choose_repo.outputs.repo }} == fury ]]; then
WHEEL=$(ls target/wheels/lancedb-*.whl 2> /dev/null | head -n 1)
echo "Uploading $WHEEL to Fury"
curl -f -F package=@$WHEEL https://$FURY_TOKEN@push.fury.io/lancedb/
else
twine upload --repository ${{ steps.choose_repo.outputs.repo }} \
--username __token__ \
--password $PYPI_TOKEN \
target/wheels/lancedb-*.whl
fi
WHEEL=$(ls target/wheels/lancedb-*.whl 2> /dev/null | head -n 1)
echo "Uploading $WHEEL to Fury"
curl -f -F package=@$WHEEL https://$FURY_TOKEN@push.fury.io/lancedb/
- name: Publish to PyPI
if: steps.choose_repo.outputs.repo == 'pypi'
uses: pypa/gh-action-pypi-publish@release/v1
with:
packages-dir: target/wheels/

1696
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,5 @@
[workspace]
members = ["rust/lancedb", "nodejs", "python"]
# Python package needs to be built by maturin.
exclude = ["python"]
resolver = "2"
[workspace.package]
@@ -15,40 +13,40 @@ categories = ["database-implementations"]
rust-version = "1.91.0"
[workspace.dependencies]
lance = { "version" = "=6.0.0-beta.2", default-features = false, "tag" = "v6.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=6.0.0-beta.2", "tag" = "v6.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=6.0.0-beta.2", "tag" = "v6.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=6.0.0-beta.2", "tag" = "v6.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=6.0.0-beta.2", default-features = false, "tag" = "v6.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=6.0.0-beta.2", "tag" = "v6.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=6.0.0-beta.2", "tag" = "v6.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=6.0.0-beta.2", "tag" = "v6.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=6.0.0-beta.2", default-features = false, "tag" = "v6.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=6.0.0-beta.2", "tag" = "v6.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=6.0.0-beta.2", "tag" = "v6.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=6.0.0-beta.2", "tag" = "v6.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=6.0.0-beta.2", "tag" = "v6.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=6.0.0-beta.2", "tag" = "v6.0.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
lance = { "version" = "=7.0.0-beta.5", default-features = false, "tag" = "v7.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-core = { "version" = "=7.0.0-beta.5", "tag" = "v7.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-datagen = { "version" = "=7.0.0-beta.5", "tag" = "v7.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-file = { "version" = "=7.0.0-beta.5", "tag" = "v7.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-io = { "version" = "=7.0.0-beta.5", default-features = false, "tag" = "v7.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-index = { "version" = "=7.0.0-beta.5", "tag" = "v7.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-linalg = { "version" = "=7.0.0-beta.5", "tag" = "v7.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace = { "version" = "=7.0.0-beta.5", "tag" = "v7.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-namespace-impls = { "version" = "=7.0.0-beta.5", default-features = false, "tag" = "v7.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-table = { "version" = "=7.0.0-beta.5", "tag" = "v7.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-testing = { "version" = "=7.0.0-beta.5", "tag" = "v7.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-datafusion = { "version" = "=7.0.0-beta.5", "tag" = "v7.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-encoding = { "version" = "=7.0.0-beta.5", "tag" = "v7.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
lance-arrow = { "version" = "=7.0.0-beta.5", "tag" = "v7.0.0-beta.5", "git" = "https://github.com/lance-format/lance.git" }
ahash = "0.8"
# Note that this one does not include pyarrow
arrow = { version = "57.2", optional = false }
arrow-array = "57.2"
arrow-data = "57.2"
arrow-ipc = "57.2"
arrow-ord = "57.2"
arrow-schema = "57.2"
arrow-select = "57.2"
arrow-cast = "57.2"
arrow = { version = "58.0.0", optional = false }
arrow-array = "58.0.0"
arrow-data = "58.0.0"
arrow-ipc = "58.0.0"
arrow-ord = "58.0.0"
arrow-schema = "58.0.0"
arrow-select = "58.0.0"
arrow-cast = "58.0.0"
async-trait = "0"
datafusion = { version = "52.1", default-features = false }
datafusion-catalog = "52.1"
datafusion-common = { version = "52.1", default-features = false }
datafusion-execution = "52.1"
datafusion-expr = "52.1"
datafusion-functions = "52.1"
datafusion-physical-plan = "52.1"
datafusion-physical-expr = "52.1"
datafusion-sql = "52.1"
datafusion = { version = "53.0.0", default-features = false }
datafusion-catalog = "53.0.0"
datafusion-common = { version = "53.0.0", default-features = false }
datafusion-execution = "53.0.0"
datafusion-expr = "53.0.0"
datafusion-functions = "53.0.0"
datafusion-physical-plan = "53.0.0"
datafusion-physical-expr = "53.0.0"
datafusion-sql = "53.0.0"
env_logger = "0.11"
half = { "version" = "2.7.1", default-features = false, features = [
"num-traits",
@@ -56,7 +54,7 @@ half = { "version" = "2.7.1", default-features = false, features = [
futures = "0"
log = "0.4"
moka = { version = "0.12", features = ["future"] }
object_store = "0.12.0"
object_store = "0.13.2"
pin-project = "1.0.7"
rand = "0.9"
snafu = "0.8"

172
deny.toml Normal file
View File

@@ -0,0 +1,172 @@
# cargo-deny configuration for LanceDB.
#
# Run locally with `cargo deny check`. See
# https://embarkstudios.github.io/cargo-deny/ for the full reference.
# The set of target triples we care about. cargo-deny will only consider
# dependencies that are used on at least one of these targets. Keeping this
# explicit avoids noise from platform-specific crates (e.g. wasm, android,
# ios) that we never actually ship.
[graph]
targets = [
"x86_64-unknown-linux-gnu",
"aarch64-unknown-linux-gnu",
"x86_64-apple-darwin",
"aarch64-apple-darwin",
"x86_64-pc-windows-msvc",
"aarch64-pc-windows-msvc",
]
all-features = true
[output]
feature-depth = 1
# ---------------------------------------------------------------------------
# Advisories: security vulnerabilities and yanked crates.
# ---------------------------------------------------------------------------
[advisories]
version = 2
# Fail the check if any crate in the lockfile has been yanked from crates.io.
# Yanked crates are a signal the author retracted the release (often due to
# bugs or security issues) and should not be depended on.
yanked = "deny"
# Advisory IDs we have explicitly reviewed and chosen to accept. Every
# entry must include a rationale and, where possible, an upstream issue
# pointing to a fix. Revisit this list whenever dependencies are updated.
ignore = [
# rsa: Marvin Attack timing side-channel in PKCS#1 v1.5 decryption.
# Reached only through opendal → reqsign → rsa. We do not use RSA
# decryption in LanceDB ourselves; this is dormant in the signing path.
# No fixed release exists upstream as of this writing.
# https://rustsec.org/advisories/RUSTSEC-2023-0071
{ id = "RUSTSEC-2023-0071", reason = "rsa crate via opendal/reqsign; no fixed upstream release" },
# instant: unmaintained. Pulled in via backoff → instant. Upstream
# recommends switching to `web-time`; fix has to come from backoff.
# https://rustsec.org/advisories/RUSTSEC-2024-0384
{ id = "RUSTSEC-2024-0384", reason = "transitive via backoff; waiting on backoff replacement" },
# paste: unmaintained (author archived the repo). Used transitively by
# datafusion and the arrow ecosystem; widespread, no drop-in replacement.
# https://rustsec.org/advisories/RUSTSEC-2024-0436
{ id = "RUSTSEC-2024-0436", reason = "transitive via datafusion; awaiting ecosystem migration" },
# tantivy: segfault on malformed input due to missing bounds check.
# Pulled in via lance for full-text search. We only feed tantivy
# documents we construct ourselves, not attacker-controlled bytes.
# Tracked for a lance dependency bump.
# https://rustsec.org/advisories/RUSTSEC-2025-0003
{ id = "RUSTSEC-2025-0003", reason = "tantivy via lance; inputs are internally produced, not user-supplied bytes" },
# backoff: unmaintained. Reached only via async-openai. Replacement
# requires async-openai to migrate (or us to drop async-openai).
# https://rustsec.org/advisories/RUSTSEC-2025-0012
{ id = "RUSTSEC-2025-0012", reason = "transitive via async-openai; waiting on upstream migration" },
# number_prefix: unmaintained. Transitive via indicatif → hf-hub.
# No security impact, just maintenance status.
# https://rustsec.org/advisories/RUSTSEC-2025-0119
{ id = "RUSTSEC-2025-0119", reason = "transitive via hf-hub/indicatif; cosmetic formatting crate" },
# rustls-pemfile: unmaintained. Reached from two separate chains:
# rustls-native-certs 0.6 (via hyper-rustls 0.24) and object_store 0.12.
# Both upstream dependencies need to move before we can drop it.
# https://rustsec.org/advisories/RUSTSEC-2025-0134
{ id = "RUSTSEC-2025-0134", reason = "transitive via rustls-native-certs/object_store; waiting on upstream migration" },
# rustls-webpki 0.101.7 (old major line): name-constraint checks for
# URI / wildcard names. Pulled in only via the legacy rustls 0.21 chain
# from aws-smithy-http-client. The 0.103 line we actively use is patched.
# Clearing the 0.101 copy requires the aws-sdk chain to migrate off
# rustls 0.21.
# https://rustsec.org/advisories/RUSTSEC-2026-0098
# https://rustsec.org/advisories/RUSTSEC-2026-0099
{ id = "RUSTSEC-2026-0098", reason = "only affects rustls-webpki 0.101 from legacy aws-smithy/rustls 0.21 chain" },
{ id = "RUSTSEC-2026-0099", reason = "only affects rustls-webpki 0.101 from legacy aws-smithy/rustls 0.21 chain" },
# rustls-webpki 0.101.7: reachable panic in CRL parsing. Same legacy
# rustls 0.21 chain from aws-smithy-http-client as above. The 0.103 line
# we actively use is upgraded to 0.103.13 which contains the fix.
# https://rustsec.org/advisories/RUSTSEC-2026-0104
{ id = "RUSTSEC-2026-0104", reason = "only affects rustls-webpki 0.101 from legacy aws-smithy/rustls 0.21 chain" },
]
# ---------------------------------------------------------------------------
# Licenses: only allow licenses we've reviewed as compatible with Apache-2.0.
# ---------------------------------------------------------------------------
[licenses]
version = 2
# SPDX identifiers for licenses that are compatible with our Apache-2.0
# distribution. Additions require legal review.
allow = [
"Apache-2.0",
"Apache-2.0 WITH LLVM-exception",
"MIT",
"BSD-2-Clause",
"BSD-3-Clause",
"ISC",
"Unicode-3.0",
"Unicode-DFS-2016",
"Zlib",
"CC0-1.0",
"MPL-2.0",
"BSL-1.0",
"OpenSSL",
# 0BSD ("BSD Zero Clause") is effectively public domain — no attribution
# required. Pulled in by `mock_instant`.
"0BSD",
# bzip2-1.0.6 is the permissive upstream bzip2 license (BSD-like). Pulled
# in by `libbz2-rs-sys`, the pure-Rust bzip2 implementation.
"bzip2-1.0.6",
# CDLA-Permissive-2.0 is a permissive data license used by `webpki-roots`
# for the Mozilla CA root bundle. Data-only, distribution-compatible.
"CDLA-Permissive-2.0",
]
confidence-threshold = 0.8
# Crates whose license cannot be determined from Cargo metadata but whose
# license we've manually confirmed from upstream. Keep this list minimal.
[[licenses.clarify]]
# polars-arrow-format omits the `license` field in its Cargo.toml, but the
# upstream repo (pola-rs/polars-arrow-format) is dual-licensed Apache-2.0 OR
# MIT. See https://github.com/pola-rs/polars-arrow-format/blob/main/LICENSE
crate = "polars-arrow-format"
expression = "Apache-2.0 OR MIT"
license-files = []
# ---------------------------------------------------------------------------
# Bans: disallow specific crates and flag dependency hygiene issues.
# ---------------------------------------------------------------------------
[bans]
# Warn (not deny) on duplicate versions of the same crate. In a large
# workspace like this one, duplicates are common and often unavoidable
# transitively. We surface them to discourage growth, but don't fail CI.
multiple-versions = "warn"
# Wildcard version requirements (`foo = "*"`) are a footgun — they let any
# future release in without review. Ban them outright.
wildcards = "deny"
# Internal workspace crates reference each other via `path = "..."`, which
# cargo-deny sees as a wildcard version. That's fine for private workspace
# members (not published to crates.io), so allow it specifically for paths.
allow-wildcard-paths = true
# Features that, if enabled, should cause the check to fail.
deny = []
# Crates to skip when checking for duplicate versions.
skip = []
# Similar to `skip`, but also skips the entire transitive subtree.
skip-tree = []
# ---------------------------------------------------------------------------
# Sources: restrict where crates can come from.
# ---------------------------------------------------------------------------
[sources]
# Deny any registry other than the ones explicitly listed below.
unknown-registry = "deny"
# Deny any git dependency whose host isn't in the allow-list below. This
# prevents accidental pulls from arbitrary forks.
unknown-git = "deny"
allow-registry = ["https://github.com/rust-lang/crates.io-index"]
# Lance is developed in a sibling repo and pulled as a git dependency until
# releases are cut to crates.io. Allow that specific host.
allow-git = [
"https://github.com/lance-format/lance",
]

View File

@@ -14,7 +14,7 @@ Add the following dependency to your `pom.xml`:
<dependency>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-core</artifactId>
<version>0.28.0-beta.9</version>
<version>0.28.0-beta.11</version>
</dependency>
```

View File

@@ -41,6 +41,29 @@ for testing purposes.
***
### manifestEnabled?
```ts
optional manifestEnabled: boolean;
```
(For LanceDB OSS only): use directory namespace manifests as the source
of truth for table metadata. Existing directory-listed root tables are
migrated into the manifest on access.
***
### namespaceClientProperties?
```ts
optional namespaceClientProperties: Record<string, string>;
```
(For LanceDB OSS only): extra properties for the backing namespace
client used by manifest-enabled native connections.
***
### readConsistencyInterval?
```ts

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.28.0-beta.9</version>
<version>0.28.0-beta.11</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.28.0-beta.9</version>
<version>0.28.0-beta.11</version>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>LanceDB Java SDK Parent POM</description>
@@ -28,7 +28,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<lance-core.version>6.0.0-beta.2</lance-core.version>
<lance-core.version>7.0.0-beta.5</lance-core.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>

View File

@@ -1,7 +1,8 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.28.0-beta.9"
version = "0.28.0-beta.11"
publish = false
license.workspace = true
description.workspace = true
repository.workspace = true
@@ -15,7 +16,7 @@ crate-type = ["cdylib"]
async-trait.workspace = true
arrow-ipc.workspace = true
arrow-array.workspace = true
arrow-buffer = "57.2"
arrow-buffer = "58.0.0"
half.workspace = true
arrow-schema.workspace = true
env_logger.workspace = true
@@ -31,8 +32,8 @@ lzma-sys = { version = "0.1", features = ["static"] }
log.workspace = true
# Pin to resolve build failures; update periodically for security patches.
aws-lc-sys = "=0.38.0"
aws-lc-rs = "=1.16.1"
aws-lc-sys = "=0.40.0"
aws-lc-rs = "=1.16.3"
[build-dependencies]
napi-build = "2.3.1"

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.11",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.11",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.11",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.11",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.11",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.11",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.11",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.11",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.11",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.28.0-beta.9",
"version": "0.28.0-beta.11",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",
@@ -75,7 +75,6 @@
"build:debug": "napi build --platform --dts ../lancedb/native.d.ts --js ../lancedb/native.js --output-dir lancedb",
"postbuild:debug": "shx mkdir -p dist && shx cp lancedb/*.node dist/",
"build:release": "napi build --platform --release --dts ../lancedb/native.d.ts --js ../lancedb/native.js --output-dir dist",
"postbuild:release": "shx mkdir -p dist && shx cp lancedb/*.node dist/",
"build": "npm run build:debug && npm run tsc",
"build-release": "npm run build:release && npm run tsc",
"tsc": "tsc -b",

View File

@@ -67,6 +67,12 @@ impl Connection {
builder = builder.storage_option(key, value);
}
}
if let Some(manifest_enabled) = options.manifest_enabled {
builder = builder.manifest_enabled(manifest_enabled);
}
if let Some(namespace_client_properties) = options.namespace_client_properties {
builder = builder.namespace_client_properties(namespace_client_properties);
}
// Create client config, optionally with header provider
let client_config = options.client_config.unwrap_or_default();

View File

@@ -37,6 +37,13 @@ pub struct ConnectionOptions {
///
/// The available options are described at https://docs.lancedb.com/storage/
pub storage_options: Option<HashMap<String, String>>,
/// (For LanceDB OSS only): use directory namespace manifests as the source
/// of truth for table metadata. Existing directory-listed root tables are
/// migrated into the manifest on access.
pub manifest_enabled: Option<bool>,
/// (For LanceDB OSS only): extra properties for the backing namespace
/// client used by manifest-enabled native connections.
pub namespace_client_properties: Option<HashMap<String, String>>,
/// (For LanceDB OSS only): the session to use for this connection. Holds
/// shared caches and other session-specific state.
pub session: Option<session::Session>,

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.31.0-beta.9"
current_version = "0.31.0-beta.11"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,7 @@
[package]
name = "lancedb-python"
version = "0.31.0-beta.9"
version = "0.31.0-beta.11"
publish = false
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true
@@ -14,7 +15,7 @@ name = "_lancedb"
crate-type = ["cdylib"]
[dependencies]
arrow = { version = "57.2", features = ["pyarrow"] }
arrow = { version = "58.0.0", features = ["pyarrow"] }
async-trait = "0.1"
bytes = "1"
lancedb = { path = "../rust/lancedb", default-features = false }
@@ -24,8 +25,8 @@ lance-namespace-impls.workspace = true
lance-io.workspace = true
env_logger.workspace = true
log.workspace = true
pyo3 = { version = "0.26", features = ["extension-module", "abi3-py39"] }
pyo3-async-runtimes = { version = "0.26", features = [
pyo3 = { version = "0.28", features = ["extension-module", "abi3-py39"] }
pyo3-async-runtimes = { version = "0.28", features = [
"attributes",
"tokio-runtime",
] }
@@ -34,10 +35,11 @@ futures.workspace = true
serde = "1"
serde_json = "1"
snafu.workspace = true
tokio = { version = "1.40", features = ["sync"] }
tokio = { version = "1.40", features = ["sync", "rt-multi-thread"] }
libc = "0.2"
[build-dependencies]
pyo3-build-config = { version = "0.26", features = [
pyo3-build-config = { version = "0.28", features = [
"extension-module",
"abi3-py39",
] }

View File

@@ -7,7 +7,6 @@ import os
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta
from typing import Dict, Optional, Union, Any, List
import warnings
__version__ = importlib.metadata.version("lancedb")
@@ -73,6 +72,7 @@ def connect(
client_config: Union[ClientConfig, Dict[str, Any], None] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
manifest_enabled: bool = False,
namespace_client_impl: Optional[str] = None,
namespace_client_properties: Optional[Dict[str, str]] = None,
namespace_client_pushdown_operations: Optional[List[str]] = None,
@@ -111,6 +111,10 @@ def connect(
storage_options: dict, optional
Additional options for the storage backend. See available options at
<https://docs.lancedb.com/storage/>
manifest_enabled : bool, default False
When true for local/native connections, use directory namespace
manifests as the source of truth for table metadata. Existing
directory-listed root tables are migrated into the manifest on access.
session: Session, optional
(For LanceDB OSS only)
A session to use for this connection. Sessions allow you to configure
@@ -158,11 +162,11 @@ def connect(
conn : DBConnection
A connection to a LanceDB database.
"""
if namespace_client_impl is not None or namespace_client_properties is not None:
if namespace_client_impl is None or namespace_client_properties is None:
if namespace_client_impl is not None:
if namespace_client_properties is None:
raise ValueError(
"Both namespace_client_impl and "
"namespace_client_properties must be provided"
"namespace_client_properties must be provided when "
"namespace_client_impl is set"
)
if kwargs:
raise ValueError(f"Unknown keyword arguments: {kwargs}")
@@ -175,6 +179,12 @@ def connect(
namespace_client_pushdown_operations=namespace_client_pushdown_operations,
)
if namespace_client_properties is not None and not manifest_enabled:
raise ValueError(
"namespace_client_impl must be provided when using "
"namespace_client_properties unless manifest_enabled=True"
)
if namespace_client_pushdown_operations is not None:
raise ValueError(
"namespace_client_pushdown_operations is only valid when "
@@ -212,6 +222,8 @@ def connect(
read_consistency_interval=read_consistency_interval,
storage_options=storage_options,
session=session,
manifest_enabled=manifest_enabled,
namespace_client_properties=namespace_client_properties,
)
@@ -289,6 +301,8 @@ def deserialize_conn(
parsed["uri"],
read_consistency_interval=rci,
storage_options=storage_options,
manifest_enabled=parsed.get("manifest_enabled", False),
namespace_client_properties=parsed.get("namespace_client_properties"),
)
else:
raise ValueError(f"Unknown connection_type: {connection_type}")
@@ -304,6 +318,8 @@ async def connect_async(
client_config: Optional[Union[ClientConfig, Dict[str, Any]]] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
manifest_enabled: bool = False,
namespace_client_properties: Optional[Dict[str, str]] = None,
) -> AsyncConnection:
"""Connect to a LanceDB database.
@@ -343,6 +359,13 @@ async def connect_async(
cache sizes for index and metadata caches, which can significantly
impact memory use and performance. They can also be re-used across
multiple connections to share the same cache state.
manifest_enabled : bool, default False
When true for local/native connections, use directory namespace
manifests as the source of truth for table metadata. Existing
directory-listed root tables are migrated into the manifest on access.
namespace_client_properties : dict, optional
Additional directory namespace client properties to use with
``manifest_enabled=True``.
Examples
--------
@@ -385,6 +408,8 @@ async def connect_async(
client_config,
storage_options,
session,
manifest_enabled,
namespace_client_properties,
)
)
@@ -412,13 +437,3 @@ __all__ = [
"Table",
"__version__",
]
def __warn_on_fork():
warnings.warn(
"lance is not fork-safe. If you are using multiprocessing, use spawn instead.",
)
if hasattr(os, "register_at_fork"):
os.register_at_fork(before=__warn_on_fork) # type: ignore[attr-defined]

View File

@@ -242,6 +242,8 @@ async def connect(
client_config: Optional[Union[ClientConfig, Dict[str, Any]]],
storage_options: Optional[Dict[str, str]],
session: Optional[Session],
manifest_enabled: bool = False,
namespace_client_properties: Optional[Dict[str, str]] = None,
) -> Connection: ...
class RecordBatchStream:
@@ -440,7 +442,7 @@ class AsyncPermutationBuilder:
async def execute(self) -> Table: ...
def async_permutation_builder(
table: Table, dest_table_name: str
table: Table,
) -> AsyncPermutationBuilder: ...
def fts_query_to_json(query: Any) -> str: ...

View File

@@ -2,7 +2,9 @@
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
import asyncio
import os
import threading
import warnings
class BackgroundEventLoop:
@@ -13,6 +15,9 @@ class BackgroundEventLoop:
"""
def __init__(self):
self._start()
def _start(self):
self.loop = asyncio.new_event_loop()
self.thread = threading.Thread(
target=self.loop.run_forever,
@@ -31,3 +36,30 @@ class BackgroundEventLoop:
LOOP = BackgroundEventLoop()
_FORK_WARNED = False
def _reset_after_fork():
# Threads do not survive fork(), so the asyncio loop in LOOP.thread is
# dead in the child. Re-initialize the singleton in place so existing
# `from .background_loop import LOOP` references in other modules see
# the new state. The Rust-side tokio runtime is reset analogously by a
# pthread_atfork hook installed in the _lancedb extension.
LOOP._start()
global _FORK_WARNED
if not _FORK_WARNED:
_FORK_WARNED = True
warnings.warn(
"lancedb fork support is experimental: the internal async "
"runtime has been reset in the forked child, but a small chance "
"of deadlock remains if other state was mid-operation at fork "
"time. The 'forkserver' or 'spawn' multiprocessing start method "
"is likely a safer alternative.",
RuntimeWarning,
stacklevel=2,
)
if hasattr(os, "register_at_fork"):
os.register_at_fork(after_in_child=_reset_after_fork)

View File

@@ -590,8 +590,13 @@ class LanceDBConnection(DBConnection):
read_consistency_interval: Optional[timedelta] = None,
storage_options: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
manifest_enabled: bool = False,
namespace_client_properties: Optional[Dict[str, str]] = None,
_inner: Optional[LanceDbConnection] = None,
):
self.storage_options = storage_options
self._manifest_enabled = manifest_enabled
self._namespace_client_properties = namespace_client_properties
if _inner is not None:
self._conn = _inner
self._cached_namespace_client = None
@@ -633,6 +638,8 @@ class LanceDBConnection(DBConnection):
None,
storage_options,
session,
manifest_enabled,
namespace_client_properties,
)
# TODO: It would be nice if we didn't store self.storage_options but it is
@@ -640,7 +647,6 @@ class LanceDBConnection(DBConnection):
# work because some paths like LanceDBConnection.from_inner will lose the
# storage_options. Also, this class really shouldn't be holding any state
# beyond _conn.
self.storage_options = storage_options
self._conn = AsyncConnection(LOOP.run(do_connect()))
self._cached_namespace_client: Optional[LanceNamespace] = None
@@ -677,6 +683,8 @@ class LanceDBConnection(DBConnection):
"connection_type": "local",
"uri": self.uri,
"storage_options": self.storage_options,
"manifest_enabled": self._manifest_enabled,
"namespace_client_properties": self._namespace_client_properties,
"read_consistency_interval_seconds": (
rci.total_seconds() if rci else None
),

View File

@@ -1,11 +1,12 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
from deprecation import deprecated
from lancedb import AsyncConnection, DBConnection
import pyarrow as pa
import copy
import json
from deprecation import deprecated
import pyarrow as pa
from ._lancedb import async_permutation_builder, PermutationReader
from .table import LanceTable
from .background_loop import LOOP
@@ -36,10 +37,7 @@ class PermutationBuilder:
be referenced by name in the future. If names are not provided then they can only
be referenced by their ordinal index. There is no requirement to name every split.
By default, the permutation will be stored in memory and will be lost when the
program exits. To persist the permutation (for very large datasets or to share
the permutation across multiple workers) use the [persist](#persist) method to
create a permanent table.
The permutation is stored in memory and will be lost when the program exits.
"""
def __init__(self, table: LanceTable):
@@ -51,15 +49,6 @@ class PermutationBuilder:
"""
self._async = async_permutation_builder(table)
def persist(
self, database: Union[DBConnection, AsyncConnection], table_name: str
) -> "PermutationBuilder":
"""
Persist the permutation to the given database.
"""
self._async.persist(database, table_name)
return self
def split_random(
self,
*,
@@ -380,20 +369,44 @@ class Permutation:
def __init__(
self,
reader: PermutationReader,
base_table: LanceTable,
permutation_table: Optional[LanceTable],
split: int,
selection: dict[str, str],
batch_size: int,
transform_fn: Callable[pa.RecordBatch, Any],
offset: Optional[int] = None,
limit: Optional[int] = None,
connection_factory: Optional[Callable[[str], LanceTable]] = None,
_reader: Optional[PermutationReader] = None,
):
"""
Internal constructor. Use [from_tables](#from_tables) instead.
"""
assert reader is not None, "reader is required"
assert base_table is not None, "base_table is required"
assert selection is not None, "selection is required"
self.reader = reader
self.base_table = base_table
self.permutation_table = permutation_table
self.split = split
self.selection = selection
self.transform_fn = transform_fn
self.batch_size = batch_size
self.offset = offset
self.limit = limit
self.connection_factory = connection_factory
if _reader is None:
_reader = LOOP.run(self._build_reader())
self.reader: PermutationReader = _reader
async def _build_reader(self) -> PermutationReader:
reader = await PermutationReader.from_tables(
self.base_table, self.permutation_table, self.split
)
if self.offset is not None:
reader = await reader.with_offset(self.offset)
if self.limit is not None:
reader = await reader.with_limit(self.limit)
return reader
def _with_selection(self, selection: dict[str, str]) -> "Permutation":
"""
@@ -402,21 +415,97 @@ class Permutation:
Does not validation of the selection and it replaces it entirely. This is not
intended for public use.
"""
return Permutation(self.reader, selection, self.batch_size, self.transform_fn)
def _with_reader(self, reader: PermutationReader) -> "Permutation":
"""
Creates a new permutation with the given reader
This is an internal method and should not be used directly.
"""
return Permutation(reader, self.selection, self.batch_size, self.transform_fn)
new = copy.copy(self)
new.selection = selection
return new
def with_batch_size(self, batch_size: int) -> "Permutation":
"""
Creates a new permutation with the given batch size
"""
return Permutation(self.reader, self.selection, batch_size, self.transform_fn)
new = copy.copy(self)
new.batch_size = batch_size
return new
def with_connection_factory(
self, connection_factory: Callable[[str], LanceTable]
) -> "Permutation":
"""
Creates a new permutation that will use ``connection_factory`` to reopen
the base table when this permutation is unpickled in a worker process.
The factory is a callable that takes a single argument — the base table
name — and returns a [LanceTable]. It must be picklable; the worker
will pickle it via standard ``pickle`` and call it to recover the base
table. Picklable callables in practice means top-level (module-level)
functions, ``functools.partial`` of such functions, or instances of
picklable classes implementing ``__call__``. Lambdas and closures over
local variables don't pickle with the default protocol.
Setting a factory is necessary when the URI alone is not enough to
re-open the connection — most importantly for LanceDB Cloud (``db://``)
connections, where ``api_key`` and ``region`` aren't recoverable from
the connection object after construction.
For local file or cloud-storage paths the factory is optional: if not
set, ``__getstate__`` falls back to capturing
``(uri, storage_options, namespace_path)`` and re-opening via
``lancedb.connect(uri, storage_options=...)``.
Examples
--------
Basic native (file-system path), parameterized via ``functools.partial``::
import functools, lancedb
from lancedb.permutation import Permutation
def open_native_table(uri: str, table_name: str):
return lancedb.connect(uri).open_table(table_name)
factory = functools.partial(open_native_table, "/data/lance_db")
permutation = Permutation.identity(
factory("training")
).with_connection_factory(factory)
Native via :func:`lancedb.connect_namespace` (e.g. a directory- or
REST-backed namespace client). The factory takes the
implementation name and properties dict as partial-bound args so
the worker can rebuild the same namespace connection::
def open_via_namespace(
impl: str, properties: dict[str, str], table_name: str,
):
return lancedb.connect_namespace(impl, properties).open_table(
table_name,
)
factory = functools.partial(
open_via_namespace,
"dir",
{"root": "/data/lance_db"},
)
LanceDB Cloud, reading credentials from env vars at worker startup
so secrets aren't pickled into the dataset::
import os, lancedb
def open_remote_table(table_name: str):
db = lancedb.connect(
"db://my-database",
api_key=os.environ["LANCEDB_API_KEY"],
region=os.environ.get("LANCEDB_REGION", "us-east-1"),
)
return db.open_table(table_name)
permutation = Permutation.identity(
open_remote_table("training")
).with_connection_factory(open_remote_table)
"""
assert connection_factory is not None, "connection_factory is required"
new = copy.copy(self)
new.connection_factory = connection_factory
return new
@classmethod
def identity(cls, table: LanceTable) -> "Permutation":
@@ -489,11 +578,126 @@ class Permutation:
schema = await reader.output_schema(None)
initial_selection = {name: name for name in schema.names}
return cls(
reader, initial_selection, DEFAULT_BATCH_SIZE, Transforms.arrow2python
base_table,
permutation_table,
split,
initial_selection,
DEFAULT_BATCH_SIZE,
Transforms.arrow2python,
_reader=reader,
)
return LOOP.run(do_from_tables())
def __getstate__(self) -> dict[str, Any]:
"""Build a picklable state dict for this permutation.
The base table is captured either via a user-supplied
``connection_factory`` (see [with_connection_factory]) or, as a
fallback, by introspecting ``(uri, storage_options, namespace_path)``
on the connection. The permutation table — always an in-memory
LanceDB table — is captured as a pyarrow Table (which pickles via
Arrow IPC natively). The reader is dropped from the wire format;
``__setstate__`` rebuilds it from the restored tables.
"""
permutation_data: Optional[pa.Table] = None
if self.permutation_table is not None:
permutation_data = self.permutation_table.to_arrow()
common = {
"base_table_name": self.base_table.name,
"permutation_data": permutation_data,
"split": self.split,
"selection": self.selection,
"batch_size": self.batch_size,
"transform_fn": self.transform_fn,
"offset": self.offset,
"limit": self.limit,
"connection_factory": self.connection_factory,
}
if self.connection_factory is not None:
# The factory carries enough state to recover the base table on
# its own; we don't need to capture the URI / storage options /
# namespace from the existing connection.
return common
# URI-introspection fallback: only viable for native (OSS) connections
# where (uri, storage_options) is enough to reopen. Remote / cloud
# connections don't expose recoverable api_key / region — those users
# must call with_connection_factory().
try:
base_uri = self.base_table._conn.uri
storage_options = self.base_table._conn.storage_options
except AttributeError as e:
raise ValueError(
"Cannot pickle this Permutation: the base table's connection "
"does not expose a uri/storage_options, which usually means it "
"is a remote (LanceDB Cloud) connection. Call "
"Permutation.with_connection_factory(...) first to provide a "
"picklable callable that re-opens the base table from a worker "
"process."
) from e
if base_uri.startswith("memory://"):
# In-memory base tables don't exist in any worker process by
# default, so dump the entire base table into the pickle. This
# can be expensive for large datasets — users with large
# in-memory base tables should either persist them or set a
# connection_factory.
return {
**common,
"base_table_data": self.base_table.to_arrow(),
}
return {
**common,
"base_table_uri": base_uri,
"base_table_namespace": self.base_table._namespace_path,
"base_table_storage_options": storage_options,
}
def __setstate__(self, state: dict[str, Any]) -> None:
from . import connect
connection_factory = state["connection_factory"]
if connection_factory is not None:
base_table = connection_factory(state["base_table_name"])
elif "base_table_data" in state:
# In-memory base table inlined into the pickle; rebuild the same
# way we rebuild the in-memory permutation table.
mem_db = connect("memory://")
base_table = mem_db.create_table(
state["base_table_name"], state["base_table_data"]
)
else:
base_db = connect(
state["base_table_uri"],
storage_options=state["base_table_storage_options"],
)
base_table = base_db.open_table(
state["base_table_name"],
namespace_path=state["base_table_namespace"] or None,
)
permutation_table: Optional[LanceTable] = None
if state["permutation_data"] is not None:
mem_db = connect("memory://")
permutation_table = mem_db.create_table(
"permutation", state["permutation_data"]
)
self.base_table = base_table
self.permutation_table = permutation_table
self.split = state["split"]
self.selection = state["selection"]
self.batch_size = state["batch_size"]
self.transform_fn = state["transform_fn"]
self.offset = state["offset"]
self.limit = state["limit"]
self.connection_factory = connection_factory
self.reader = LOOP.run(self._build_reader())
@property
def schema(self) -> pa.Schema:
async def do_output_schema():
@@ -760,7 +964,9 @@ class Permutation:
for expensive operations such as image decoding.
"""
assert transform is not None, "transform is required"
return Permutation(self.reader, self.selection, self.batch_size, transform)
new = copy.copy(self)
new.transform_fn = transform
return new
def __getitem__(self, index: int) -> Any:
"""
@@ -795,12 +1001,10 @@ class Permutation:
"""
Skip the first `skip` rows of the permutation
"""
async def do_with_skip():
reader = await self.reader.with_offset(skip)
return self._with_reader(reader)
return LOOP.run(do_with_skip())
new = copy.copy(self)
new.offset = skip
new.reader = LOOP.run(new._build_reader())
return new
@deprecated(details="Use with_take instead")
def take(self, limit: int) -> "Permutation":
@@ -818,12 +1022,10 @@ class Permutation:
"""
Limit the permutation to `limit` rows (following any `skip`)
"""
async def do_with_take():
reader = await self.reader.with_limit(limit)
return self._with_reader(reader)
return LOOP.run(do_with_take())
new = copy.copy(self)
new.limit = limit
new.reader = LOOP.run(new._build_reader())
return new
@deprecated(details="Use with_repeat instead")
def repeat(self, times: int) -> "Permutation":

View File

@@ -1643,7 +1643,7 @@ class LanceHybridQueryBuilder(LanceQueryBuilder):
def _validate_query(self, query, vector=None, text=None):
if query is not None and (vector is not None or text is not None):
raise ValueError(
"You can either provide a string query in search() method"
"You can either provide a string query in search() method "
"or set `vector()` and `text()` explicitly for hybrid search."
"But not both."
)

View File

@@ -9,21 +9,6 @@ from lancedb import DBConnection, Table, connect
from lancedb.permutation import Permutation, Permutations, permutation_builder
def test_permutation_persistence(tmp_path):
db = connect(tmp_path)
tbl = db.create_table("test_table", pa.table({"x": range(100), "y": range(100)}))
permutation_tbl = (
permutation_builder(tbl).shuffle().persist(db, "test_permutation").execute()
)
assert permutation_tbl.count_rows() == 100
re_open = db.open_table("test_permutation")
assert re_open.count_rows() == 100
assert permutation_tbl.to_arrow() == re_open.to_arrow()
def test_split_random_ratios(mem_db):
"""Test random splitting with ratios."""
tbl = mem_db.create_table(

View File

@@ -6,6 +6,8 @@ import contextlib
from datetime import timedelta
import http.server
import json
import multiprocessing as mp
import sys
import threading
import time
from unittest.mock import MagicMock, patch
@@ -1230,3 +1232,82 @@ def test_background_loop_cancellation(exception):
with pytest.raises(exception):
loop.run(None)
mock_future.cancel.assert_called_once()
def _remote_fork_child(port: int, queue) -> None:
# Build a fresh Connection in the child so we exercise the at-fork-child
# tokio runtime reset rather than relying on an inherited reqwest client.
db = lancedb.connect(
"db://dev",
api_key="fake",
host_override=f"http://localhost:{port}",
client_config={
"retry_config": {"retries": 0},
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
},
)
queue.put(db.table_names())
@pytest.mark.skipif(
sys.platform != "linux",
reason=(
"fork() is unavailable on Windows and unsafe on macOS "
"(Apple frameworks/TLS are not fork-safe)"
),
)
def test_remote_connection_after_fork():
"""A freshly-built remote Connection in a forked child should not hang.
The pyo3-async-runtimes tokio runtime would otherwise be inherited from
the parent with dead worker threads; the at-fork-child handler in our
runtime module rebuilds it on first use in the child.
"""
def handler(request):
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.end_headers()
request.wfile.write(b'{"tables": []}')
server = http.server.HTTPServer(("localhost", 0), make_mock_http_handler(handler))
port = server.server_address[1]
server_thread = threading.Thread(target=server.serve_forever)
server_thread.start()
try:
# Hit the server in the parent first so the runtime + LOOP are warm
# before fork; a fresh child must still succeed.
parent_db = lancedb.connect(
"db://dev",
api_key="fake",
host_override=f"http://localhost:{port}",
client_config={
"retry_config": {"retries": 0},
"timeout_config": {"connect_timeout": 2, "read_timeout": 2},
},
)
assert parent_db.table_names() == []
ctx = mp.get_context("fork")
queue = ctx.Queue()
proc = ctx.Process(target=_remote_fork_child, args=(port, queue))
proc.start()
proc.join(timeout=15)
if proc.is_alive():
proc.terminate()
proc.join(timeout=5)
if proc.is_alive():
proc.kill()
proc.join()
pytest.fail("Remote connection hung after fork")
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
assert not queue.empty(), "child produced no result"
assert queue.get() == []
# Parent connection must still be usable after the child returned.
assert parent_db.table_names() == []
finally:
server.shutdown()
server_thread.join()

View File

@@ -1,14 +1,29 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
import functools
import multiprocessing as mp
import pickle
import sys
import lancedb
import pyarrow as pa
import pytest
from lancedb.permutation import Permutation, Permutations, permutation_builder
from lancedb.util import tbl_to_tensor
from lancedb.permutation import Permutation
torch = pytest.importorskip("torch")
def _open_native_table(uri: str, table_name: str):
"""Top-level connection factory used by the explicit-factory pickle test.
Defined at module scope so that pickle can resolve it by name in the
worker / unpickling process.
"""
return lancedb.connect(uri).open_table(table_name)
def test_table_dataloader(mem_db):
table = mem_db.create_table("test_table", pa.table({"a": range(1000)}))
dataloader = torch.utils.data.DataLoader(
@@ -40,3 +55,156 @@ def test_permutation_dataloader(mem_db):
for batch in dataloader:
assert batch.size(0) == 1
assert batch.size(1) == 10
def test_permutation_is_picklable(tmp_db):
"""A Permutation must be picklable so it can be used with PyTorch's
DataLoader when num_workers > 0 (which uses multiprocessing and pickles
the dataset to pass it to worker processes)."""
table = tmp_db.create_table("test_table", pa.table({"a": range(1000)}))
permutation = Permutation.identity(table)
pickled = pickle.dumps(permutation)
restored = pickle.loads(pickled)
assert len(restored) == 1000
rows = restored.__getitems__([0, 1, 2])
assert rows == [{"a": 0}, {"a": 1}, {"a": 2}]
def test_permutation_with_memory_base_is_picklable(mem_db):
"""An in-memory base table is inlined into the pickle as Arrow IPC bytes
and rebuilt on the other side as an in-memory LanceTable, so the
Permutation round-trips even though the original database can't be
reopened across processes."""
table = mem_db.create_table("test_table", pa.table({"a": range(50)}))
permutation = Permutation.identity(table)
restored = pickle.loads(pickle.dumps(permutation))
assert len(restored) == 50
assert restored.__getitems__([0, 10, 49]) == [{"a": 0}, {"a": 10}, {"a": 49}]
def test_permutation_dataloader_multiprocessing(tmp_db):
"""Using a Permutation with a PyTorch DataLoader that has num_workers > 0
must work end-to-end. Each worker process gets a pickled copy of the
dataset and reads batches from it."""
table = tmp_db.create_table("test_table", pa.table({"a": range(1000)}))
permutation = Permutation.identity(table)
dataloader = torch.utils.data.DataLoader(
permutation,
batch_size=10,
shuffle=True,
num_workers=2,
multiprocessing_context="spawn",
)
seen = 0
for batch in dataloader:
assert batch["a"].size(0) == 10
seen += batch["a"].size(0)
assert seen == 1000
def test_permutation_pickle_with_connection_factory(tmp_path):
"""When the user provides a connection_factory, pickling should round-trip
through that factory rather than introspecting the connection URI. Useful
for remote / cloud connections where the URI alone isn't reopenable."""
db = lancedb.connect(tmp_path)
db.create_table("test_table", pa.table({"a": range(50)}))
factory = functools.partial(_open_native_table, str(tmp_path))
permutation = Permutation.identity(factory("test_table")).with_connection_factory(
factory
)
restored = pickle.loads(pickle.dumps(permutation))
assert len(restored) == 50
# The factory survives pickling and is what powered base-table reopen.
assert restored.connection_factory is not None
assert restored.connection_factory.func is _open_native_table
assert restored.__getitems__([0, 1, 2]) == [{"a": 0}, {"a": 1}, {"a": 2}]
def test_permutation_with_builder_is_picklable(tmp_db):
"""A Permutation built from a non-identity permutation table must round-trip
through pickle while preserving the row order defined by the permutation."""
table = tmp_db.create_table("test_table", pa.table({"a": range(100)}))
perm_tbl = (
permutation_builder(table)
.split_random(ratios=[0.8, 0.2], seed=42, split_names=["train", "test"])
.shuffle(seed=42)
.execute()
)
permutations = Permutations(table, perm_tbl)
permutation = permutations["train"]
indices = list(range(len(permutation)))
expected = permutation.__getitems__(indices)
restored = pickle.loads(pickle.dumps(permutation))
assert len(restored) == len(permutation)
assert restored.__getitems__(indices) == expected
def _multiworker_dataloader_target(db_uri: str, result_queue):
import lancedb
from lancedb.permutation import Permutation
db = lancedb.connect(db_uri)
table = db.open_table("test_table")
permutation = Permutation.identity(table)
dataloader = torch.utils.data.DataLoader(
permutation,
batch_size=10,
num_workers=2,
multiprocessing_context="fork",
)
count = 0
for batch in dataloader:
assert batch["a"].size(0) == 10
count += 1
result_queue.put(count)
@pytest.mark.skipif(
sys.platform != "linux",
reason=(
"fork() is unavailable on Windows and unsafe on macOS "
"(Apple frameworks/TLS are not fork-safe)"
),
)
def test_permutation_dataloader_fork_workers(tmp_path):
"""A Permutation used by a fork-based DataLoader should not hang.
PyTorch's DataLoader uses fork-based multiprocessing by default on Linux.
LanceDB drives async work through a background asyncio thread that does
not survive a fork, so any LOOP.run() in a worker blocks forever.
"""
import lancedb
db_uri = str(tmp_path / "db")
db = lancedb.connect(db_uri)
db.create_table("test_table", pa.table({"a": list(range(1000))}))
ctx = mp.get_context("spawn")
queue = ctx.Queue()
proc = ctx.Process(target=_multiworker_dataloader_target, args=(db_uri, queue))
proc.start()
proc.join(timeout=30)
if proc.is_alive():
proc.terminate()
proc.join(timeout=5)
if proc.is_alive():
proc.kill()
proc.join()
pytest.fail("Permutation hung when iterated in a fork-based DataLoader worker")
assert proc.exitcode == 0, f"child exited with code {proc.exitcode}"
assert not queue.empty(), "child produced no batches"
assert queue.get() == 100

View File

@@ -3,6 +3,8 @@
use std::sync::Arc;
use crate::error::PythonErrorExt;
use crate::runtime::future_into_py;
use arrow::{
datatypes::SchemaRef,
pyarrow::{IntoPyArrow, ToPyArrow},
@@ -12,9 +14,6 @@ use lancedb::arrow::SendableRecordBatchStream;
use pyo3::{
Bound, Py, PyAny, PyRef, PyResult, Python, exceptions::PyStopAsyncIteration, pyclass, pymethods,
};
use pyo3_async_runtimes::tokio::future_into_py;
use crate::error::PythonErrorExt;
#[pyclass]
pub struct RecordBatchStream {

View File

@@ -7,6 +7,12 @@ use std::{
time::Duration,
};
use crate::{
error::PythonErrorExt,
namespace::{create_namespace_storage_options_provider, extract_namespace_arc},
runtime::future_into_py,
table::Table,
};
use arrow::{datatypes::Schema, ffi_stream::ArrowArrayStreamReader, pyarrow::FromPyArrow};
use lancedb::{
connection::Connection as LanceConnection,
@@ -20,13 +26,6 @@ use pyo3::{
pyclass, pyfunction, pymethods,
types::{PyDict, PyDictMethods},
};
use pyo3_async_runtimes::tokio::future_into_py;
use crate::{
error::PythonErrorExt,
namespace::{create_namespace_storage_options_provider, extract_namespace_arc},
table::Table,
};
#[pyclass]
pub struct Connection {
@@ -525,7 +524,7 @@ impl Connection {
}
#[pyfunction]
#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None))]
#[pyo3(signature = (uri, api_key=None, region=None, host_override=None, read_consistency_interval=None, client_config=None, storage_options=None, session=None, manifest_enabled=false, namespace_client_properties=None))]
#[allow(clippy::too_many_arguments)]
pub fn connect(
py: Python<'_>,
@@ -537,6 +536,8 @@ pub fn connect(
client_config: Option<PyClientConfig>,
storage_options: Option<HashMap<String, String>>,
session: Option<crate::session::Session>,
manifest_enabled: bool,
namespace_client_properties: Option<HashMap<String, String>>,
) -> PyResult<Bound<'_, PyAny>> {
future_into_py(py, async move {
let mut builder = lancedb::connect(&uri);
@@ -556,6 +557,12 @@ pub fn connect(
if let Some(storage_options) = storage_options {
builder = builder.storage_options(storage_options);
}
if manifest_enabled {
builder = builder.manifest_enabled(true);
}
if let Some(namespace_client_properties) = namespace_client_properties {
builder = builder.namespace_client_properties(namespace_client_properties);
}
#[cfg(feature = "remote")]
if let Some(client_config) = client_config {
builder = builder.client_config(client_config.into());

View File

@@ -17,7 +17,7 @@ use pyo3::{Bound, PyAny, PyResult, exceptions::PyValueError, prelude::*, pyfunct
/// [`expr_lit`] and combined with the methods on this struct. On the Python
/// side a thin wrapper class (`lancedb.expr.Expr`) delegates to these methods
/// and adds Python operator overloads.
#[pyclass(name = "PyExpr")]
#[pyclass(name = "PyExpr", from_py_object)]
#[derive(Clone)]
pub struct PyExpr(pub DfExpr);

View File

@@ -33,7 +33,7 @@ impl PyHeaderProvider {
Ok(headers_py) => {
// Convert Python dict to Rust HashMap
let bound_headers = headers_py.bind(py);
let dict: &Bound<PyDict> = bound_headers.downcast().map_err(|e| {
let dict: &Bound<PyDict> = bound_headers.cast().map_err(|e| {
format!("HeaderProvider.get_headers must return a dict: {}", e)
})?;

View File

@@ -13,7 +13,7 @@ use pyo3::{
Bound, FromPyObject, PyAny, PyResult, Python,
exceptions::{PyKeyError, PyValueError},
intern, pyclass, pymethods,
types::PyAnyMethods,
types::{PyAnyMethods, PyString},
};
use crate::util::parse_distance_type;
@@ -22,7 +22,7 @@ pub fn class_name(ob: &'_ Bound<'_, PyAny>) -> PyResult<String> {
let full_name = ob
.getattr(intern!(ob.py(), "__class__"))?
.getattr(intern!(ob.py(), "__name__"))?;
let full_name = full_name.downcast()?.to_string_lossy();
let full_name = full_name.cast::<PyString>()?.to_string_lossy();
match full_name.rsplit_once('.') {
Some((_, name)) => Ok(name.to_string()),

View File

@@ -28,6 +28,7 @@ pub mod index;
pub mod namespace;
pub mod permutation;
pub mod query;
pub mod runtime;
pub mod session;
pub mod table;
pub mod util;

View File

@@ -183,7 +183,7 @@ async fn call_py_method_primitive<Req, Resp>(
) -> lance_core::Result<Resp>
where
Req: serde::Serialize + Send + 'static,
Resp: for<'py> pyo3::FromPyObject<'py> + Send + 'static,
Resp: for<'a, 'py> pyo3::FromPyObject<'a, 'py> + Send + 'static,
{
let request_json = serde_json::to_string(&request).map_err(|e| {
lance_core::Error::io(format!(
@@ -203,7 +203,7 @@ where
// Call the Python method
let result = py_namespace.call_method1(py, method_name, (request_arg,))?;
let value: Resp = result.extract(py)?;
let value: Resp = result.extract(py).map_err(Into::into)?;
Ok::<_, PyErr>(value)
})
})

View File

@@ -4,7 +4,7 @@
use std::sync::{Arc, Mutex};
use crate::{
arrow::RecordBatchStream, connection::Connection, error::PythonErrorExt, table::Table,
arrow::RecordBatchStream, error::PythonErrorExt, runtime::future_into_py, table::Table,
};
use arrow::pyarrow::{PyArrowType, ToPyArrow};
use lancedb::{
@@ -21,16 +21,15 @@ use pyo3::{
pyclass, pymethods,
types::{PyAnyMethods, PyDict, PyDictMethods, PyType},
};
use pyo3_async_runtimes::tokio::future_into_py;
fn table_from_py<'a>(table: Bound<'a, PyAny>) -> PyResult<Bound<'a, Table>> {
if table.hasattr("_inner")? {
Ok(table.getattr("_inner")?.downcast_into::<Table>()?)
Ok(table.getattr("_inner")?.cast_into::<Table>()?)
} else if table.hasattr("_table")? {
Ok(table
.getattr("_table")?
.getattr("_inner")?
.downcast_into::<Table>()?)
.cast_into::<Table>()?)
} else {
Err(PyRuntimeError::new_err(
"Provided table does not appear to be a Table or RemoteTable instance",
@@ -80,24 +79,6 @@ impl PyAsyncPermutationBuilder {
#[pymethods]
impl PyAsyncPermutationBuilder {
#[pyo3(signature = (database, table_name))]
pub fn persist(
slf: PyRefMut<'_, Self>,
database: Bound<'_, PyAny>,
table_name: String,
) -> PyResult<Self> {
let conn = if database.hasattr("_conn")? {
database
.getattr("_conn")?
.getattr("_inner")?
.downcast_into::<Connection>()?
} else {
database.getattr("_inner")?.downcast_into::<Connection>()?
};
let database = conn.borrow().database()?;
slf.modify(|builder| builder.persist(database, table_name))
}
#[pyo3(signature = (*, ratios=None, counts=None, fixed=None, seed=None, split_names=None))]
pub fn split_random(
slf: PyRefMut<'_, Self>,
@@ -243,7 +224,7 @@ impl PyPermutationReader {
let Some(selection) = selection else {
return Ok(Select::All);
};
let selection = selection.downcast_into::<PyDict>()?;
let selection = selection.cast_into::<PyDict>()?;
let selection = selection
.iter()
.map(|(key, value)| {

View File

@@ -4,6 +4,11 @@
use std::sync::Arc;
use std::time::Duration;
use crate::expr::PyExpr;
use crate::runtime::future_into_py;
use crate::util::parse_distance_type;
use crate::{arrow::RecordBatchStream, util::PyLanceDB};
use crate::{error::PythonErrorExt, index::class_name};
use arrow::array::Array;
use arrow::array::ArrayData;
use arrow::array::make_array;
@@ -33,19 +38,16 @@ use pyo3::pyfunction;
use pyo3::pymethods;
use pyo3::types::PyList;
use pyo3::types::{PyDict, PyString};
use pyo3::{FromPyObject, exceptions::PyRuntimeError};
use pyo3::{Borrowed, FromPyObject, exceptions::PyRuntimeError};
use pyo3::{PyErr, pyclass};
use pyo3::{exceptions::PyValueError, intern};
use pyo3_async_runtimes::tokio::future_into_py;
use crate::expr::PyExpr;
use crate::util::parse_distance_type;
use crate::{arrow::RecordBatchStream, util::PyLanceDB};
use crate::{error::PythonErrorExt, index::class_name};
impl<'a, 'py> FromPyObject<'a, 'py> for PyLanceDB<FtsQuery> {
type Error = PyErr;
impl FromPyObject<'_> for PyLanceDB<FtsQuery> {
fn extract_bound(ob: &Bound<'_, PyAny>) -> PyResult<Self> {
match class_name(ob)?.as_str() {
fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
let ob = ob.to_owned();
match class_name(&ob)?.as_str() {
"MatchQuery" => {
let query = ob.getattr("query")?.extract()?;
let column = ob.getattr("column")?.extract()?;
@@ -424,7 +426,7 @@ impl Query {
"Query text is required for nearest_to_text",
))?;
let query = if let Ok(query_text) = fts_query.downcast::<PyString>() {
let query = if let Ok(query_text) = fts_query.cast::<PyString>() {
let mut query_text = query_text.to_string();
let columns = query
.get_item("columns")?
@@ -606,7 +608,7 @@ impl TakeQuery {
}
}
#[pyclass]
#[pyclass(from_py_object)]
#[derive(Clone)]
pub struct FTSQuery {
inner: LanceDbQuery,
@@ -735,7 +737,7 @@ impl FTSQuery {
}
}
#[pyclass]
#[pyclass(from_py_object)]
#[derive(Clone)]
pub struct VectorQuery {
inner: LanceDbVectorQuery,

142
python/src/runtime.rs Normal file
View File

@@ -0,0 +1,142 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Fork-safe wrapper around tokio + pyo3-async-runtimes.
//!
//! `pyo3_async_runtimes::tokio` keeps its multi-threaded runtime in a
//! `OnceLock` that can never be replaced. Tokio's worker threads do not
//! survive `fork()`, so once a child inherits a "frozen" runtime, every
//! `future_into_py` call hangs forever.
//!
//! We sidestep the global by routing every future through our own
//! [`LanceRuntime`] (a [`pyo3_async_runtimes::generic::Runtime`] impl) backed
//! by an [`AtomicPtr`] to a tokio runtime that we own. A `pthread_atfork`
//! child handler nulls the pointer; the next `spawn` rebuilds the runtime in
//! the child. This mirrors the pattern used in the Lance Python bindings.
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
use pyo3::{Bound, PyAny, PyResult, Python, conversion::IntoPyObject};
use pyo3_async_runtimes::{
TaskLocals,
generic::{ContextExt, JoinError, Runtime},
};
use tokio::{runtime, task};
static RUNTIME: AtomicPtr<runtime::Runtime> = AtomicPtr::new(std::ptr::null_mut());
static RUNTIME_INSTALLING: AtomicBool = AtomicBool::new(false);
static ATFORK_INSTALLED: AtomicBool = AtomicBool::new(false);
fn create_runtime() -> runtime::Runtime {
runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("lancedb-tokio-worker")
.build()
.expect("Failed to build tokio runtime")
}
fn get_runtime() -> &'static runtime::Runtime {
loop {
let ptr = RUNTIME.load(Ordering::SeqCst);
if !ptr.is_null() {
return unsafe { &*ptr };
}
if !RUNTIME_INSTALLING.fetch_or(true, Ordering::SeqCst) {
break;
}
std::thread::yield_now();
}
if !ATFORK_INSTALLED.fetch_or(true, Ordering::SeqCst) {
install_atfork();
}
let new_ptr = Box::into_raw(Box::new(create_runtime()));
RUNTIME.store(new_ptr, Ordering::SeqCst);
unsafe { &*new_ptr }
}
/// Runs in async-signal context after `fork()` in the child. We can only
/// touch atomics here; we deliberately leak the previous runtime because
/// dropping a tokio `Runtime` would try to join its (now-dead) worker
/// threads and hang.
extern "C" fn atfork_child() {
RUNTIME.store(std::ptr::null_mut(), Ordering::SeqCst);
RUNTIME_INSTALLING.store(false, Ordering::SeqCst);
}
#[cfg(not(windows))]
fn install_atfork() {
unsafe { libc::pthread_atfork(None, None, Some(atfork_child)) };
}
#[cfg(windows)]
fn install_atfork() {}
/// Marker type implementing [`Runtime`] over our fork-safe runtime slot.
pub struct LanceRuntime;
/// Newtype wrapper around `tokio::task::JoinError` so we can implement the
/// foreign [`JoinError`] trait without violating orphan rules.
pub struct LanceJoinError(task::JoinError);
impl JoinError for LanceJoinError {
fn is_panic(&self) -> bool {
self.0.is_panic()
}
fn into_panic(self) -> Box<dyn std::any::Any + Send + 'static> {
self.0.into_panic()
}
}
impl Runtime for LanceRuntime {
type JoinError = LanceJoinError;
type JoinHandle = Pin<Box<dyn Future<Output = Result<(), Self::JoinError>> + Send>>;
fn spawn<F>(fut: F) -> Self::JoinHandle
where
F: Future<Output = ()> + Send + 'static,
{
let handle = get_runtime().spawn(fut);
Box::pin(async move { handle.await.map_err(LanceJoinError) })
}
fn spawn_blocking<F>(f: F) -> Self::JoinHandle
where
F: FnOnce() + Send + 'static,
{
let handle = get_runtime().spawn_blocking(f);
Box::pin(async move { handle.await.map_err(LanceJoinError) })
}
}
tokio::task_local! {
static TASK_LOCALS: std::cell::OnceCell<TaskLocals>;
}
impl ContextExt for LanceRuntime {
fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
where
F: Future<Output = R> + Send + 'static,
{
let cell = std::cell::OnceCell::new();
cell.set(locals).unwrap();
Box::pin(TASK_LOCALS.scope(cell, fut))
}
fn get_task_locals() -> Option<TaskLocals> {
TASK_LOCALS
.try_with(|c| c.get().cloned())
.unwrap_or_default()
}
}
/// Drop-in replacement for `pyo3_async_runtimes::tokio::future_into_py` that
/// uses our fork-safe runtime.
pub fn future_into_py<F, T>(py: Python<'_>, fut: F) -> PyResult<Bound<'_, PyAny>>
where
F: Future<Output = PyResult<T>> + Send + 'static,
T: for<'py> IntoPyObject<'py> + Send + 'static,
{
pyo3_async_runtimes::generic::future_into_py::<LanceRuntime, _, T>(py, fut)
}

View File

@@ -11,7 +11,7 @@ use pyo3::{PyResult, pyclass, pymethods};
/// Sessions allow you to configure cache sizes for index and metadata caches,
/// which can significantly impact memory use and performance. They can
/// also be re-used across multiple connections to share the same cache state.
#[pyclass]
#[pyclass(from_py_object)]
#[derive(Clone)]
pub struct Session {
pub(crate) inner: Arc<LanceSession>,

View File

@@ -2,6 +2,7 @@
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::{collections::HashMap, sync::Arc};
use crate::runtime::future_into_py;
use crate::{
connection::Connection,
error::PythonErrorExt,
@@ -24,12 +25,11 @@ use pyo3::{
pyclass, pymethods,
types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods},
};
use pyo3_async_runtimes::tokio::future_into_py;
mod scannable;
/// Statistics about a compaction operation.
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct CompactionStats {
/// The number of fragments removed
@@ -43,7 +43,7 @@ pub struct CompactionStats {
}
/// Statistics about a cleanup operation
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct RemovalStats {
/// The number of bytes removed
@@ -53,7 +53,7 @@ pub struct RemovalStats {
}
/// Statistics about an optimize operation
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct OptimizeStats {
/// Statistics about the compaction operation
@@ -62,7 +62,7 @@ pub struct OptimizeStats {
pub prune: RemovalStats,
}
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct UpdateResult {
pub rows_updated: u64,
@@ -88,7 +88,7 @@ impl From<lancedb::table::UpdateResult> for UpdateResult {
}
}
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct AddResult {
pub version: u64,
@@ -109,7 +109,7 @@ impl From<lancedb::table::AddResult> for AddResult {
}
}
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct DeleteResult {
pub num_deleted_rows: u64,
@@ -135,7 +135,7 @@ impl From<lancedb::table::DeleteResult> for DeleteResult {
}
}
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct MergeResult {
pub version: u64,
@@ -171,7 +171,7 @@ impl From<lancedb::table::MergeResult> for MergeResult {
}
}
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct AddColumnsResult {
pub version: u64,
@@ -192,7 +192,7 @@ impl From<lancedb::table::AddColumnsResult> for AddColumnsResult {
}
}
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct AlterColumnsResult {
pub version: u64,
@@ -213,7 +213,7 @@ impl From<lancedb::table::AlterColumnsResult> for AlterColumnsResult {
}
}
#[pyclass(get_all)]
#[pyclass(get_all, from_py_object)]
#[derive(Clone, Debug)]
pub struct DropColumnsResult {
pub version: u64,

View File

@@ -126,8 +126,11 @@ impl Scannable for PyScannable {
}
}
impl<'py> FromPyObject<'py> for PyScannable {
fn extract_bound(ob: &pyo3::Bound<'py, PyAny>) -> pyo3::PyResult<Self> {
impl<'a, 'py> FromPyObject<'a, 'py> for PyScannable {
type Error = pyo3::PyErr;
fn extract(ob: pyo3::Borrowed<'a, 'py, PyAny>) -> pyo3::PyResult<Self> {
let ob = ob.to_owned();
// Convert from Scannable dataclass.
let schema: PyArrowType<Schema> = ob.getattr("schema")?.extract()?;
let schema = Arc::new(schema.0);

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.28.0-beta.9"
version = "0.28.0-beta.11"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true
@@ -108,10 +108,20 @@ test-log = "0.2"
[features]
default = []
aws = ["lance/aws", "lance-io/aws", "lance-namespace-impls/dir-aws"]
aws = [
"lance/aws",
"lance-io/aws",
"lance-namespace-impls/dir-aws",
"object_store/aws",
]
oss = ["lance/oss", "lance-io/oss", "lance-namespace-impls/dir-oss"]
gcs = ["lance/gcp", "lance-io/gcp", "lance-namespace-impls/dir-gcp"]
azure = ["lance/azure", "lance-io/azure", "lance-namespace-impls/dir-azure"]
azure = [
"lance/azure",
"lance-io/azure",
"lance-namespace-impls/dir-azure",
"lance-namespace-impls/credential-vendor-azure",
]
huggingface = [
"lance/huggingface",
"lance-io/huggingface",

View File

@@ -590,6 +590,15 @@ pub struct ConnectRequest {
/// storage options.
pub namespace_client_properties: HashMap<String, String>,
/// Use directory namespace manifests as the source of truth for native
/// LanceDB table metadata.
///
/// When enabled for a local/native connection, LanceDB returns a
/// namespace-backed database directly. Directory listing fallback remains
/// enabled for migration, and directory-listing-to-manifest migration is
/// forced on.
pub manifest_enabled: bool,
/// The interval at which to check for updates from other processes.
///
/// If None, then consistency is not checked. For performance
@@ -630,6 +639,7 @@ impl ConnectBuilder {
read_consistency_interval: None,
options: HashMap::new(),
namespace_client_properties: HashMap::new(),
manifest_enabled: false,
session: None,
},
embedding_registry: None,
@@ -791,6 +801,17 @@ impl ConnectBuilder {
self
}
/// Enable or disable manifest-backed directory namespace mode for local
/// native connections.
///
/// When enabled, the connection uses the directory namespace database
/// directly for all table operations and forces
/// `dir_listing_to_manifest_migration_enabled=true`.
pub fn manifest_enabled(mut self, enabled: bool) -> Self {
self.request.manifest_enabled = enabled;
self
}
/// The interval at which to check for updates from other processes. This
/// only affects LanceDB OSS.
///
@@ -886,6 +907,16 @@ impl ConnectBuilder {
pub async fn execute(self) -> Result<Connection> {
if self.request.uri.starts_with("db") {
self.execute_remote()
} else if self.request.manifest_enabled {
let internal = Arc::new(
ListingDatabase::connect_manifest_enabled_namespace_database(&self.request).await?,
);
Ok(Connection {
internal,
embedding_registry: self
.embedding_registry
.unwrap_or_else(|| Arc::new(MemoryRegistry::new())),
})
} else {
let internal = Arc::new(ListingDatabase::connect_with_options(&self.request).await?);
Ok(Connection {
@@ -1132,6 +1163,9 @@ mod tests {
use lance_testing::datagen::{BatchGenerator, IncrementingInt32};
use tempfile::tempdir;
use crate::database::listing::{ListingDatabaseOptions, OPT_NEW_TABLE_V2_MANIFEST_PATHS};
use crate::database::namespace::LanceNamespaceDatabase;
use crate::table::NativeTable;
use crate::test_utils::connection::new_test_connection;
use super::*;
@@ -1204,6 +1238,147 @@ mod tests {
);
}
#[tokio::test]
async fn test_connect_with_manifest_enabled_uses_directory_namespace() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let db = connect(uri)
.manifest_enabled(true)
.storage_option("timeout", "30s")
.namespace_client_property("manifest_enabled", "false")
.namespace_client_property("dir_listing_to_manifest_migration_enabled", "false")
.execute()
.await
.unwrap();
assert!(
db.database()
.as_any()
.downcast_ref::<LanceNamespaceDatabase>()
.is_some()
);
assert_eq!(db.uri(), uri);
let (ns_impl, properties) = db.namespace_client_config().await.unwrap();
assert_eq!(ns_impl, "dir");
assert_eq!(properties.get("root"), Some(&uri.to_string()));
assert_eq!(
properties.get("manifest_enabled"),
Some(&"true".to_string())
);
assert_eq!(
properties.get("dir_listing_to_manifest_migration_enabled"),
Some(&"true".to_string())
);
assert_eq!(properties.get("storage.timeout"), Some(&"30s".to_string()));
}
#[tokio::test]
async fn test_manifest_enabled_rejects_commit_engine_uri() {
let Err(err) = connect("s3+ddb://bucket/db?ddbTableName=manifest")
.manifest_enabled(true)
.execute()
.await
else {
panic!("expected manifest-enabled s3+ddb connection to fail");
};
assert!(
matches!(err, Error::NotSupported { message } if message.contains("commit engine URI schemes"))
);
let Err(err) = connect("s3://bucket/db?engine=ddb&ddbTableName=manifest")
.manifest_enabled(true)
.execute()
.await
else {
panic!("expected manifest-enabled engine query connection to fail");
};
assert!(
matches!(err, Error::NotSupported { message } if message.contains("commit engine"))
);
}
#[tokio::test]
async fn test_manifest_enabled_connection_migrates_root_listing_table() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
connect(uri)
.execute()
.await
.unwrap()
.create_empty_table("legacy", schema)
.execute()
.await
.unwrap();
let db = connect(uri).manifest_enabled(true).execute().await.unwrap();
let tables = db.table_names().execute().await.unwrap();
assert_eq!(tables, vec!["legacy".to_string()]);
db.open_table("legacy").execute().await.unwrap();
}
#[tokio::test]
async fn test_manifest_enabled_preserves_new_table_options() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let options = ListingDatabaseOptions::builder()
.enable_v2_manifest_paths(true)
.build();
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
let table = connect(uri)
.manifest_enabled(true)
.database_options(&options)
.execute()
.await
.unwrap()
.create_empty_table("v1_manifest", schema)
.storage_option(OPT_NEW_TABLE_V2_MANIFEST_PATHS, "false")
.execute()
.await
.unwrap();
let native_table = table
.base_table()
.as_any()
.downcast_ref::<NativeTable>()
.unwrap();
assert!(!native_table.uses_v2_manifest_paths().await.unwrap());
}
#[tokio::test]
async fn test_manifest_enabled_vend_input_storage_options() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
let table = connect(uri)
.manifest_enabled(true)
.storage_option("test_storage_option", "test_value")
.namespace_client_property("vend_input_storage_options", "true")
.namespace_client_property(
"vend_input_storage_options_refresh_interval_millis",
"60000",
)
.execute()
.await
.unwrap()
.create_empty_table("vended", schema)
.execute()
.await
.unwrap();
let storage_options = table.latest_storage_options().await.unwrap().unwrap();
assert_eq!(
storage_options.get("test_storage_option"),
Some(&"test_value".to_string())
);
assert!(storage_options.contains_key("expires_at_millis"));
}
#[tokio::test]
async fn test_table_names() {
let tc = new_test_connection().await.unwrap();

View File

@@ -285,7 +285,7 @@ const MIRRORED_STORE: &str = "mirroredStore";
/// A connection to LanceDB
impl ListingDatabase {
fn build_namespace_client_properties(
pub(crate) fn build_namespace_client_properties(
uri: &str,
storage_options: &HashMap<String, String>,
namespace_client_properties: HashMap<String, String>,
@@ -298,6 +298,24 @@ impl ListingDatabase {
properties
}
pub(crate) fn build_manifest_enabled_namespace_client_properties(
uri: &str,
storage_options: &HashMap<String, String>,
namespace_client_properties: HashMap<String, String>,
) -> HashMap<String, String> {
let mut properties = Self::build_namespace_client_properties(
uri,
storage_options,
namespace_client_properties,
);
properties.insert("manifest_enabled".to_string(), "true".to_string());
properties.insert(
"dir_listing_to_manifest_migration_enabled".to_string(),
"true".to_string(),
);
properties
}
async fn connect_namespace_database(
uri: &str,
storage_options: HashMap<String, String>,
@@ -323,6 +341,119 @@ impl ListingDatabase {
))
}
async fn prepare_namespace_root(
uri: &str,
storage_options: &HashMap<String, String>,
session: Arc<lance::session::Session>,
) -> Result<String> {
match url::Url::parse(uri) {
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
let (object_store, _) = ObjectStore::from_uri_and_params(
session.store_registry(),
uri,
&ObjectStoreParams::default(),
)
.await?;
if object_store.is_local() {
Self::try_create_dir(uri).context(CreateDirSnafu { path: uri })?;
}
Ok(uri.to_string())
}
Ok(mut url) => {
if url.scheme().contains('+') {
return Err(Error::NotSupported {
message: "commit engine URI schemes are not supported for manifest-enabled namespace connections".to_string(),
});
}
for (key, value) in url.query_pairs() {
if key == ENGINE {
return Err(Error::NotSupported {
message: format!(
"commit engine '{}' is not supported for manifest-enabled namespace connections",
value
),
});
} else if key == MIRRORED_STORE {
return Err(Error::NotSupported {
message: "mirrored store is not supported for manifest-enabled namespace connections"
.to_string(),
});
}
}
url.set_query(None);
let plain_uri = url.to_string();
let os_params = ObjectStoreParams {
storage_options_accessor: if storage_options.is_empty() {
None
} else {
Some(Arc::new(StorageOptionsAccessor::with_static_options(
storage_options.clone(),
)))
},
..Default::default()
};
let (object_store, _) = ObjectStore::from_uri_and_params(
session.store_registry(),
&plain_uri,
&os_params,
)
.await?;
if object_store.is_local() {
Self::try_create_dir(&plain_uri).context(CreateDirSnafu {
path: plain_uri.clone(),
})?;
}
Ok(plain_uri)
}
Err(_) => {
let (object_store, _) = ObjectStore::from_uri_and_params(
session.store_registry(),
uri,
&ObjectStoreParams::default(),
)
.await?;
if object_store.is_local() {
Self::try_create_dir(uri).context(CreateDirSnafu { path: uri })?;
}
Ok(uri.to_string())
}
}
}
pub(crate) async fn connect_manifest_enabled_namespace_database(
request: &ConnectRequest,
) -> Result<LanceNamespaceDatabase> {
let options = ListingDatabaseOptions::parse_from_map(&request.options)?;
let session = request
.session
.clone()
.unwrap_or_else(|| Arc::new(lance::session::Session::default()));
let namespace_root =
Self::prepare_namespace_root(&request.uri, &options.storage_options, session.clone())
.await?;
let ns_properties = Self::build_manifest_enabled_namespace_client_properties(
&namespace_root,
&options.storage_options,
request.namespace_client_properties.clone(),
);
LanceNamespaceDatabase::connect_with_new_table_config(
"dir",
ns_properties,
options.storage_options,
request.read_consistency_interval,
Some(session),
HashSet::new(),
options.new_table_config,
)
.await
.map(|db| db.with_uri(request.uri.clone()))
}
/// Connect to a listing database
///
/// The URI should be a path to a directory where the tables are stored.
@@ -584,7 +715,7 @@ impl ListingDatabase {
let commit_handler = commit_handler_from_url(&uri, &Some(object_store_params)).await?;
for name in names {
let dir_name = format!("{}.{}", name, LANCE_EXTENSION);
let full_path = self.base_path.child(dir_name.clone());
let full_path = self.base_path.clone().join(dir_name.clone());
commit_handler.delete(&full_path).await?;
@@ -690,15 +821,12 @@ impl ListingDatabase {
store_params.storage_options_accessor = Some(Arc::new(accessor));
}
write_params.data_storage_version = self
.new_table_config
.data_storage_version
.or(storage_version_override);
write_params.data_storage_version = storage_version_override
.or(write_params.data_storage_version)
.or(self.new_table_config.data_storage_version);
if let Some(enable_v2_manifest_paths) = self
.new_table_config
.enable_v2_manifest_paths
.or(v2_manifest_override)
if let Some(enable_v2_manifest_paths) =
v2_manifest_override.or(self.new_table_config.enable_v2_manifest_paths)
{
write_params.enable_v2_manifest_paths = enable_v2_manifest_paths;
}
@@ -1158,6 +1286,7 @@ mod tests {
client_config: Default::default(),
options: Default::default(),
namespace_client_properties: Default::default(),
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};
@@ -1292,6 +1421,7 @@ mod tests {
client_config: Default::default(),
options: options.clone(),
namespace_client_properties: Default::default(),
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};
@@ -1827,6 +1957,7 @@ mod tests {
client_config: Default::default(),
options,
namespace_client_properties: Default::default(),
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};
@@ -1933,6 +2064,7 @@ mod tests {
client_config: Default::default(),
options,
namespace_client_properties: Default::default(),
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};
@@ -2005,6 +2137,7 @@ mod tests {
client_config: Default::default(),
options,
namespace_client_properties: Default::default(),
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};
@@ -2202,6 +2335,7 @@ mod tests {
client_config: Default::default(),
options: Default::default(),
namespace_client_properties,
manifest_enabled: false,
read_consistency_interval: None,
session: None,
};

View File

@@ -8,9 +8,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use lance::io::commit::namespace_manifest::LanceNamespaceExternalManifestStore;
use lance_io::object_store::{
ObjectStore as LanceObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor,
};
use lance_io::object_store::{ObjectStoreParams, StorageOptionsAccessor};
use lance_namespace::{
LanceNamespace,
models::{
@@ -26,6 +24,10 @@ use lance_table::io::commit::external_manifest::ExternalManifestCommitHandler;
use crate::connection::NamespaceClientPushdownOperation;
use crate::database::ReadConsistency;
use crate::database::listing::{
NewTableConfig, OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS, OPT_NEW_TABLE_STORAGE_VERSION,
OPT_NEW_TABLE_V2_MANIFEST_PATHS,
};
use crate::error::{Error, Result};
use crate::table::NativeTable;
use lance::dataset::WriteMode;
@@ -52,6 +54,8 @@ pub struct LanceNamespaceDatabase {
ns_impl: String,
// Namespace properties used to construct the namespace client
ns_properties: HashMap<String, String>,
// Options for tables created by this connection
new_table_config: NewTableConfig,
}
impl LanceNamespaceDatabase {
@@ -73,9 +77,15 @@ impl LanceNamespaceDatabase {
pushdown_operations: namespace_client_pushdown_operations,
ns_impl: namespace_client_impl,
ns_properties: namespace_client_properties,
new_table_config: NewTableConfig::default(),
}
}
pub(crate) fn with_uri(mut self, uri: impl Into<String>) -> Self {
self.uri = uri.into();
self
}
pub async fn connect(
ns_impl: &str,
ns_properties: HashMap<String, String>,
@@ -83,6 +93,27 @@ impl LanceNamespaceDatabase {
read_consistency_interval: Option<std::time::Duration>,
session: Option<Arc<lance::session::Session>>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
) -> Result<Self> {
Self::connect_with_new_table_config(
ns_impl,
ns_properties,
storage_options,
read_consistency_interval,
session,
pushdown_operations,
NewTableConfig::default(),
)
.await
}
pub(crate) async fn connect_with_new_table_config(
ns_impl: &str,
ns_properties: HashMap<String, String>,
storage_options: HashMap<String, String>,
read_consistency_interval: Option<std::time::Duration>,
session: Option<Arc<lance::session::Session>>,
pushdown_operations: HashSet<NamespaceClientPushdownOperation>,
new_table_config: NewTableConfig,
) -> Result<Self> {
let mut builder = ConnectBuilder::new(ns_impl);
for (key, value) in ns_properties.clone() {
@@ -104,8 +135,79 @@ impl LanceNamespaceDatabase {
pushdown_operations,
ns_impl: ns_impl.to_string(),
ns_properties,
new_table_config,
})
}
fn extract_storage_overrides(
&self,
request: &DbCreateTableRequest,
) -> Result<(
Option<lance_encoding::version::LanceFileVersion>,
Option<bool>,
Option<bool>,
)> {
let storage_options = request
.write_options
.lance_write_params
.as_ref()
.and_then(|p| p.store_params.as_ref())
.and_then(|sp| sp.storage_options());
let storage_version_override = storage_options
.and_then(|opts| opts.get(OPT_NEW_TABLE_STORAGE_VERSION))
.map(|s| s.parse::<lance_encoding::version::LanceFileVersion>())
.transpose()?;
let v2_manifest_override = storage_options
.and_then(|opts| opts.get(OPT_NEW_TABLE_V2_MANIFEST_PATHS))
.map(|s| s.parse::<bool>())
.transpose()
.map_err(|_| Error::InvalidInput {
message: "enable_v2_manifest_paths must be a boolean".to_string(),
})?;
let stable_row_ids_override = storage_options
.and_then(|opts| opts.get(OPT_NEW_TABLE_ENABLE_STABLE_ROW_IDS))
.map(|s| s.parse::<bool>())
.transpose()
.map_err(|_| Error::InvalidInput {
message: "enable_stable_row_ids must be a boolean".to_string(),
})?;
Ok((
storage_version_override,
v2_manifest_override,
stable_row_ids_override,
))
}
fn apply_new_table_config(
&self,
params: &mut lance::dataset::WriteParams,
request: &DbCreateTableRequest,
) -> Result<()> {
let (storage_version_override, v2_manifest_override, stable_row_ids_override) =
self.extract_storage_overrides(request)?;
params.data_storage_version = storage_version_override
.or(params.data_storage_version)
.or(self.new_table_config.data_storage_version);
if let Some(enable_v2_manifest_paths) =
v2_manifest_override.or(self.new_table_config.enable_v2_manifest_paths)
{
params.enable_v2_manifest_paths = enable_v2_manifest_paths;
}
if let Some(enable_stable_row_ids) =
stable_row_ids_override.or(self.new_table_config.enable_stable_row_ids)
{
params.enable_stable_row_ids = enable_stable_row_ids;
}
Ok(())
}
}
impl std::fmt::Debug for LanceNamespaceDatabase {
@@ -301,7 +403,12 @@ impl Database for LanceNamespaceDatabase {
};
// Build write params with storage options and commit handler
let mut params = request.write_options.lance_write_params.unwrap_or_default();
let mut params = request
.write_options
.lance_write_params
.clone()
.unwrap_or_default();
self.apply_new_table_config(&mut params, &request)?;
if matches!(request.mode, CreateTableMode::Overwrite) {
params.mode = WriteMode::Overwrite;
@@ -343,20 +450,6 @@ impl Database for LanceNamespaceDatabase {
)
.await?;
// lance >= v6.0.0-beta.2: declare_table creates a `.lance-reserved` marker file to
// prevent directory scanning from seeing an incomplete table. After writing data we
// must remove it so the table appears in directory listings.
if let Ok(dataset) = native_table.dataset.get().await {
let registry = Arc::new(ObjectStoreRegistry::default());
if let Ok(table_path) = LanceObjectStore::extract_path_from_uri(registry, dataset.uri())
{
let _ = dataset
.object_store
.delete(&table_path.child(".lance-reserved"))
.await;
}
}
Ok(Arc::new(native_table))
}

View File

@@ -5,11 +5,12 @@
use std::{fmt::Formatter, sync::Arc};
use futures::{TryFutureExt, stream::BoxStream};
use futures::{StreamExt, TryFutureExt, stream::BoxStream};
use lance::io::WrappingObjectStore;
use object_store::{
Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, path::Path,
CopyOptions, Error, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
ObjectStore, ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
UploadPart, path::Path,
};
use async_trait::async_trait;
@@ -93,20 +94,6 @@ impl ObjectStore for MirroringObjectStore {
self.primary.get_opts(location, options).await
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
self.primary.head(location).await
}
async fn delete(&self, location: &Path) -> Result<()> {
if !location.primary_only() {
match self.secondary.delete(location).await {
Err(Error::NotFound { .. }) | Ok(_) => {}
Err(e) => return Err(e),
}
}
self.primary.delete(location).await
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.primary.list(prefix)
}
@@ -115,21 +102,40 @@ impl ObjectStore for MirroringObjectStore {
self.primary.list_with_delimiter(prefix).await
}
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
if to.primary_only() {
self.primary.copy(from, to).await
} else {
self.secondary.copy(from, to).await?;
self.primary.copy(from, to).await?;
Ok(())
}
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
) -> BoxStream<'static, Result<Path>> {
let primary = self.primary.clone();
let secondary = self.secondary.clone();
locations
.map(move |location| {
let primary = primary.clone();
let secondary = secondary.clone();
async move {
let location = location?;
if !location.primary_only() {
match secondary.delete(&location).await {
Err(Error::NotFound { .. }) | Ok(_) => {}
Err(e) => return Err(e),
}
}
primary.delete(&location).await?;
Ok(location)
}
})
.buffered(10)
.boxed()
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
if !to.primary_only() {
self.secondary.copy(from, to).await?;
async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
if to.primary_only() {
self.primary.copy_opts(from, to, options).await
} else {
self.secondary.copy_opts(from, to, options.clone()).await?;
self.primary.copy_opts(from, to, options).await?;
Ok(())
}
self.primary.copy_if_not_exists(from, to).await
}
}

View File

@@ -10,9 +10,9 @@ use bytes::Bytes;
use futures::stream::BoxStream;
use lance::io::WrappingObjectStore;
use object_store::{
GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result as OSResult, UploadPart,
path::Path,
CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result as OSResult,
UploadPart, path::Path,
};
#[derive(Debug, Default)]
@@ -81,11 +81,6 @@ impl IoTrackingStore {
#[async_trait::async_trait]
#[deny(clippy::missing_trait_methods)]
impl ObjectStore for IoTrackingStore {
async fn put(&self, location: &Path, bytes: PutPayload) -> OSResult<PutResult> {
self.record_write(bytes.content_length() as u64);
self.target.put(location, bytes).await
}
async fn put_opts(
&self,
location: &Path,
@@ -96,14 +91,6 @@ impl ObjectStore for IoTrackingStore {
self.target.put_opts(location, bytes, opts).await
}
async fn put_multipart(&self, location: &Path) -> OSResult<Box<dyn MultipartUpload>> {
let target = self.target.put_multipart(location).await?;
Ok(Box::new(IoTrackingMultipartUpload {
target,
stats: self.stats.clone(),
}))
}
async fn put_multipart_opts(
&self,
location: &Path,
@@ -116,15 +103,6 @@ impl ObjectStore for IoTrackingStore {
}))
}
async fn get(&self, location: &Path) -> OSResult<GetResult> {
let result = self.target.get(location).await;
if let Ok(result) = &result {
let num_bytes = result.range.end - result.range.start;
self.record_read(num_bytes);
}
result
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
let result = self.target.get_opts(location, options).await;
if let Ok(result) = &result {
@@ -134,14 +112,6 @@ impl ObjectStore for IoTrackingStore {
result
}
async fn get_range(&self, location: &Path, range: std::ops::Range<u64>) -> OSResult<Bytes> {
let result = self.target.get_range(location, range).await;
if let Ok(result) = &result {
self.record_read(result.len() as u64);
}
result
}
async fn get_ranges(
&self,
location: &Path,
@@ -154,20 +124,11 @@ impl ObjectStore for IoTrackingStore {
result
}
async fn head(&self, location: &Path) -> OSResult<ObjectMeta> {
self.record_read(0);
self.target.head(location).await
}
async fn delete(&self, location: &Path) -> OSResult<()> {
fn delete_stream(
&self,
locations: BoxStream<'static, OSResult<Path>>,
) -> BoxStream<'static, OSResult<Path>> {
self.record_write(0);
self.target.delete(location).await
}
fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, OSResult<Path>>,
) -> BoxStream<'a, OSResult<Path>> {
self.target.delete_stream(locations)
}
@@ -190,24 +151,14 @@ impl ObjectStore for IoTrackingStore {
self.target.list_with_delimiter(prefix).await
}
async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> {
async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> OSResult<()> {
self.record_write(0);
self.target.copy(from, to).await
self.target.copy_opts(from, to, options).await
}
async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> {
async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> OSResult<()> {
self.record_write(0);
self.target.rename(from, to).await
}
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
self.record_write(0);
self.target.rename_if_not_exists(from, to).await
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
self.record_write(0);
self.target.copy_if_not_exists(from, to).await
self.target.rename_opts(from, to, options).await
}
}

View File

@@ -43,7 +43,7 @@ pub struct RemoteInsertExec<S: HttpSend = Sender> {
client: RestfulLanceDbClient<S>,
input: Arc<dyn ExecutionPlan>,
overwrite: bool,
properties: PlanProperties,
properties: Arc<PlanProperties>,
add_result: Arc<Mutex<Option<AddResult>>>,
metrics: ExecutionPlanMetricsSet,
upload_id: Option<String>,
@@ -118,7 +118,7 @@ impl<S: HttpSend + 'static> RemoteInsertExec<S> {
client,
input,
overwrite,
properties,
properties: Arc::new(properties),
add_result: Arc::new(Mutex::new(None)),
metrics: ExecutionPlanMetricsSet::new(),
upload_id,
@@ -232,7 +232,7 @@ impl<S: HttpSend + 'static> ExecutionPlan for RemoteInsertExec<S> {
self
}
fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}

View File

@@ -39,21 +39,26 @@ use lance_index::scalar::FullTextSearchQuery;
struct MetadataEraserExec {
input: Arc<dyn ExecutionPlan>,
schema: Arc<ArrowSchema>,
properties: PlanProperties,
properties: Arc<PlanProperties>,
}
impl MetadataEraserExec {
fn compute_properties_from_input(
input: &Arc<dyn ExecutionPlan>,
schema: &Arc<ArrowSchema>,
) -> PlanProperties {
) -> Arc<PlanProperties> {
let input_properties = input.properties();
let eq_properties = input_properties
.eq_properties
.clone()
.with_new_schema(schema.clone())
.unwrap();
input_properties.clone().with_eq_properties(eq_properties)
Arc::new(
input_properties
.as_ref()
.clone()
.with_eq_properties(eq_properties),
)
}
fn new(input: Arc<dyn ExecutionPlan>) -> Self {
@@ -87,7 +92,7 @@ impl ExecutionPlan for MetadataEraserExec {
self
}
fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}

View File

@@ -81,7 +81,7 @@ pub struct InsertExec {
dataset: Arc<Dataset>,
input: Arc<dyn ExecutionPlan>,
write_params: WriteParams,
properties: PlanProperties,
properties: Arc<PlanProperties>,
partial_transactions: Arc<Mutex<Vec<Transaction>>>,
metrics: ExecutionPlanMetricsSet,
}
@@ -107,7 +107,7 @@ impl InsertExec {
dataset,
input,
write_params,
properties,
properties: Arc::new(properties),
partial_transactions: Arc::new(Mutex::new(Vec::with_capacity(num_partitions))),
metrics: ExecutionPlanMetricsSet::new(),
}
@@ -136,7 +136,7 @@ impl ExecutionPlan for InsertExec {
self
}
fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}

View File

@@ -20,7 +20,7 @@ pub(crate) struct ScannableExec {
// We don't require Scannable to be Sync, so we wrap it in a Mutex to allow safe concurrent access.
source: Mutex<Box<dyn Scannable>>,
num_rows: Option<usize>,
properties: PlanProperties,
properties: Arc<PlanProperties>,
tracker: Option<Arc<WriteProgressTracker>>,
}
@@ -49,7 +49,7 @@ impl ScannableExec {
Self {
source,
num_rows,
properties,
properties: Arc::new(properties),
tracker,
}
}
@@ -70,7 +70,7 @@ impl ExecutionPlan for ScannableExec {
self
}
fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}