Compare commits

...

36 Commits

Author SHA1 Message Date
Lance Release
a3b45a4d00 Bump version: 0.21.1-beta.0 → 0.21.1 2025-03-11 13:14:30 +00:00
Lance Release
c316c2f532 Bump version: 0.21.0 → 0.21.1-beta.0 2025-03-11 13:14:29 +00:00
Weston Pace
3966b16b63 fix: restore pylance as mandatory dependency (#2204)
We attempted to make pylance optional in
https://github.com/lancedb/lancedb/pull/2156 but it appears this did not
quite work. Users are unable to use lancedb from a fresh install. This
reverts the optional-ness so we can get back in a working state while we
fix the issue.
2025-03-11 06:13:52 -07:00
Lance Release
5661cc15ac Updating package-lock.json 2025-03-10 23:53:56 +00:00
Lance Release
4e7220400f Updating package-lock.json 2025-03-10 23:13:52 +00:00
Lance Release
ae4928fe77 Updating package-lock.json 2025-03-10 23:13:36 +00:00
Lance Release
e80a405dee Bump version: 0.18.0-beta.1 → 0.18.0 2025-03-10 23:13:18 +00:00
Lance Release
a53e19e386 Bump version: 0.18.0-beta.0 → 0.18.0-beta.1 2025-03-10 23:13:13 +00:00
Lance Release
c0097c5f0a Bump version: 0.21.0-beta.2 → 0.21.0 2025-03-10 23:12:56 +00:00
Lance Release
c199708e64 Bump version: 0.21.0-beta.1 → 0.21.0-beta.2 2025-03-10 23:12:56 +00:00
Weston Pace
4a47150ae7 feat: upgrade to lance 0.24.1 (#2199) 2025-03-10 15:18:37 -07:00
Wyatt Alt
f86b20a564 fix: delete tables from DDB on drop_all_tables (#2194)
Prior to this commit, issuing drop_all_tables on a listing database with
an external manifest store would delete physical tables but leave
references behind in the manifest store. The table drop would succeed,
but subsequent creation of a table with the same name would fail with a
conflict.

With this patch, the external manifest store is updated to account for
the dropped tables so that dropped table names can be reused.
2025-03-10 15:00:53 -07:00
msu-reevo
cc81f3e1a5 fix(python): typing (#2167)
@wjones127 is there a standard way you guys setup your virtualenv? I can
either relist all the dependencies in the pyright precommit section, or
specify a venv, or the user has to be in the virtual environment when
they run git commit. If the venv location was standardized or a python
manager like `uv` was used it would be easier to avoid duplicating the
pyright dependency list.

Per your suggestion, in `pyproject.toml` I added in all the passing
files to the `includes` section.

For ruff I upgraded the version and removed "TCH" which doesn't exist as
an option.

I added a `pyright_report.csv` which contains a list of all files sorted
by pyright errors ascending as a todo list to work on.

I fixed about 30 issues in `table.py` stemming from str's being passed
into methods that required a string within a set of string Literals by
extracting them into `types.py`

Can you verify in the rust bridge that the schema should be a property
and not a method here? If it's a method, then there's another place in
the code where `inner.schema` should be `inner.schema()`
``` python
class RecordBatchStream:
    @property
    def schema(self) -> pa.Schema: ...
```

Also unless the `_lancedb.pyi` file is wrong, then there is no
`__anext__` here for `__inner` when it's not an `AsyncGenerator` and
only `next` is defined:
``` python
    async def __anext__(self) -> pa.RecordBatch:
        return await self._inner.__anext__()
        if isinstance(self._inner, AsyncGenerator):
            batch = await self._inner.__anext__()
        else:
            batch = await self._inner.next()
        if batch is None:
            raise StopAsyncIteration
        return batch
```
in the else statement, `_inner` is a `RecordBatchStream`
```python
class RecordBatchStream:
    @property
    def schema(self) -> pa.Schema: ...
    async def next(self) -> Optional[pa.RecordBatch]: ...
```

---------

Co-authored-by: Will Jones <willjones127@gmail.com>
2025-03-10 09:01:23 -07:00
Weston Pace
bc49c4db82 feat: respect datafusion's batch size when running as a table provider (#2187)
Datafusion makes the batch size available as part of the `SessionState`.
We should use that to set the `max_batch_length` property in the
`QueryExecutionOptions`.
2025-03-07 05:53:36 -08:00
Weston Pace
d2eec46f17 feat: add support for streaming input to create_table (#2175)
This PR makes it possible to create a table using an asynchronous stream
of input data. Currently only a synchronous iterator is supported. There
are a number of follow-ups not yet tackled:

* Support for embedding functions (the embedding functions wrapper needs
to be re-written to be async, should be an easy lift)
* Support for async input into the remote table (the make_ipc_batch
needs to change to accept async input, leaving undone for now because I
think we want to support actual streaming uploads into the remote table
soon)
* Support for async input into the add function (pretty essential, but
it is a fairly distinct code path, so saving for a different PR)
2025-03-06 11:55:00 -08:00
Lance Release
51437bc228 Bump version: 0.21.0-beta.0 → 0.21.0-beta.1 2025-03-06 19:23:06 +00:00
Bert
fa53cfcfd2 feat: support modifying field metadata in lancedb python (#2178) 2025-03-04 16:58:46 -05:00
vinoyang
374fe0ad95 feat(rust): introduce Catalog trait and implement ListingCatalog (#2148)
Co-authored-by: Weston Pace <weston.pace@gmail.com>
2025-03-03 20:22:24 -08:00
BubbleCal
35e5b84ba9 chore: upgrade lance to 0.24.0-beta.1 (#2171)
Signed-off-by: BubbleCal <bubble-cal@outlook.com>
2025-03-03 12:32:12 +08:00
Lei Xu
7c12d497b0 ci: bump python to 3.12 in GHA (#2169) 2025-03-01 17:24:02 -08:00
ayao227
dfe4ba8dad chore: add reo integration (#2149)
This PR adds reo integration to the lancedb documentation website.
2025-02-28 07:51:34 -08:00
Weston Pace
fa1b9ad5bd fix: don't use with_schema to remove schema metadata (#2162)
It seems that `RecordBatch::with_schema` is unable to remove schema
metadata from a batch. It fails with the error `target schema is not
superset of current schema`.

I'm not sure how the `test_metadata_erased` test is passing. Strangely,
the metadata was not present by the time the batch arrived at the
metadata eraser. I think maybe the schema metadata is only present in
the batch if there is a filter.

I've created a new unit test that makes sure the metadata is erased if
we have a filter also
2025-02-27 10:24:00 -08:00
BubbleCal
8877eb020d feat: record the server version for remote table (#2147)
Signed-off-by: BubbleCal <bubble-cal@outlook.com>
2025-02-27 15:55:59 +08:00
Will Jones
01e4291d21 feat(python): drop hard dependency on pylance (#2156)
Closes #1793
2025-02-26 15:53:45 -08:00
Lance Release
ab3ea76ad1 Updating package-lock.json 2025-02-26 21:23:39 +00:00
Lance Release
728ef8657d Updating package-lock.json 2025-02-26 20:11:37 +00:00
Lance Release
0b13901a16 Updating package-lock.json 2025-02-26 20:11:22 +00:00
Lance Release
84b110e0ef Bump version: 0.17.0 → 0.18.0-beta.0 2025-02-26 20:11:07 +00:00
Lance Release
e1836e54e3 Bump version: 0.20.0 → 0.21.0-beta.0 2025-02-26 20:10:54 +00:00
Weston Pace
4ba5326880 feat: reapply upgrade lance to v0.23.3-beta.1 (#2157)
This reverts commit 2f0c5baea2.

---------

Co-authored-by: Lu Qiu <luqiujob@gmail.com>
2025-02-26 11:44:11 -08:00
Lance Release
b036a69300 Updating package-lock.json 2025-02-26 19:32:22 +00:00
Will Jones
5b12a47119 feat!: revert query limit to be unbounded for scans (#2151)
In earlier PRs (#1886, #1191) we made the default limit 10 regardless of
the query type. This was confusing for users and in many cases a
breaking change. Users would have queries that used to return all
results, but instead only returned the first 10, causing silent bugs.

Part of the cause was consistency: the Python sync API seems to have
always had a limit of 10, while newer APIs (Python async and Nodejs)
didn't.

This PR sets the default limit only for searches (vector search, FTS),
while letting scans (even with filters) be unbounded. It does this
consistently for all SDKs.

Fixes #1983
Fixes #1852
Fixes #2141
2025-02-26 10:32:14 -08:00
Lance Release
769d483e50 Updating package-lock.json 2025-02-26 18:16:59 +00:00
Lance Release
9ecb11fe5a Updating package-lock.json 2025-02-26 18:16:42 +00:00
Lance Release
22bd8329f3 Bump version: 0.17.0-beta.0 → 0.17.0 2025-02-26 18:16:07 +00:00
Lance Release
a736fad149 Bump version: 0.16.1-beta.3 → 0.17.0-beta.0 2025-02-26 18:16:01 +00:00
65 changed files with 2254 additions and 643 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.16.1-beta.3"
current_version = "0.18.0"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -33,13 +33,14 @@ jobs:
python-version: "3.12"
- name: Install ruff
run: |
pip install ruff==0.8.4
pip install ruff==0.9.9
- name: Format check
run: ruff format --check .
- name: Lint
run: ruff check .
doctest:
name: "Doctest"
type-check:
name: "Type Check"
timeout-minutes: 30
runs-on: "ubuntu-22.04"
defaults:
@@ -54,7 +55,36 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
python-version: "3.12"
- name: Install protobuf compiler
run: |
sudo apt update
sudo apt install -y protobuf-compiler
pip install toml
- name: Install dependencies
run: |
python ../ci/parse_requirements.py pyproject.toml --extras dev,tests,embeddings > requirements.txt
pip install -r requirements.txt
- name: Run pyright
run: pyright
doctest:
name: "Doctest"
timeout-minutes: 30
runs-on: "ubuntu-24.04"
defaults:
run:
shell: bash
working-directory: python
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
lfs: true
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
cache: "pip"
- name: Install protobuf
run: |
@@ -75,8 +105,8 @@ jobs:
timeout-minutes: 30
strategy:
matrix:
python-minor-version: ["9", "11"]
runs-on: "ubuntu-22.04"
python-minor-version: ["9", "12"]
runs-on: "ubuntu-24.04"
defaults:
run:
shell: bash
@@ -127,7 +157,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
python-version: "3.12"
- uses: Swatinem/rust-cache@v2
with:
workspaces: python
@@ -157,7 +187,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
python-version: "3.12"
- uses: Swatinem/rust-cache@v2
with:
workspaces: python
@@ -168,7 +198,7 @@ jobs:
run: rm -rf target/wheels
pydantic1x:
timeout-minutes: 30
runs-on: "ubuntu-22.04"
runs-on: "ubuntu-24.04"
defaults:
run:
shell: bash

View File

@@ -184,15 +184,17 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Install dependencies
- name: Install dependencies (part 1)
run: |
set -e
apk add protobuf-dev curl clang lld llvm19 grep npm bash msitools sed
curl --proto '=https' --tlsv1.3 -sSf https://raw.githubusercontent.com/rust-lang/rustup/refs/heads/master/rustup-init.sh | sh -s -- -y
source $HOME/.cargo/env
rustup target add aarch64-pc-windows-msvc
- name: Install rust
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
target: aarch64-pc-windows-msvc
- name: Install dependencies (part 2)
run: |
set -e
mkdir -p sysroot
cd sysroot
sh ../ci/sysroot-aarch64-pc-windows-msvc.sh
@@ -264,7 +266,7 @@ jobs:
- name: Install Rust
run: |
Invoke-WebRequest https://win.rustup.rs/x86_64 -OutFile rustup-init.exe
.\rustup-init.exe -y --default-host aarch64-pc-windows-msvc
.\rustup-init.exe -y --default-host aarch64-pc-windows-msvc --default-toolchain 1.83.0
shell: powershell
- name: Add Rust to PATH
run: |

View File

@@ -1,16 +1,22 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.2.0
hooks:
- id: check-yaml
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/astral-sh/ruff-pre-commit
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.8.4
rev: v0.9.9
hooks:
- id: ruff
- repo: local
# - repo: https://github.com/RobertCraigie/pyright-python
# rev: v1.1.395
# hooks:
# - id: pyright
# args: ["--project", "python"]
# additional_dependencies: [pyarrow-stubs]
- repo: local
hooks:
- id: local-biome-check
name: biome check

502
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -21,30 +21,30 @@ categories = ["database-implementations"]
rust-version = "1.78.0"
[workspace.dependencies]
lance = { "version" = "=0.23.2", "features" = ["dynamodb"] }
lance-io = { version = "=0.23.2" }
lance-index = { version = "=0.23.2" }
lance-linalg = { version = "=0.23.2" }
lance-table = { version = "=0.23.2" }
lance-testing = { version = "=0.23.2" }
lance-datafusion = { version = "=0.23.2" }
lance-encoding = { version = "=0.23.2" }
lance = { "version" = "=0.24.1", "features" = ["dynamodb"] }
lance-io = { version = "=0.24.1" }
lance-index = { version = "=0.24.1" }
lance-linalg = { version = "=0.24.1" }
lance-table = { version = "=0.24.1" }
lance-testing = { version = "=0.24.1" }
lance-datafusion = { version = "=0.24.1" }
lance-encoding = { version = "=0.24.1" }
# Note that this one does not include pyarrow
arrow = { version = "53.2", optional = false }
arrow-array = "53.2"
arrow-data = "53.2"
arrow-ipc = "53.2"
arrow-ord = "53.2"
arrow-schema = "53.2"
arrow-arith = "53.2"
arrow-cast = "53.2"
arrow = { version = "54.1", optional = false }
arrow-array = "54.1"
arrow-data = "54.1"
arrow-ipc = "54.1"
arrow-ord = "54.1"
arrow-schema = "54.1"
arrow-arith = "54.1"
arrow-cast = "54.1"
async-trait = "0"
datafusion = { version = "44.0", default-features = false }
datafusion-catalog = "44.0"
datafusion-common = { version = "44.0", default-features = false }
datafusion-execution = "44.0"
datafusion-expr = "44.0"
datafusion-physical-plan = "44.0"
datafusion = { version = "45.0", default-features = false }
datafusion-catalog = "45.0"
datafusion-common = { version = "45.0", default-features = false }
datafusion-execution = "45.0"
datafusion-expr = "45.0"
datafusion-physical-plan = "45.0"
env_logger = "0.11"
half = { "version" = "=2.4.1", default-features = false, features = [
"num-traits",
@@ -60,6 +60,7 @@ num-traits = "0.2"
rand = "0.8"
regex = "1.10"
lazy_static = "1"
semver = "1.0.25"
# Temporary pins to work around downstream issues
# https://github.com/apache/arrow-rs/commit/2fddf85afcd20110ce783ed5b4cdeb82293da30b

41
ci/parse_requirements.py Normal file
View File

@@ -0,0 +1,41 @@
import argparse
import toml
def parse_dependencies(pyproject_path, extras=None):
with open(pyproject_path, "r") as file:
pyproject = toml.load(file)
dependencies = pyproject.get("project", {}).get("dependencies", [])
for dependency in dependencies:
print(dependency)
optional_dependencies = pyproject.get("project", {}).get(
"optional-dependencies", {}
)
if extras:
for extra in extras.split(","):
for dep in optional_dependencies.get(extra, []):
print(dep)
def main():
parser = argparse.ArgumentParser(
description="Generate requirements.txt from pyproject.toml"
)
parser.add_argument("path", type=str, help="Path to pyproject.toml")
parser.add_argument(
"--extras",
type=str,
help="Comma-separated list of extras to include",
default="",
)
args = parser.parse_args()
parse_dependencies(args.path, args.extras)
if __name__ == "__main__":
main()

View File

@@ -377,6 +377,7 @@ extra_css:
extra_javascript:
- "extra_js/init_ask_ai_widget.js"
- "extra_js/reo.js"
extra:
analytics:

1
docs/src/extra_js/reo.js Normal file
View File

@@ -0,0 +1 @@
!function(){var e,t,n;e="9627b71b382d201",t=function(){Reo.init({clientID:"9627b71b382d201"})},(n=document.createElement("script")).src="https://static.reo.dev/"+e+"/reo.js",n.defer=!0,n.onload=t,document.head.appendChild(n)}();

View File

@@ -8,7 +8,7 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.16.1-beta.3</version>
<version>0.18.0-final.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.16.1-beta.3</version>
<version>0.18.0-final.0</version>
<packaging>pom</packaging>
<name>LanceDB Parent</name>

68
node/package-lock.json generated
View File

@@ -1,12 +1,12 @@
{
"name": "vectordb",
"version": "0.16.1-beta.3",
"version": "0.18.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "vectordb",
"version": "0.16.1-beta.3",
"version": "0.18.0",
"cpu": [
"x64",
"arm64"
@@ -52,14 +52,14 @@
"uuid": "^9.0.0"
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-arm64": "0.16.1-beta.3",
"@lancedb/vectordb-darwin-x64": "0.16.1-beta.3",
"@lancedb/vectordb-linux-arm64-gnu": "0.16.1-beta.3",
"@lancedb/vectordb-linux-arm64-musl": "0.16.1-beta.3",
"@lancedb/vectordb-linux-x64-gnu": "0.16.1-beta.3",
"@lancedb/vectordb-linux-x64-musl": "0.16.1-beta.3",
"@lancedb/vectordb-win32-arm64-msvc": "0.16.1-beta.3",
"@lancedb/vectordb-win32-x64-msvc": "0.16.1-beta.3"
"@lancedb/vectordb-darwin-arm64": "0.18.0",
"@lancedb/vectordb-darwin-x64": "0.18.0",
"@lancedb/vectordb-linux-arm64-gnu": "0.18.0",
"@lancedb/vectordb-linux-arm64-musl": "0.18.0",
"@lancedb/vectordb-linux-x64-gnu": "0.18.0",
"@lancedb/vectordb-linux-x64-musl": "0.18.0",
"@lancedb/vectordb-win32-arm64-msvc": "0.18.0",
"@lancedb/vectordb-win32-x64-msvc": "0.18.0"
},
"peerDependencies": {
"@apache-arrow/ts": "^14.0.2",
@@ -330,9 +330,9 @@
}
},
"node_modules/@lancedb/vectordb-darwin-arm64": {
"version": "0.16.1-beta.3",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.16.1-beta.3.tgz",
"integrity": "sha512-k2dfDNvoFjZuF8RCkFX9yFkLIg292mFg+o6IUeXndlikhABi8F+NbRODGUxJf3QUioks2tGF831KFoV5oQyeEA==",
"version": "0.18.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.18.0.tgz",
"integrity": "sha512-ormNCmny1j64aSZRrZeUQ1Zs8cOFKrW14NgTmW3AehDuru+Ep+8AriHA5Pmyi6raBOZfNzDSiZs/LTzzyVaa7g==",
"cpu": [
"arm64"
],
@@ -343,9 +343,9 @@
]
},
"node_modules/@lancedb/vectordb-darwin-x64": {
"version": "0.16.1-beta.3",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.16.1-beta.3.tgz",
"integrity": "sha512-pYvwcAXBB3MXxa2kvK8PxMoEsaE+EFld5pky6dDo6qJQVepUz9pi/e1FTLxW6m0mgwtRj52P6xe55sj1Yln9Qw==",
"version": "0.18.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.18.0.tgz",
"integrity": "sha512-S4skQ1RXXQJciq40s84Kyy7v3YC+nao8pX4xUyxDcKRx+90Qg9eH+tehs6XLN1IjrQT/9CWKaE5wxZmv6Oys4g==",
"cpu": [
"x64"
],
@@ -356,9 +356,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-arm64-gnu": {
"version": "0.16.1-beta.3",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.16.1-beta.3.tgz",
"integrity": "sha512-BS4rnBtKGJlEdbYgOe85mGhviQaSfEXl8qw0fh0ml8E0qbi5RuLtwfTFMe3yAKSOnNAvaJISqXQyUN7hzkYkUQ==",
"version": "0.18.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.18.0.tgz",
"integrity": "sha512-1txr4tasVdxy321/4Fw8GJPjzrf84F02L9ffN8JebHmmR0S8uk2MKf2WsyLaSVRPd4YHIvvf3qmG0RGaUsb2sw==",
"cpu": [
"arm64"
],
@@ -369,9 +369,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-arm64-musl": {
"version": "0.16.1-beta.3",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-musl/-/vectordb-linux-arm64-musl-0.16.1-beta.3.tgz",
"integrity": "sha512-/F1mzpgSipfXjeaXJx5c0zLPOipPKnSPIpYviSdLU2Ahm1aHLweW1UsoiUoRkBkvEcVrZfHxL64vasey2I0P7Q==",
"version": "0.18.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-musl/-/vectordb-linux-arm64-musl-0.18.0.tgz",
"integrity": "sha512-8xS1xaoJeFDx6WmDBcfueWvIbdNX/ptQXfoC7hYICwNHizjlyt4O3Nxz8uG9URMF1y9saUYUditIHLzLVZc76g==",
"cpu": [
"arm64"
],
@@ -382,9 +382,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-x64-gnu": {
"version": "0.16.1-beta.3",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.16.1-beta.3.tgz",
"integrity": "sha512-zGn2Oby8GAQYG7+dqFVi2DDzli2/GAAY7lwPoYbPlyVytcdTlXRsxea1XiT1jzZmyKIlrxA/XXSRsmRq4n1j1w==",
"version": "0.18.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.18.0.tgz",
"integrity": "sha512-8XUc2UnEV3awv0DGJS5gRA7yTkicX6oPN7GudXXxycCKL33FJ2ah7hkeDia9Bhk9MmvTonvsEDvUSqnglcpqfA==",
"cpu": [
"x64"
],
@@ -395,9 +395,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-x64-musl": {
"version": "0.16.1-beta.3",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-musl/-/vectordb-linux-x64-musl-0.16.1-beta.3.tgz",
"integrity": "sha512-MXYvI7dL+0QtWGDuliUUaEp/XQN+hSndtDc8wlAMyI0lOzmTvC7/C3OZQcMKf6JISZuNS71OVzVTYDYSab9aXw==",
"version": "0.18.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-musl/-/vectordb-linux-x64-musl-0.18.0.tgz",
"integrity": "sha512-LV7TuWgLcL82Wdq+EH2Xs3+apqeLohwYLlVIauVAwKEHvdwyNxTOW9TaNLvHXcbylIh7agl2xXvASCNhYZAyzA==",
"cpu": [
"x64"
],
@@ -408,9 +408,9 @@
]
},
"node_modules/@lancedb/vectordb-win32-arm64-msvc": {
"version": "0.16.1-beta.3",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-arm64-msvc/-/vectordb-win32-arm64-msvc-0.16.1-beta.3.tgz",
"integrity": "sha512-1dbUSg+Mi+0W8JAUXqNWC+uCr0RUqVHhxFVGLSlprqZ8qFJYQ61jFSZr4onOYj9Ta1n6tUb3Nc4acxf3vXXPmw==",
"version": "0.18.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-arm64-msvc/-/vectordb-win32-arm64-msvc-0.18.0.tgz",
"integrity": "sha512-kxdCnKfvnuDKoKZRUBbreMBpimHb+k9/pFR48GN6JSrIuzUDx5G1VjHKBmaFhbveZCOBjjtYlg/8ohnWQHZfeA==",
"cpu": [
"arm64"
],
@@ -421,9 +421,9 @@
]
},
"node_modules/@lancedb/vectordb-win32-x64-msvc": {
"version": "0.16.1-beta.3",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.16.1-beta.3.tgz",
"integrity": "sha512-K9oT47zKnFoCEB/JjVKG+w+L0GOMDsPPln+B2TvefAXAWrvweCN2H4LUdsBYCTnntzy80OJCwwH3OwX07M1Y3g==",
"version": "0.18.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.18.0.tgz",
"integrity": "sha512-uAE80q50cAp4gHoGvclxJqZGqj3/9oN9kz8iXgNIxiPngqnN01kVyaj4ulm4Qk/nauWUhHJ3tjTh/+CpkhSc2Q==",
"cpu": [
"x64"
],

View File

@@ -1,6 +1,6 @@
{
"name": "vectordb",
"version": "0.16.1-beta.3",
"version": "0.18.0",
"description": " Serverless, low-latency vector database for AI applications",
"private": false,
"main": "dist/index.js",
@@ -92,13 +92,13 @@
}
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-x64": "0.16.1-beta.3",
"@lancedb/vectordb-darwin-arm64": "0.16.1-beta.3",
"@lancedb/vectordb-linux-x64-gnu": "0.16.1-beta.3",
"@lancedb/vectordb-linux-arm64-gnu": "0.16.1-beta.3",
"@lancedb/vectordb-linux-x64-musl": "0.16.1-beta.3",
"@lancedb/vectordb-linux-arm64-musl": "0.16.1-beta.3",
"@lancedb/vectordb-win32-x64-msvc": "0.16.1-beta.3",
"@lancedb/vectordb-win32-arm64-msvc": "0.16.1-beta.3"
"@lancedb/vectordb-darwin-x64": "0.18.0",
"@lancedb/vectordb-darwin-arm64": "0.18.0",
"@lancedb/vectordb-linux-x64-gnu": "0.18.0",
"@lancedb/vectordb-linux-arm64-gnu": "0.18.0",
"@lancedb/vectordb-linux-x64-musl": "0.18.0",
"@lancedb/vectordb-linux-arm64-musl": "0.18.0",
"@lancedb/vectordb-win32-x64-msvc": "0.18.0",
"@lancedb/vectordb-win32-arm64-msvc": "0.18.0"
}
}

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.16.1-beta.3"
version = "0.18.0"
license.workspace = true
description.workspace = true
repository.workspace = true

View File

@@ -175,6 +175,8 @@ maybeDescribe("storage_options", () => {
tableNames = await db.tableNames();
expect(tableNames).toEqual([]);
await db.dropAllTables();
});
it("can configure encryption at connection and table level", async () => {
@@ -210,6 +212,8 @@ maybeDescribe("storage_options", () => {
await table.add([{ a: 2, b: 3 }]);
await bucket.assertAllEncrypted("test/table2.lance", kmsKey.keyId);
await db.dropAllTables();
});
});
@@ -298,5 +302,32 @@ maybeDescribe("DynamoDB Lock", () => {
const rowCount = await table.countRows();
expect(rowCount).toBe(6);
await db.dropAllTables();
});
it("clears dynamodb state after dropping all tables", async () => {
const uri = `s3+ddb://${bucket.name}/test?ddbTableName=${commitTable.name}`;
const db = await connect(uri, {
storageOptions: CONFIG,
readConsistencyInterval: 0,
});
await db.createTable("foo", [{ a: 1, b: 2 }]);
await db.createTable("bar", [{ a: 1, b: 2 }]);
let tableNames = await db.tableNames();
expect(tableNames).toEqual(["bar", "foo"]);
await db.dropAllTables();
tableNames = await db.tableNames();
expect(tableNames).toEqual([]);
// We can create a new table with the same name as the one we dropped.
await db.createTable("foo", [{ a: 1, b: 2 }]);
tableNames = await db.tableNames();
expect(tableNames).toEqual(["foo"]);
await db.dropAllTables();
});
});

View File

@@ -666,11 +666,11 @@ describe("When creating an index", () => {
expect(fs.readdirSync(indexDir)).toHaveLength(1);
for await (const r of tbl.query().where("id > 1").select(["id"])) {
expect(r.numRows).toBe(10);
expect(r.numRows).toBe(298);
}
// should also work with 'filter' alias
for await (const r of tbl.query().filter("id > 1").select(["id"])) {
expect(r.numRows).toBe(10);
expect(r.numRows).toBe(298);
}
});

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.16.1-beta.3",
"version": "0.18.0",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-x64",
"version": "0.16.1-beta.3",
"version": "0.18.0",
"os": ["darwin"],
"cpu": ["x64"],
"main": "lancedb.darwin-x64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.16.1-beta.3",
"version": "0.18.0",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.16.1-beta.3",
"version": "0.18.0",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.16.1-beta.3",
"version": "0.18.0",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.16.1-beta.3",
"version": "0.18.0",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.16.1-beta.3",
"version": "0.18.0",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.16.1-beta.3",
"version": "0.18.0",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.16.1-beta.3",
"version": "0.18.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.16.1-beta.3",
"version": "0.18.0",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.16.1-beta.3",
"version": "0.18.0",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

56
pyright_report.csv Normal file
View File

@@ -0,0 +1,56 @@
file,errors,warnings,total_issues
python/python/lancedb/arrow.py,0,0,0
python/python/lancedb/background_loop.py,0,0,0
python/python/lancedb/embeddings/__init__.py,0,0,0
python/python/lancedb/exceptions.py,0,0,0
python/python/lancedb/index.py,0,0,0
python/python/lancedb/integrations/__init__.py,0,0,0
python/python/lancedb/remote/__init__.py,0,0,0
python/python/lancedb/remote/errors.py,0,0,0
python/python/lancedb/rerankers/__init__.py,0,0,0
python/python/lancedb/rerankers/answerdotai.py,0,0,0
python/python/lancedb/rerankers/cohere.py,0,0,0
python/python/lancedb/rerankers/colbert.py,0,0,0
python/python/lancedb/rerankers/cross_encoder.py,0,0,0
python/python/lancedb/rerankers/openai.py,0,0,0
python/python/lancedb/rerankers/util.py,0,0,0
python/python/lancedb/rerankers/voyageai.py,0,0,0
python/python/lancedb/schema.py,0,0,0
python/python/lancedb/types.py,0,0,0
python/python/lancedb/__init__.py,0,1,1
python/python/lancedb/conftest.py,1,0,1
python/python/lancedb/embeddings/bedrock.py,1,0,1
python/python/lancedb/merge.py,1,0,1
python/python/lancedb/rerankers/base.py,1,0,1
python/python/lancedb/rerankers/jinaai.py,0,1,1
python/python/lancedb/rerankers/linear_combination.py,1,0,1
python/python/lancedb/embeddings/instructor.py,2,0,2
python/python/lancedb/embeddings/openai.py,2,0,2
python/python/lancedb/embeddings/watsonx.py,2,0,2
python/python/lancedb/embeddings/registry.py,3,0,3
python/python/lancedb/embeddings/sentence_transformers.py,3,0,3
python/python/lancedb/integrations/pyarrow.py,3,0,3
python/python/lancedb/rerankers/rrf.py,3,0,3
python/python/lancedb/dependencies.py,4,0,4
python/python/lancedb/embeddings/gemini_text.py,4,0,4
python/python/lancedb/embeddings/gte.py,4,0,4
python/python/lancedb/embeddings/gte_mlx_model.py,4,0,4
python/python/lancedb/embeddings/ollama.py,4,0,4
python/python/lancedb/embeddings/transformers.py,4,0,4
python/python/lancedb/remote/db.py,5,0,5
python/python/lancedb/context.py,6,0,6
python/python/lancedb/embeddings/cohere.py,6,0,6
python/python/lancedb/fts.py,6,0,6
python/python/lancedb/db.py,9,0,9
python/python/lancedb/embeddings/utils.py,9,0,9
python/python/lancedb/common.py,11,0,11
python/python/lancedb/util.py,13,0,13
python/python/lancedb/embeddings/imagebind.py,14,0,14
python/python/lancedb/embeddings/voyageai.py,15,0,15
python/python/lancedb/embeddings/open_clip.py,16,0,16
python/python/lancedb/pydantic.py,16,0,16
python/python/lancedb/embeddings/base.py,17,0,17
python/python/lancedb/embeddings/jinaai.py,18,1,19
python/python/lancedb/remote/table.py,23,0,23
python/python/lancedb/query.py,47,1,48
python/python/lancedb/table.py,61,0,61
1 file errors warnings total_issues
2 python/python/lancedb/arrow.py 0 0 0
3 python/python/lancedb/background_loop.py 0 0 0
4 python/python/lancedb/embeddings/__init__.py 0 0 0
5 python/python/lancedb/exceptions.py 0 0 0
6 python/python/lancedb/index.py 0 0 0
7 python/python/lancedb/integrations/__init__.py 0 0 0
8 python/python/lancedb/remote/__init__.py 0 0 0
9 python/python/lancedb/remote/errors.py 0 0 0
10 python/python/lancedb/rerankers/__init__.py 0 0 0
11 python/python/lancedb/rerankers/answerdotai.py 0 0 0
12 python/python/lancedb/rerankers/cohere.py 0 0 0
13 python/python/lancedb/rerankers/colbert.py 0 0 0
14 python/python/lancedb/rerankers/cross_encoder.py 0 0 0
15 python/python/lancedb/rerankers/openai.py 0 0 0
16 python/python/lancedb/rerankers/util.py 0 0 0
17 python/python/lancedb/rerankers/voyageai.py 0 0 0
18 python/python/lancedb/schema.py 0 0 0
19 python/python/lancedb/types.py 0 0 0
20 python/python/lancedb/__init__.py 0 1 1
21 python/python/lancedb/conftest.py 1 0 1
22 python/python/lancedb/embeddings/bedrock.py 1 0 1
23 python/python/lancedb/merge.py 1 0 1
24 python/python/lancedb/rerankers/base.py 1 0 1
25 python/python/lancedb/rerankers/jinaai.py 0 1 1
26 python/python/lancedb/rerankers/linear_combination.py 1 0 1
27 python/python/lancedb/embeddings/instructor.py 2 0 2
28 python/python/lancedb/embeddings/openai.py 2 0 2
29 python/python/lancedb/embeddings/watsonx.py 2 0 2
30 python/python/lancedb/embeddings/registry.py 3 0 3
31 python/python/lancedb/embeddings/sentence_transformers.py 3 0 3
32 python/python/lancedb/integrations/pyarrow.py 3 0 3
33 python/python/lancedb/rerankers/rrf.py 3 0 3
34 python/python/lancedb/dependencies.py 4 0 4
35 python/python/lancedb/embeddings/gemini_text.py 4 0 4
36 python/python/lancedb/embeddings/gte.py 4 0 4
37 python/python/lancedb/embeddings/gte_mlx_model.py 4 0 4
38 python/python/lancedb/embeddings/ollama.py 4 0 4
39 python/python/lancedb/embeddings/transformers.py 4 0 4
40 python/python/lancedb/remote/db.py 5 0 5
41 python/python/lancedb/context.py 6 0 6
42 python/python/lancedb/embeddings/cohere.py 6 0 6
43 python/python/lancedb/fts.py 6 0 6
44 python/python/lancedb/db.py 9 0 9
45 python/python/lancedb/embeddings/utils.py 9 0 9
46 python/python/lancedb/common.py 11 0 11
47 python/python/lancedb/util.py 13 0 13
48 python/python/lancedb/embeddings/imagebind.py 14 0 14
49 python/python/lancedb/embeddings/voyageai.py 15 0 15
50 python/python/lancedb/embeddings/open_clip.py 16 0 16
51 python/python/lancedb/pydantic.py 16 0 16
52 python/python/lancedb/embeddings/base.py 17 0 17
53 python/python/lancedb/embeddings/jinaai.py 18 1 19
54 python/python/lancedb/remote/table.py 23 0 23
55 python/python/lancedb/query.py 47 1 48
56 python/python/lancedb/table.py 61 0 61

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.20.0"
current_version = "0.21.1"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -8,9 +8,9 @@ For general contribution guidelines, see [CONTRIBUTING.md](../CONTRIBUTING.md).
The Python package is a wrapper around the Rust library, `lancedb`. We use
[pyo3](https://pyo3.rs/) to create the bindings between Rust and Python.
* `src/`: Rust bindings source code
* `python/lancedb`: Python package source code
* `python/tests`: Unit tests
- `src/`: Rust bindings source code
- `python/lancedb`: Python package source code
- `python/tests`: Unit tests
## Development environment
@@ -61,6 +61,12 @@ make test
make doctest
```
Run type checking:
```shell
make typecheck
```
To run a single test, you can use the `pytest` command directly. Provide the path
to the test file, and optionally the test name after `::`.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.20.0"
version = "0.21.1"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true
@@ -14,21 +14,20 @@ name = "_lancedb"
crate-type = ["cdylib"]
[dependencies]
arrow = { version = "53.2", features = ["pyarrow"] }
arrow = { version = "54.1", features = ["pyarrow"] }
lancedb = { path = "../rust/lancedb", default-features = false }
env_logger.workspace = true
pyo3 = { version = "0.22.2", features = [
"extension-module",
"abi3-py39",
"gil-refs"
pyo3 = { version = "0.23", features = ["extension-module", "abi3-py39"] }
pyo3-async-runtimes = { version = "0.23", features = [
"attributes",
"tokio-runtime",
] }
pyo3-async-runtimes = { version = "0.22", features = ["attributes", "tokio-runtime"] }
pin-project = "1.1.5"
futures.workspace = true
tokio = { version = "1.40", features = ["sync"] }
[build-dependencies]
pyo3-build-config = { version = "0.20.3", features = [
pyo3-build-config = { version = "0.23", features = [
"extension-module",
"abi3-py39",
] }

View File

@@ -23,6 +23,10 @@ check: ## Check formatting and lints.
fix: ## Fix python lints
ruff check python --fix
.PHONY: typecheck
typecheck: ## Run type checking with pyright.
pyright
.PHONY: doctest
doctest: ## Run documentation tests.
pytest --doctest-modules python/lancedb
@@ -30,3 +34,7 @@ doctest: ## Run documentation tests.
.PHONY: test
test: ## Run tests.
pytest python/tests -vv --durations=10 -m "not slow and not s3_test"
.PHONY: clean
clean:
rm -rf data

View File

@@ -4,11 +4,12 @@ name = "lancedb"
dynamic = ["version"]
dependencies = [
"deprecation",
"pylance~=0.23.2",
"tqdm>=4.27.0",
"pyarrow>=14",
"pydantic>=1.10",
"packaging",
"overrides>=0.7",
"pylance>=0.23.2",
]
description = "lancedb"
authors = [{ name = "LanceDB Devs", email = "dev@lancedb.com" }]
@@ -91,7 +92,7 @@ requires = ["maturin>=1.4"]
build-backend = "maturin"
[tool.ruff.lint]
select = ["F", "E", "W", "G", "TCH", "PERF"]
select = ["F", "E", "W", "G", "PERF"]
[tool.pytest.ini_options]
addopts = "--strict-markers --ignore-glob=lancedb/embeddings/*.py"
@@ -102,5 +103,28 @@ markers = [
]
[tool.pyright]
include = ["python/lancedb/table.py"]
include = [
"python/lancedb/index.py",
"python/lancedb/rerankers/util.py",
"python/lancedb/rerankers/__init__.py",
"python/lancedb/rerankers/voyageai.py",
"python/lancedb/rerankers/jinaai.py",
"python/lancedb/rerankers/openai.py",
"python/lancedb/rerankers/cross_encoder.py",
"python/lancedb/rerankers/colbert.py",
"python/lancedb/rerankers/answerdotai.py",
"python/lancedb/rerankers/cohere.py",
"python/lancedb/arrow.py",
"python/lancedb/__init__.py",
"python/lancedb/types.py",
"python/lancedb/integrations/__init__.py",
"python/lancedb/exceptions.py",
"python/lancedb/background_loop.py",
"python/lancedb/schema.py",
"python/lancedb/remote/__init__.py",
"python/lancedb/remote/errors.py",
"python/lancedb/embeddings/__init__.py",
"python/lancedb/_lancedb.pyi",
]
exclude = ["python/tests/"]
pythonVersion = "3.12"

View File

@@ -14,6 +14,7 @@ from ._lancedb import connect as lancedb_connect
from .common import URI, sanitize_uri
from .db import AsyncConnection, DBConnection, LanceDBConnection
from .remote import ClientConfig
from .remote.db import RemoteDBConnection
from .schema import vector
from .table import AsyncTable
@@ -86,8 +87,6 @@ def connect(
conn : DBConnection
A connection to a LanceDB database.
"""
from .remote.db import RemoteDBConnection
if isinstance(uri, str) and uri.startswith("db://"):
if api_key is None:
api_key = os.environ.get("LANCEDB_API_KEY")

View File

@@ -3,6 +3,7 @@ from typing import Dict, List, Optional, Tuple, Any, Union, Literal
import pyarrow as pa
from .index import BTree, IvfFlat, IvfPq, Bitmap, LabelList, HnswPq, HnswSq, FTS
from .remote import ClientConfig
class Connection(object):
uri: str
@@ -71,11 +72,15 @@ async def connect(
region: Optional[str],
host_override: Optional[str],
read_consistency_interval: Optional[float],
client_config: Optional[Union[ClientConfig, Dict[str, Any]]],
storage_options: Optional[Dict[str, str]],
) -> Connection: ...
class RecordBatchStream:
@property
def schema(self) -> pa.Schema: ...
async def next(self) -> Optional[pa.RecordBatch]: ...
def __aiter__(self) -> "RecordBatchStream": ...
async def __anext__(self) -> pa.RecordBatch: ...
class Query:
def where(self, filter: str): ...
@@ -142,6 +147,10 @@ class CompactionStats:
files_removed: int
files_added: int
class CleanupStats:
bytes_removed: int
old_versions: int
class RemovalStats:
bytes_removed: int
old_versions_removed: int

View File

@@ -110,7 +110,7 @@ class Query(pydantic.BaseModel):
full_text_query: Optional[Union[str, dict]] = None
# top k results to return
k: int
k: Optional[int] = None
# # metrics
metric: str = "L2"
@@ -257,7 +257,7 @@ class LanceQueryBuilder(ABC):
def __init__(self, table: "Table"):
self._table = table
self._limit = 10
self._limit = None
self._offset = 0
self._columns = None
self._where = None
@@ -370,8 +370,7 @@ class LanceQueryBuilder(ABC):
The maximum number of results to return.
The default query limit is 10 results.
For ANN/KNN queries, you must specify a limit.
Entering 0, a negative number, or None will reset
the limit to the default value of 10.
For plain searches, all records are returned if limit not set.
*WARNING* if you have a large dataset, setting
the limit to a large number, e.g. the table size,
can potentially result in reading a
@@ -595,6 +594,8 @@ class LanceVectorQueryBuilder(LanceQueryBuilder):
fast_search: bool = False,
):
super().__init__(table)
if self._limit is None:
self._limit = 10
self._query = query
self._distance_type = "L2"
self._nprobes = 20
@@ -888,6 +889,8 @@ class LanceFtsQueryBuilder(LanceQueryBuilder):
fts_columns: Union[str, List[str]] = [],
):
super().__init__(table)
if self._limit is None:
self._limit = 10
self._query = query
self._phrase_query = False
self.ordering_field_name = ordering_field_name
@@ -1055,7 +1058,7 @@ class LanceEmptyQueryBuilder(LanceQueryBuilder):
query = Query(
columns=self._columns,
filter=self._where,
k=self._limit or 10,
k=self._limit,
with_row_id=self._with_row_id,
vector=[],
# not actually respected in remote query

View File

@@ -9,7 +9,8 @@ from typing import Any, Dict, Iterable, List, Optional, Union
from urllib.parse import urlparse
import warnings
from lancedb import connect_async
# Remove this import to fix circular dependency
# from lancedb import connect_async
from lancedb.remote import ClientConfig
import pyarrow as pa
from overrides import override
@@ -78,6 +79,9 @@ class RemoteDBConnection(DBConnection):
self.client_config = client_config
# Import connect_async here to avoid circular import
from lancedb import connect_async
self._conn = LOOP.run(
connect_async(
db_url,

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
import asyncio
import inspect
import deprecation
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
@@ -24,16 +25,15 @@ from typing import (
)
from urllib.parse import urlparse
import lance
from . import __version__
from lancedb.arrow import peek_reader
from lancedb.background_loop import LOOP
from .dependencies import _check_for_pandas
from .dependencies import _check_for_hugging_face, _check_for_pandas
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.fs as pa_fs
import numpy as np
from lance import LanceDataset
from lance.dependencies import _check_for_hugging_face
from .common import DATA, VEC, VECTOR_COLUMN_NAME
from .embeddings import EmbeddingFunctionConfig, EmbeddingFunctionRegistry
@@ -66,24 +66,36 @@ from .index import lang_mapping
if TYPE_CHECKING:
from ._lancedb import Table as LanceDBTable, OptimizeStats, CompactionStats
from ._lancedb import (
Table as LanceDBTable,
OptimizeStats,
CleanupStats,
CompactionStats,
)
from .db import LanceDBConnection
from .index import IndexConfig
from lance.dataset import CleanupStats, ReaderLike
import pandas
import PIL
from .types import (
QueryType,
OnBadVectorsType,
AddMode,
CreateMode,
VectorIndexType,
ScalarIndexType,
BaseTokenizerType,
DistanceType,
)
pd = safe_import_pandas()
pl = safe_import_polars()
QueryType = Literal["vector", "fts", "hybrid", "auto"]
def _into_pyarrow_reader(data) -> pa.RecordBatchReader:
if _check_for_hugging_face(data):
# Huggingface datasets
from lance.dependencies import datasets
from lancedb.dependencies import datasets
if _check_for_hugging_face(data):
if isinstance(data, datasets.Dataset):
schema = data.features.arrow_schema
return pa.RecordBatchReader.from_batches(schema, data.data.to_batches())
@@ -175,7 +187,7 @@ def _sanitize_data(
data: "DATA",
target_schema: Optional[pa.Schema] = None,
metadata: Optional[dict] = None, # embedding metadata
on_bad_vectors: Literal["error", "drop", "fill", "null"] = "error",
on_bad_vectors: OnBadVectorsType = "error",
fill_value: float = 0.0,
*,
allow_subschema: bool = False,
@@ -321,7 +333,7 @@ def sanitize_create_table(
data,
schema: Union[pa.Schema, LanceModel],
metadata=None,
on_bad_vectors: str = "error",
on_bad_vectors: OnBadVectorsType = "error",
fill_value: float = 0.0,
):
if inspect.isclass(schema) and issubclass(schema, LanceModel):
@@ -573,9 +585,7 @@ class Table(ABC):
accelerator: Optional[str] = None,
index_cache_size: Optional[int] = None,
*,
index_type: Literal[
"IVF_FLAT", "IVF_PQ", "IVF_HNSW_SQ", "IVF_HNSW_PQ"
] = "IVF_PQ",
index_type: VectorIndexType = "IVF_PQ",
num_bits: int = 8,
max_iterations: int = 50,
sample_rate: int = 256,
@@ -640,7 +650,7 @@ class Table(ABC):
column: str,
*,
replace: bool = True,
index_type: Literal["BTREE", "BITMAP", "LABEL_LIST"] = "BTREE",
index_type: ScalarIndexType = "BTREE",
):
"""Create a scalar index on a column.
@@ -705,7 +715,7 @@ class Table(ABC):
tokenizer_name: Optional[str] = None,
with_position: bool = True,
# tokenizer configs:
base_tokenizer: Literal["simple", "raw", "whitespace"] = "simple",
base_tokenizer: BaseTokenizerType = "simple",
language: str = "English",
max_token_length: Optional[int] = 40,
lower_case: bool = True,
@@ -774,8 +784,8 @@ class Table(ABC):
def add(
self,
data: DATA,
mode: str = "append",
on_bad_vectors: str = "error",
mode: AddMode = "append",
on_bad_vectors: OnBadVectorsType = "error",
fill_value: float = 0.0,
):
"""Add more data to the [Table](Table).
@@ -957,7 +967,7 @@ class Table(ABC):
self,
merge: LanceMergeInsertBuilder,
new_data: DATA,
on_bad_vectors: str,
on_bad_vectors: OnBadVectorsType,
fill_value: float,
): ...
@@ -1074,7 +1084,7 @@ class Table(ABC):
older_than: Optional[timedelta] = None,
*,
delete_unverified: bool = False,
) -> CleanupStats:
) -> "CleanupStats":
"""
Clean up old versions of the table, freeing disk space.
@@ -1385,6 +1395,14 @@ class LanceTable(Table):
def to_lance(self, **kwargs) -> LanceDataset:
"""Return the LanceDataset backing this table."""
try:
import lance
except ImportError:
raise ImportError(
"The lance library is required to use this function. "
"Please install with `pip install pylance`."
)
return lance.dataset(
self._dataset_path,
version=self.version,
@@ -1561,10 +1579,10 @@ class LanceTable(Table):
def create_index(
self,
metric="L2",
metric: DistanceType = "l2",
num_partitions=None,
num_sub_vectors=None,
vector_column_name=VECTOR_COLUMN_NAME,
vector_column_name: str = VECTOR_COLUMN_NAME,
replace: bool = True,
accelerator: Optional[str] = None,
index_cache_size: Optional[int] = None,
@@ -1650,7 +1668,7 @@ class LanceTable(Table):
column: str,
*,
replace: bool = True,
index_type: Literal["BTREE", "BITMAP", "LABEL_LIST"] = "BTREE",
index_type: ScalarIndexType = "BTREE",
):
if index_type == "BTREE":
config = BTree()
@@ -1675,7 +1693,7 @@ class LanceTable(Table):
tokenizer_name: Optional[str] = None,
with_position: bool = True,
# tokenizer configs:
base_tokenizer: str = "simple",
base_tokenizer: BaseTokenizerType = "simple",
language: str = "English",
max_token_length: Optional[int] = 40,
lower_case: bool = True,
@@ -1809,8 +1827,8 @@ class LanceTable(Table):
def add(
self,
data: DATA,
mode: str = "append",
on_bad_vectors: str = "error",
mode: AddMode = "append",
on_bad_vectors: OnBadVectorsType = "error",
fill_value: float = 0.0,
):
"""Add data to the table.
@@ -1844,7 +1862,7 @@ class LanceTable(Table):
def merge(
self,
other_table: Union[LanceTable, ReaderLike],
other_table: Union[LanceTable, DATA],
left_on: str,
right_on: Optional[str] = None,
schema: Optional[Union[pa.Schema, LanceModel]] = None,
@@ -1894,12 +1912,13 @@ class LanceTable(Table):
1 2 b e
2 3 c f
"""
if isinstance(schema, LanceModel):
schema = schema.to_arrow_schema()
if isinstance(other_table, LanceTable):
other_table = other_table.to_lance()
if isinstance(other_table, LanceDataset):
other_table = other_table.to_table()
else:
other_table = _sanitize_data(
other_table,
schema,
)
self.to_lance().merge(
other_table, left_on=left_on, right_on=right_on, schema=schema
)
@@ -2047,7 +2066,7 @@ class LanceTable(Table):
query_type,
vector_column_name=vector_column_name,
ordering_field_name=ordering_field_name,
fts_columns=fts_columns,
fts_columns=fts_columns or [],
)
@classmethod
@@ -2057,13 +2076,13 @@ class LanceTable(Table):
name: str,
data: Optional[DATA] = None,
schema: Optional[pa.Schema] = None,
mode: Literal["create", "overwrite"] = "create",
mode: CreateMode = "create",
exist_ok: bool = False,
on_bad_vectors: str = "error",
on_bad_vectors: OnBadVectorsType = "error",
fill_value: float = 0.0,
embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None,
*,
storage_options: Optional[Dict[str, str]] = None,
storage_options: Optional[Dict[str, str | bool]] = None,
data_storage_version: Optional[str] = None,
enable_v2_manifest_paths: Optional[bool] = None,
):
@@ -2217,17 +2236,22 @@ class LanceTable(Table):
self,
merge: LanceMergeInsertBuilder,
new_data: DATA,
on_bad_vectors: str,
on_bad_vectors: OnBadVectorsType,
fill_value: float,
):
LOOP.run(self._table._do_merge(merge, new_data, on_bad_vectors, fill_value))
@deprecation.deprecated(
deprecated_in="0.21.0",
current_version=__version__,
details="Use `Table.optimize` instead.",
)
def cleanup_old_versions(
self,
older_than: Optional[timedelta] = None,
*,
delete_unverified: bool = False,
) -> CleanupStats:
) -> "CleanupStats":
"""
Clean up old versions of the table, freeing disk space.
@@ -2252,6 +2276,11 @@ class LanceTable(Table):
older_than, delete_unverified=delete_unverified
)
@deprecation.deprecated(
deprecated_in="0.21.0",
current_version=__version__,
details="Use `Table.optimize` instead.",
)
def compact_files(self, *args, **kwargs) -> CompactionStats:
"""
Run the compaction process on the table.
@@ -2383,6 +2412,19 @@ class LanceTable(Table):
"""
LOOP.run(self._table.migrate_v2_manifest_paths())
def replace_field_metadata(self, field_name: str, new_metadata: Dict[str, str]):
"""
Replace the metadata of a field in the schema
Parameters
----------
field_name: str
The name of the field to replace the metadata for
new_metadata: dict
The new metadata to set
"""
LOOP.run(self._table.replace_field_metadata(field_name, new_metadata))
def _handle_bad_vectors(
reader: pa.RecordBatchReader,
@@ -2845,7 +2887,7 @@ class AsyncTable:
data: DATA,
*,
mode: Optional[Literal["append", "overwrite"]] = "append",
on_bad_vectors: Optional[str] = None,
on_bad_vectors: Optional[OnBadVectorsType] = None,
fill_value: Optional[float] = None,
):
"""Add more data to the [Table](Table).
@@ -2951,7 +2993,7 @@ class AsyncTable:
@overload
async def search(
self,
query: Optional[Union[str]] = None,
query: Optional[str] = None,
vector_column_name: Optional[str] = None,
query_type: Literal["auto"] = ...,
ordering_field_name: Optional[str] = None,
@@ -2961,7 +3003,7 @@ class AsyncTable:
@overload
async def search(
self,
query: Optional[Union[str]] = None,
query: Optional[str] = None,
vector_column_name: Optional[str] = None,
query_type: Literal["hybrid"] = ...,
ordering_field_name: Optional[str] = None,
@@ -3005,7 +3047,7 @@ class AsyncTable:
query_type: QueryType = "auto",
ordering_field_name: Optional[str] = None,
fts_columns: Optional[Union[str, List[str]]] = None,
) -> AsyncQuery:
) -> Union[AsyncHybridQuery | AsyncFTSQuery | AsyncVectorQuery]:
"""Create a search query to find the nearest neighbors
of the given query vector. We currently support [vector search][search]
and [full-text search][experimental-full-text-search].
@@ -3195,7 +3237,9 @@ class AsyncTable:
# The sync remote table calls into this method, so we need to map the
# query to the async version of the query and run that here. This is only
# used for that code path right now.
async_query = self.query().limit(query.k)
async_query = self.query()
if query.k is not None:
async_query = async_query.limit(query.k)
if query.offset > 0:
async_query = async_query.offset(query.offset)
if query.columns:
@@ -3242,7 +3286,7 @@ class AsyncTable:
self,
merge: LanceMergeInsertBuilder,
new_data: DATA,
on_bad_vectors: str,
on_bad_vectors: OnBadVectorsType,
fill_value: float,
):
schema = await self.schema()
@@ -3611,6 +3655,21 @@ class AsyncTable:
"""
await self._inner.migrate_manifest_paths_v2()
async def replace_field_metadata(
self, field_name: str, new_metadata: dict[str, str]
):
"""
Replace the metadata of a field in the schema
Parameters
----------
field_name: str
The name of the field to replace the metadata for
new_metadata: dict
The new metadata to set
"""
await self._inner.replace_field_metadata(field_name, new_metadata)
@dataclass
class IndexStatistics:

View File

@@ -0,0 +1,28 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
from typing import Literal
# Query type literals
QueryType = Literal["vector", "fts", "hybrid", "auto"]
# Distance type literals
DistanceType = Literal["l2", "cosine", "dot"]
DistanceTypeWithHamming = Literal["l2", "cosine", "dot", "hamming"]
# Vector handling literals
OnBadVectorsType = Literal["error", "drop", "fill", "null"]
# Mode literals
AddMode = Literal["append", "overwrite"]
CreateMode = Literal["create", "overwrite"]
# Index type literals
VectorIndexType = Literal["IVF_FLAT", "IVF_PQ", "IVF_HNSW_SQ", "IVF_HNSW_PQ"]
ScalarIndexType = Literal["BTREE", "BITMAP", "LABEL_LIST"]
IndexType = Literal[
"IVF_PQ", "IVF_HNSW_PQ", "IVF_HNSW_SQ", "FTS", "BTREE", "BITMAP", "LABEL_LIST"
]
# Tokenizer literals
BaseTokenizerType = Literal["simple", "raw", "whitespace"]

View File

@@ -419,17 +419,17 @@ def test_embedding_function_safe_model_dump(embedding_type):
dumped_model = model.safe_model_dump()
assert all(
not k.startswith("_") for k in dumped_model.keys()
), f"{embedding_type}: Dumped model contains keys starting with underscore"
assert all(not k.startswith("_") for k in dumped_model.keys()), (
f"{embedding_type}: Dumped model contains keys starting with underscore"
)
assert (
"max_retries" in dumped_model
), f"{embedding_type}: Essential field 'max_retries' is missing from dumped model"
assert "max_retries" in dumped_model, (
f"{embedding_type}: Essential field 'max_retries' is missing from dumped model"
)
assert isinstance(
dumped_model, dict
), f"{embedding_type}: Dumped model is not a dictionary"
assert isinstance(dumped_model, dict), (
f"{embedding_type}: Dumped model is not a dictionary"
)
for key in model.__dict__:
if key.startswith("_"):

View File

@@ -174,6 +174,10 @@ def test_search_fts(table, use_tantivy):
assert len(results) == 5
assert len(results[0]) == 3 # id, text, _score
# Default limit of 10
results = table.search("puppy").select(["id", "text"]).to_list()
assert len(results) == 10
@pytest.mark.asyncio
async def test_fts_select_async(async_table):

View File

@@ -129,6 +129,6 @@ def test_normalize_scores():
if invert:
expected = pc.subtract(1.0, expected)
assert pc.equal(
result, expected
), f"Expected {expected} but got {result} for invert={invert}"
assert pc.equal(result, expected), (
f"Expected {expected} but got {result} for invert={invert}"
)

View File

@@ -784,8 +784,7 @@ async def test_query_search_auto(mem_db_async: AsyncConnection):
with pytest.raises(
Exception,
match=(
"Cannot perform full text search unless an INVERTED index has "
"been created"
"Cannot perform full text search unless an INVERTED index has been created"
),
):
query = await (await tbl2.search("0.1")).to_arrow()

View File

@@ -9,6 +9,7 @@ import json
import threading
from unittest.mock import MagicMock
import uuid
from packaging.version import Version
import lancedb
from lancedb.conftest import MockTextEmbeddingFunction
@@ -277,11 +278,12 @@ def test_table_create_indices():
@contextlib.contextmanager
def query_test_table(query_handler):
def query_test_table(query_handler, *, server_version=Version("0.1.0")):
def handler(request):
if request.path == "/v1/table/test/describe/":
request.send_response(200)
request.send_header("Content-Type", "application/json")
request.send_header("phalanx-version", str(server_version))
request.end_headers()
request.wfile.write(b"{}")
elif request.path == "/v1/table/test/query/":
@@ -388,17 +390,25 @@ def test_query_sync_maximal():
)
def test_query_sync_multiple_vectors():
@pytest.mark.parametrize("server_version", [Version("0.1.0"), Version("0.2.0")])
def test_query_sync_batch_queries(server_version):
def handler(body):
# TODO: we will add the ability to get the server version,
# so that we can decide how to perform batch quires.
vectors = body["vector"]
if server_version >= Version(
"0.2.0"
): # we can handle batch queries in single request since 0.2.0
assert len(vectors) == 2
res = []
for i, vector in enumerate(vectors):
res.append({"id": 1, "query_index": i})
return pa.Table.from_pylist(res)
else:
assert len(vectors) == 3 # matching dim
return pa.table({"id": [1]})
with query_test_table(handler) as table:
with query_test_table(handler, server_version=server_version) as table:
results = table.search([[1, 2, 3], [4, 5, 6]]).limit(1).to_list()
assert len(results) == 2
results.sort(key=lambda x: x["query_index"])

View File

@@ -131,9 +131,9 @@ def _run_test_reranker(reranker, table, query, query_vector, schema):
"represents the relevance of the result to the query & should "
"be descending."
)
assert np.all(
np.diff(result.column("_relevance_score").to_numpy()) <= 0
), ascending_relevance_err
assert np.all(np.diff(result.column("_relevance_score").to_numpy()) <= 0), (
ascending_relevance_err
)
# Vector search setting
result = (
@@ -143,9 +143,9 @@ def _run_test_reranker(reranker, table, query, query_vector, schema):
.to_arrow()
)
assert len(result) == 30
assert np.all(
np.diff(result.column("_relevance_score").to_numpy()) <= 0
), ascending_relevance_err
assert np.all(np.diff(result.column("_relevance_score").to_numpy()) <= 0), (
ascending_relevance_err
)
result_explicit = (
table.search(query_vector, vector_column_name="vector")
.rerank(reranker=reranker, query_string=query)
@@ -168,9 +168,9 @@ def _run_test_reranker(reranker, table, query, query_vector, schema):
.to_arrow()
)
assert len(result) > 0
assert np.all(
np.diff(result.column("_relevance_score").to_numpy()) <= 0
), ascending_relevance_err
assert np.all(np.diff(result.column("_relevance_score").to_numpy()) <= 0), (
ascending_relevance_err
)
# empty FTS results
query = "abcxyz" * 100
@@ -185,9 +185,9 @@ def _run_test_reranker(reranker, table, query, query_vector, schema):
# should return _relevance_score column
assert "_relevance_score" in result.column_names
assert np.all(
np.diff(result.column("_relevance_score").to_numpy()) <= 0
), ascending_relevance_err
assert np.all(np.diff(result.column("_relevance_score").to_numpy()) <= 0), (
ascending_relevance_err
)
# Multi-vector search setting
rs1 = table.search(query, vector_column_name="vector").limit(10).with_row_id(True)
@@ -262,9 +262,9 @@ def _run_test_hybrid_reranker(reranker, tmp_path, use_tantivy):
"represents the relevance of the result to the query & should "
"be descending."
)
assert np.all(
np.diff(result.column("_relevance_score").to_numpy()) <= 0
), ascending_relevance_err
assert np.all(np.diff(result.column("_relevance_score").to_numpy()) <= 0), (
ascending_relevance_err
)
# Test with empty FTS results
query = "abcxyz" * 100
@@ -278,9 +278,9 @@ def _run_test_hybrid_reranker(reranker, tmp_path, use_tantivy):
)
# should return _relevance_score column
assert "_relevance_score" in result.column_names
assert np.all(
np.diff(result.column("_relevance_score").to_numpy()) <= 0
), ascending_relevance_err
assert np.all(np.diff(result.column("_relevance_score").to_numpy()) <= 0), (
ascending_relevance_err
)
@pytest.mark.parametrize("use_tantivy", [True, False])

View File

@@ -252,3 +252,27 @@ def test_s3_dynamodb_sync(s3_bucket: str, commit_table: str, monkeypatch):
db.drop_table("test_ddb_sync")
assert db.table_names() == []
db.drop_database()
@pytest.mark.s3_test
def test_s3_dynamodb_drop_all_tables(s3_bucket: str, commit_table: str, monkeypatch):
for key, value in CONFIG.items():
monkeypatch.setenv(key.upper(), value)
uri = f"s3+ddb://{s3_bucket}/test2?ddbTableName={commit_table}"
db = lancedb.connect(uri, read_consistency_interval=timedelta(0))
data = pa.table({"x": ["a", "b", "c"]})
db.create_table("foo", data)
db.create_table("bar", data)
assert db.table_names() == ["bar", "foo"]
# dropping all tables should clear multiple tables
db.drop_all_tables()
assert db.table_names() == []
# create a new table with the same name to ensure DDB is clean
db.create_table("foo", data)
assert db.table_names() == ["foo"]
db.drop_all_tables()

View File

@@ -1025,13 +1025,13 @@ def test_empty_query(mem_db: DBConnection):
table = mem_db.create_table("my_table2", data=[{"id": i} for i in range(100)])
df = table.search().select(["id"]).to_pandas()
assert len(df) == 10
assert len(df) == 100
# None is the same as default
df = table.search().select(["id"]).limit(None).to_pandas()
assert len(df) == 10
assert len(df) == 100
# invalid limist is the same as None, wihch is the same as default
df = table.search().select(["id"]).limit(-1).to_pandas()
assert len(df) == 10
assert len(df) == 100
# valid limit should work
df = table.search().select(["id"]).limit(42).to_pandas()
assert len(df) == 42
@@ -1481,3 +1481,12 @@ async def test_optimize_delete_unverified(tmp_db_async: AsyncConnection, tmp_pat
cleanup_older_than=timedelta(seconds=0), delete_unverified=True
)
assert stats.prune.old_versions_removed == 2
def test_replace_field_metadata(tmp_path):
db = lancedb.connect(tmp_path)
table = db.create_table("my_table", data=[{"x": 0}])
table.replace_field_metadata("x", {"foo": "bar"})
schema = table.schema
field = schema[0].metadata
assert field == {b"foo": b"bar"}

View File

@@ -43,7 +43,7 @@ impl<T> PythonErrorExt<T> for std::result::Result<T, LanceError> {
} => Python::with_gil(|py| {
let message = err.to_string();
let http_err_cls = py
.import_bound(intern!(py, "lancedb.remote.errors"))?
.import(intern!(py, "lancedb.remote.errors"))?
.getattr(intern!(py, "HttpError"))?;
let err = http_err_cls.call1((
message,
@@ -63,7 +63,7 @@ impl<T> PythonErrorExt<T> for std::result::Result<T, LanceError> {
err.setattr(intern!(py, "__cause__"), cause_err)?;
}
Err(PyErr::from_value_bound(err))
Err(PyErr::from_value(err))
}),
LanceError::Retry {
request_id,
@@ -85,7 +85,7 @@ impl<T> PythonErrorExt<T> for std::result::Result<T, LanceError> {
let message = err.to_string();
let retry_error_cls = py
.import_bound(intern!(py, "lancedb.remote.errors"))?
.import(intern!(py, "lancedb.remote.errors"))?
.getattr("RetryError")?;
let err = retry_error_cls.call1((
message,
@@ -100,7 +100,7 @@ impl<T> PythonErrorExt<T> for std::result::Result<T, LanceError> {
))?;
err.setattr(intern!(py, "__cause__"), cause_err)?;
Err(PyErr::from_value_bound(err))
Err(PyErr::from_value(err))
}),
_ => self.runtime_error(),
},
@@ -127,18 +127,16 @@ fn http_from_rust_error(
status_code: Option<u16>,
) -> PyResult<PyErr> {
let message = err.to_string();
let http_err_cls = py
.import_bound("lancedb.remote.errors")?
.getattr("HttpError")?;
let http_err_cls = py.import("lancedb.remote.errors")?.getattr("HttpError")?;
let py_err = http_err_cls.call1((message, request_id, status_code))?;
// Reset the traceback since it doesn't provide additional information.
let py_err = py_err.call_method1(intern!(py, "with_traceback"), (PyNone::get_bound(py),))?;
let py_err = py_err.call_method1(intern!(py, "with_traceback"), (PyNone::get(py),))?;
if let Some(cause) = err.source() {
let cause_err = http_from_rust_error(py, cause, request_id, status_code)?;
py_err.setattr(intern!(py, "__cause__"), cause_err)?;
}
Ok(PyErr::from_value_bound(py_err))
Ok(PyErr::from_value(py_err))
}

View File

@@ -7,29 +7,32 @@ use lancedb::index::{
vector::{IvfHnswPqIndexBuilder, IvfHnswSqIndexBuilder, IvfPqIndexBuilder},
Index as LanceDbIndex,
};
use pyo3::types::PyStringMethods;
use pyo3::IntoPyObject;
use pyo3::{
exceptions::{PyKeyError, PyValueError},
intern, pyclass, pymethods,
types::PyAnyMethods,
Bound, FromPyObject, IntoPy, PyAny, PyObject, PyResult, Python,
Bound, FromPyObject, PyAny, PyResult, Python,
};
use crate::util::parse_distance_type;
pub fn class_name<'a>(ob: &'a Bound<'_, PyAny>) -> PyResult<&'a str> {
let full_name: &str = ob
pub fn class_name(ob: &'_ Bound<'_, PyAny>) -> PyResult<String> {
let full_name = ob
.getattr(intern!(ob.py(), "__class__"))?
.getattr(intern!(ob.py(), "__name__"))?
.extract()?;
.getattr(intern!(ob.py(), "__name__"))?;
let full_name = full_name.downcast()?.to_string_lossy();
match full_name.rsplit_once('.') {
Some((_, name)) => Ok(name),
None => Ok(full_name),
Some((_, name)) => Ok(name.to_string()),
None => Ok(full_name.to_string()),
}
}
pub fn extract_index_params(source: &Option<Bound<'_, PyAny>>) -> PyResult<LanceDbIndex> {
if let Some(source) = source {
match class_name(source)? {
match class_name(source)?.as_str() {
"BTree" => Ok(LanceDbIndex::BTree(BTreeIndexBuilder::default())),
"Bitmap" => Ok(LanceDbIndex::Bitmap(Default::default())),
"LabelList" => Ok(LanceDbIndex::LabelList(Default::default())),
@@ -196,11 +199,11 @@ impl IndexConfig {
// For backwards-compatibility with the old sync SDK, we also support getting
// attributes via __getitem__.
pub fn __getitem__(&self, key: String, py: Python<'_>) -> PyResult<PyObject> {
pub fn __getitem__<'a>(&self, key: String, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
match key.as_str() {
"index_type" => Ok(self.index_type.clone().into_py(py)),
"columns" => Ok(self.columns.clone().into_py(py)),
"name" | "index_name" => Ok(self.name.clone().into_py(py)),
"index_type" => Ok(self.index_type.clone().into_pyobject(py)?.into_any()),
"columns" => Ok(self.columns.clone().into_pyobject(py)?.into_any()),
"name" | "index_name" => Ok(self.name.clone().into_pyobject(py)?.into_any()),
_ => Err(PyKeyError::new_err(format!("Invalid key: {}", key))),
}
}

View File

@@ -10,12 +10,13 @@ use lancedb::table::{
Table as LanceDbTable,
};
use pyo3::{
exceptions::{PyRuntimeError, PyValueError},
exceptions::{PyKeyError, PyRuntimeError, PyValueError},
pyclass, pymethods,
types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods},
Bound, FromPyObject, PyAny, PyRef, PyResult, Python, ToPyObject,
Bound, FromPyObject, PyAny, PyRef, PyResult, Python,
};
use pyo3_async_runtimes::tokio::future_into_py;
use std::collections::HashMap;
use crate::{
error::PythonErrorExt,
@@ -221,7 +222,7 @@ impl Table {
let stats = inner.index_stats(&index_name).await.infer_error()?;
if let Some(stats) = stats {
Python::with_gil(|py| {
let dict = PyDict::new_bound(py);
let dict = PyDict::new(py);
dict.set_item("num_indexed_rows", stats.num_indexed_rows)?;
dict.set_item("num_unindexed_rows", stats.num_unindexed_rows)?;
dict.set_item("index_type", stats.index_type.to_string())?;
@@ -234,7 +235,7 @@ impl Table {
dict.set_item("num_indices", num_indices)?;
}
Ok(Some(dict.to_object(py)))
Ok(Some(dict.unbind()))
})
} else {
Ok(None)
@@ -265,7 +266,7 @@ impl Table {
versions
.iter()
.map(|v| {
let dict = PyDict::new_bound(py);
let dict = PyDict::new(py);
dict.set_item("version", v.version).unwrap();
dict.set_item(
"timestamp",
@@ -274,14 +275,13 @@ impl Table {
.unwrap();
let tup: Vec<(&String, &String)> = v.metadata.iter().collect();
dict.set_item("metadata", tup.into_py_dict_bound(py))
.unwrap();
dict.to_object(py)
dict.set_item("metadata", tup.into_py_dict(py)?).unwrap();
Ok(dict.unbind())
})
.collect::<Vec<_>>()
.collect::<PyResult<Vec<_>>>()
});
Ok(versions_as_dict)
versions_as_dict
})
}
@@ -486,6 +486,37 @@ impl Table {
Ok(())
})
}
pub fn replace_field_metadata<'a>(
self_: PyRef<'a, Self>,
field_name: String,
metadata: &Bound<'_, PyDict>,
) -> PyResult<Bound<'a, PyAny>> {
let mut new_metadata = HashMap::<String, String>::new();
for (column_name, value) in metadata.into_iter() {
let key: String = column_name.extract()?;
let value: String = value.extract()?;
new_metadata.insert(key, value);
}
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
let native_tbl = inner
.as_native()
.ok_or_else(|| PyValueError::new_err("This cannot be run on a remote table"))?;
let schema = native_tbl.manifest().await.infer_error()?.schema;
let field = schema
.field(&field_name)
.ok_or_else(|| PyKeyError::new_err(format!("Field {} not found", field_name)))?;
native_tbl
.replace_field_metadata(vec![(field.id as u32, new_metadata)])
.await
.infer_error()?;
Ok(())
})
}
}
#[derive(FromPyObject)]

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-node"
version = "0.16.1-beta.3"
version = "0.18.0"
description = "Serverless, low-latency vector database for AI applications"
license.workspace = true
edition.workspace = true

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.16.1-beta.3"
version = "0.18.0"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true
@@ -70,6 +70,7 @@ candle-core = { version = "0.6.0", optional = true }
candle-transformers = { version = "0.6.0", optional = true }
candle-nn = { version = "0.6.0", optional = true }
tokenizers = { version = "0.19.1", optional = true }
semver = { workspace = true }
# For a workaround, see workspace Cargo.toml
crunchy.workspace = true
@@ -87,6 +88,7 @@ aws-config = { version = "1.0" }
aws-smithy-runtime = { version = "1.3" }
datafusion.workspace = true
http-body = "1" # Matching reqwest
rstest = "0.23.0"
[features]

View File

@@ -4,12 +4,14 @@
use std::{pin::Pin, sync::Arc};
pub use arrow_schema;
use futures::{Stream, StreamExt};
use datafusion_common::DataFusionError;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use futures::{Stream, StreamExt, TryStreamExt};
#[cfg(feature = "polars")]
use {crate::polars_arrow_convertors, polars::frame::ArrowChunk, polars::prelude::DataFrame};
use crate::error::Result;
use crate::{error::Result, Error};
/// An iterator of batches that also has a schema
pub trait RecordBatchReader: Iterator<Item = Result<arrow_array::RecordBatch>> {
@@ -65,6 +67,20 @@ impl<I: lance::io::RecordBatchStream + 'static> From<I> for SendableRecordBatchS
}
}
pub trait SendableRecordBatchStreamExt {
fn into_df_stream(self) -> datafusion_physical_plan::SendableRecordBatchStream;
}
impl SendableRecordBatchStreamExt for SendableRecordBatchStream {
fn into_df_stream(self) -> datafusion_physical_plan::SendableRecordBatchStream {
let schema = self.schema();
Box::pin(RecordBatchStreamAdapter::new(
schema,
self.map_err(|ldb_err| DataFusionError::External(ldb_err.into())),
))
}
}
/// A simple RecordBatchStream formed from the two parts (stream + schema)
#[pin_project::pin_project]
pub struct SimpleRecordBatchStream<S: Stream<Item = Result<arrow_array::RecordBatch>>> {
@@ -101,7 +117,7 @@ impl<S: Stream<Item = Result<arrow_array::RecordBatch>>> RecordBatchStream
/// used in methods like [`crate::connection::Connection::create_table`]
/// or [`crate::table::Table::add`]
pub trait IntoArrow {
/// Convert the data into an Arrow array
/// Convert the data into an iterator of Arrow batches
fn into_arrow(self) -> Result<Box<dyn arrow_array::RecordBatchReader + Send>>;
}
@@ -113,11 +129,38 @@ impl<T: arrow_array::RecordBatchReader + Send + 'static> IntoArrow for T {
}
}
/// A trait for converting incoming data to Arrow asynchronously
///
/// Serves the same purpose as [`IntoArrow`], but for asynchronous data.
///
/// Note: Arrow has no async equivalent to RecordBatchReader and so
pub trait IntoArrowStream {
/// Convert the data into a stream of Arrow batches
fn into_arrow(self) -> Result<SendableRecordBatchStream>;
}
impl<S: Stream<Item = Result<arrow_array::RecordBatch>>> SimpleRecordBatchStream<S> {
pub fn new(stream: S, schema: Arc<arrow_schema::Schema>) -> Self {
Self { schema, stream }
}
}
impl IntoArrowStream for SendableRecordBatchStream {
fn into_arrow(self) -> Result<SendableRecordBatchStream> {
Ok(self)
}
}
impl IntoArrowStream for datafusion_physical_plan::SendableRecordBatchStream {
fn into_arrow(self) -> Result<SendableRecordBatchStream> {
let schema = self.schema();
let stream = self.map_err(|df_err| Error::Runtime {
message: df_err.to_string(),
});
Ok(Box::pin(SimpleRecordBatchStream::new(stream, schema)))
}
}
#[cfg(feature = "polars")]
/// An iterator of record batches formed from a Polars DataFrame.
pub struct PolarsDataFrameRecordBatchReader {

View File

@@ -0,0 +1,82 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Catalog implementation for managing databases
pub mod listing;
use std::collections::HashMap;
use std::sync::Arc;
use crate::database::Database;
use crate::error::Result;
use async_trait::async_trait;
/// Request parameters for listing databases
#[derive(Clone, Debug, Default)]
pub struct DatabaseNamesRequest {
/// Start listing after this name (exclusive)
pub start_after: Option<String>,
/// Maximum number of names to return
pub limit: Option<u32>,
}
/// Request to open an existing database
#[derive(Clone, Debug)]
pub struct OpenDatabaseRequest {
/// The name of the database to open
pub name: String,
/// A map of database-specific options
///
/// Consult the catalog / database implementation to determine which options are available
pub database_options: HashMap<String, String>,
}
/// Database creation mode
///
/// The default behavior is Create
pub enum CreateDatabaseMode {
/// Create new database, error if exists
Create,
/// Open existing database if present
ExistOk,
/// Overwrite existing database
Overwrite,
}
impl Default for CreateDatabaseMode {
fn default() -> Self {
Self::Create
}
}
/// Request to create a new database
pub struct CreateDatabaseRequest {
/// The name of the database to create
pub name: String,
/// The creation mode
pub mode: CreateDatabaseMode,
/// A map of catalog-specific options, consult your catalog implementation to determine what's available
pub options: HashMap<String, String>,
}
#[async_trait]
pub trait Catalog: Send + Sync + std::fmt::Debug + 'static {
/// List database names with pagination
async fn database_names(&self, request: DatabaseNamesRequest) -> Result<Vec<String>>;
/// Create a new database
async fn create_database(&self, request: CreateDatabaseRequest) -> Result<Arc<dyn Database>>;
/// Open existing database
async fn open_database(&self, request: OpenDatabaseRequest) -> Result<Arc<dyn Database>>;
/// Rename database
async fn rename_database(&self, old_name: &str, new_name: &str) -> Result<()>;
/// Delete database
async fn drop_database(&self, name: &str) -> Result<()>;
/// Delete all databases
async fn drop_all_databases(&self) -> Result<()>;
}

View File

@@ -0,0 +1,569 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
//! Catalog implementation based on a local file system.
use std::collections::HashMap;
use std::fs::create_dir_all;
use std::path::Path;
use std::sync::Arc;
use super::{
Catalog, CreateDatabaseMode, CreateDatabaseRequest, DatabaseNamesRequest, OpenDatabaseRequest,
};
use crate::connection::ConnectRequest;
use crate::database::listing::ListingDatabase;
use crate::database::Database;
use crate::error::{CreateDirSnafu, Error, Result};
use async_trait::async_trait;
use lance::io::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
use lance_io::local::to_local_path;
use object_store::path::Path as ObjectStorePath;
use snafu::ResultExt;
/// A catalog implementation that works by listing subfolders in a directory
///
/// The listing catalog will be created with a base folder specified by the URI. Every subfolder
/// in this base folder will be considered a database. These will be opened as a
/// [`crate::database::listing::ListingDatabase`]
#[derive(Debug)]
pub struct ListingCatalog {
object_store: ObjectStore,
uri: String,
base_path: ObjectStorePath,
storage_options: HashMap<String, String>,
}
impl ListingCatalog {
/// Try to create a local directory to store the lancedb dataset
pub fn try_create_dir(path: &str) -> core::result::Result<(), std::io::Error> {
let path = Path::new(path);
if !path.try_exists()? {
create_dir_all(path)?;
}
Ok(())
}
pub fn uri(&self) -> &str {
&self.uri
}
async fn open_path(path: &str) -> Result<Self> {
let (object_store, base_path) = ObjectStore::from_path(path).unwrap();
if object_store.is_local() {
Self::try_create_dir(path).context(CreateDirSnafu { path })?;
}
Ok(Self {
uri: path.to_string(),
base_path,
object_store,
storage_options: HashMap::new(),
})
}
pub async fn connect(request: &ConnectRequest) -> Result<Self> {
let uri = &request.uri;
let parse_res = url::Url::parse(uri);
match parse_res {
Ok(url) if url.scheme().len() == 1 && cfg!(windows) => Self::open_path(uri).await,
Ok(url) => {
let plain_uri = url.to_string();
let registry = Arc::new(ObjectStoreRegistry::default());
let storage_options = request.storage_options.clone();
let os_params = ObjectStoreParams {
storage_options: Some(storage_options.clone()),
..Default::default()
};
let (object_store, base_path) =
ObjectStore::from_uri_and_params(registry, &plain_uri, &os_params).await?;
if object_store.is_local() {
Self::try_create_dir(&plain_uri).context(CreateDirSnafu { path: plain_uri })?;
}
Ok(Self {
uri: String::from(url.clone()),
base_path,
object_store,
storage_options,
})
}
Err(_) => Self::open_path(uri).await,
}
}
fn database_path(&self, name: &str) -> ObjectStorePath {
self.base_path.child(name.replace('\\', "/"))
}
}
#[async_trait]
impl Catalog for ListingCatalog {
async fn database_names(&self, request: DatabaseNamesRequest) -> Result<Vec<String>> {
let mut f = self
.object_store
.read_dir(self.base_path.clone())
.await?
.iter()
.map(Path::new)
.filter_map(|p| p.file_name().and_then(|s| s.to_str().map(String::from)))
.collect::<Vec<String>>();
f.sort();
if let Some(start_after) = request.start_after {
let index = f
.iter()
.position(|name| name.as_str() > start_after.as_str())
.unwrap_or(f.len());
f.drain(0..index);
}
if let Some(limit) = request.limit {
f.truncate(limit as usize);
}
Ok(f)
}
async fn create_database(&self, request: CreateDatabaseRequest) -> Result<Arc<dyn Database>> {
let db_path = self.database_path(&request.name);
let db_path_str = to_local_path(&db_path);
let exists = Path::new(&db_path_str).exists();
match request.mode {
CreateDatabaseMode::Create if exists => {
return Err(Error::DatabaseAlreadyExists { name: request.name })
}
CreateDatabaseMode::Create => {
create_dir_all(db_path.to_string()).unwrap();
}
CreateDatabaseMode::ExistOk => {
if !exists {
create_dir_all(db_path.to_string()).unwrap();
}
}
CreateDatabaseMode::Overwrite => {
if exists {
self.drop_database(&request.name).await?;
}
create_dir_all(db_path.to_string()).unwrap();
}
}
let db_uri = format!("/{}/{}", self.base_path, request.name);
let connect_request = ConnectRequest {
uri: db_uri,
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
storage_options: self.storage_options.clone(),
};
Ok(Arc::new(
ListingDatabase::connect_with_options(&connect_request).await?,
))
}
async fn open_database(&self, request: OpenDatabaseRequest) -> Result<Arc<dyn Database>> {
let db_path = self.database_path(&request.name);
let db_path_str = to_local_path(&db_path);
let exists = Path::new(&db_path_str).exists();
if !exists {
return Err(Error::DatabaseNotFound { name: request.name });
}
let connect_request = ConnectRequest {
uri: db_path.to_string(),
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
storage_options: self.storage_options.clone(),
};
Ok(Arc::new(
ListingDatabase::connect_with_options(&connect_request).await?,
))
}
async fn rename_database(&self, _old_name: &str, _new_name: &str) -> Result<()> {
Err(Error::NotSupported {
message: "rename_database is not supported in LanceDB OSS yet".to_string(),
})
}
async fn drop_database(&self, name: &str) -> Result<()> {
let db_path = self.database_path(name);
self.object_store
.remove_dir_all(db_path.clone())
.await
.map_err(|err| match err {
lance::Error::NotFound { .. } => Error::DatabaseNotFound {
name: name.to_owned(),
},
_ => Error::from(err),
})?;
Ok(())
}
async fn drop_all_databases(&self) -> Result<()> {
self.object_store
.remove_dir_all(self.base_path.clone())
.await?;
Ok(())
}
}
#[cfg(all(test, not(windows)))]
mod tests {
use super::*;
/// file:/// URIs with drive letters do not work correctly on Windows
#[cfg(windows)]
fn path_to_uri(path: PathBuf) -> String {
path.to_str().unwrap().to_string()
}
#[cfg(not(windows))]
fn path_to_uri(path: PathBuf) -> String {
Url::from_file_path(path).unwrap().to_string()
}
async fn setup_catalog() -> (TempDir, ListingCatalog) {
let tempdir = tempfile::tempdir().unwrap();
let catalog_path = tempdir.path().join("catalog");
std::fs::create_dir_all(&catalog_path).unwrap();
let uri = path_to_uri(catalog_path);
let request = ConnectRequest {
uri: uri.clone(),
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
storage_options: HashMap::new(),
read_consistency_interval: None,
};
let catalog = ListingCatalog::connect(&request).await.unwrap();
(tempdir, catalog)
}
use crate::database::{CreateTableData, CreateTableRequest, TableNamesRequest};
use crate::table::TableDefinition;
use arrow_schema::Field;
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::{tempdir, TempDir};
use url::Url;
#[tokio::test]
async fn test_database_names() {
let (_tempdir, catalog) = setup_catalog().await;
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert!(names.is_empty());
}
#[tokio::test]
async fn test_create_database() {
let (_tempdir, catalog) = setup_catalog().await;
catalog
.create_database(CreateDatabaseRequest {
name: "db1".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert_eq!(names, vec!["db1"]);
}
#[tokio::test]
async fn test_create_database_exist_ok() {
let (_tempdir, catalog) = setup_catalog().await;
let db1 = catalog
.create_database(CreateDatabaseRequest {
name: "db_exist_ok".into(),
mode: CreateDatabaseMode::ExistOk,
options: HashMap::new(),
})
.await
.unwrap();
let dummy_schema = Arc::new(arrow_schema::Schema::new(Vec::<Field>::default()));
db1.create_table(CreateTableRequest {
name: "test_table".parse().unwrap(),
data: CreateTableData::Empty(TableDefinition::new_from_schema(dummy_schema)),
mode: Default::default(),
write_options: Default::default(),
})
.await
.unwrap();
let db2 = catalog
.create_database(CreateDatabaseRequest {
name: "db_exist_ok".into(),
mode: CreateDatabaseMode::ExistOk,
options: HashMap::new(),
})
.await
.unwrap();
let tables = db2.table_names(TableNamesRequest::default()).await.unwrap();
assert_eq!(tables, vec!["test_table".to_string()]);
}
#[tokio::test]
async fn test_create_database_overwrite() {
let (_tempdir, catalog) = setup_catalog().await;
let db = catalog
.create_database(CreateDatabaseRequest {
name: "db_overwrite".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
let dummy_schema = Arc::new(arrow_schema::Schema::new(Vec::<Field>::default()));
db.create_table(CreateTableRequest {
name: "old_table".parse().unwrap(),
data: CreateTableData::Empty(TableDefinition::new_from_schema(dummy_schema)),
mode: Default::default(),
write_options: Default::default(),
})
.await
.unwrap();
let tables = db.table_names(TableNamesRequest::default()).await.unwrap();
assert!(!tables.is_empty());
let new_db = catalog
.create_database(CreateDatabaseRequest {
name: "db_overwrite".into(),
mode: CreateDatabaseMode::Overwrite,
options: HashMap::new(),
})
.await
.unwrap();
let tables = new_db
.table_names(TableNamesRequest::default())
.await
.unwrap();
assert!(tables.is_empty());
}
#[tokio::test]
async fn test_create_database_overwrite_non_existing() {
let (_tempdir, catalog) = setup_catalog().await;
catalog
.create_database(CreateDatabaseRequest {
name: "new_db".into(),
mode: CreateDatabaseMode::Overwrite,
options: HashMap::new(),
})
.await
.unwrap();
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert!(names.contains(&"new_db".to_string()));
}
#[tokio::test]
async fn test_open_database() {
let (_tempdir, catalog) = setup_catalog().await;
// Test open non-existent
let result = catalog
.open_database(OpenDatabaseRequest {
name: "missing".into(),
database_options: HashMap::new(),
})
.await;
assert!(matches!(
result.unwrap_err(),
Error::DatabaseNotFound { name } if name == "missing"
));
// Create and open
catalog
.create_database(CreateDatabaseRequest {
name: "valid_db".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
let db = catalog
.open_database(OpenDatabaseRequest {
name: "valid_db".into(),
database_options: HashMap::new(),
})
.await
.unwrap();
assert_eq!(
db.table_names(TableNamesRequest::default()).await.unwrap(),
Vec::<String>::new()
);
}
#[tokio::test]
async fn test_drop_database() {
let (_tempdir, catalog) = setup_catalog().await;
// Create test database
catalog
.create_database(CreateDatabaseRequest {
name: "to_drop".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert!(!names.is_empty());
// Drop database
catalog.drop_database("to_drop").await.unwrap();
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert!(names.is_empty());
}
#[tokio::test]
async fn test_drop_all_databases() {
let (_tempdir, catalog) = setup_catalog().await;
catalog
.create_database(CreateDatabaseRequest {
name: "db1".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
catalog
.create_database(CreateDatabaseRequest {
name: "db2".into(),
mode: CreateDatabaseMode::Create,
options: HashMap::new(),
})
.await
.unwrap();
catalog.drop_all_databases().await.unwrap();
let names = catalog
.database_names(DatabaseNamesRequest::default())
.await
.unwrap();
assert!(names.is_empty());
}
#[tokio::test]
async fn test_rename_database_unsupported() {
let (_tempdir, catalog) = setup_catalog().await;
let result = catalog.rename_database("old", "new").await;
assert!(matches!(
result.unwrap_err(),
Error::NotSupported { message } if message.contains("rename_database")
));
}
#[tokio::test]
async fn test_connect_local_path() {
let tmp_dir = tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let request = ConnectRequest {
uri: path.to_string(),
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
storage_options: HashMap::new(),
read_consistency_interval: None,
};
let catalog = ListingCatalog::connect(&request).await.unwrap();
assert!(catalog.object_store.is_local());
assert_eq!(catalog.uri, path);
}
#[tokio::test]
async fn test_connect_file_scheme() {
let tmp_dir = tempdir().unwrap();
let path = tmp_dir.path();
let uri = path_to_uri(path.to_path_buf());
let request = ConnectRequest {
uri: uri.clone(),
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
storage_options: HashMap::new(),
read_consistency_interval: None,
};
let catalog = ListingCatalog::connect(&request).await.unwrap();
assert!(catalog.object_store.is_local());
assert_eq!(catalog.uri, uri);
}
#[tokio::test]
async fn test_connect_invalid_uri_fallback() {
let invalid_uri = "invalid:///path";
let request = ConnectRequest {
uri: invalid_uri.to_string(),
api_key: None,
region: None,
host_override: None,
#[cfg(feature = "remote")]
client_config: Default::default(),
storage_options: HashMap::new(),
read_consistency_interval: None,
};
let result = ListingCatalog::connect(&request).await;
assert!(result.is_err());
}
}

View File

@@ -11,7 +11,7 @@ use arrow_schema::{Field, SchemaRef};
use lance::dataset::ReadParams;
use object_store::aws::AwsCredential;
use crate::arrow::IntoArrow;
use crate::arrow::{IntoArrow, IntoArrowStream, SendableRecordBatchStream};
use crate::database::listing::{
ListingDatabase, OPT_NEW_TABLE_STORAGE_VERSION, OPT_NEW_TABLE_V2_MANIFEST_PATHS,
};
@@ -75,6 +75,14 @@ impl IntoArrow for NoData {
}
}
// Stores the value given from the initial CreateTableBuilder::new call
// and defers errors until `execute` is called
enum CreateTableBuilderInitialData {
None,
Iterator(Result<Box<dyn RecordBatchReader + Send>>),
Stream(Result<SendableRecordBatchStream>),
}
/// A builder for configuring a [`Connection::create_table`] operation
pub struct CreateTableBuilder<const HAS_DATA: bool> {
parent: Arc<dyn Database>,
@@ -83,7 +91,7 @@ pub struct CreateTableBuilder<const HAS_DATA: bool> {
request: CreateTableRequest,
// This is a bit clumsy but we defer errors until `execute` is called
// to maintain backwards compatibility
data: Option<Result<Box<dyn RecordBatchReader + Send>>>,
data: CreateTableBuilderInitialData,
}
// Builder methods that only apply when we have initial data
@@ -103,7 +111,26 @@ impl CreateTableBuilder<true> {
),
embeddings: Vec::new(),
embedding_registry,
data: Some(data.into_arrow()),
data: CreateTableBuilderInitialData::Iterator(data.into_arrow()),
}
}
fn new_streaming<T: IntoArrowStream>(
parent: Arc<dyn Database>,
name: String,
data: T,
embedding_registry: Arc<dyn EmbeddingRegistry>,
) -> Self {
let dummy_schema = Arc::new(arrow_schema::Schema::new(Vec::<Field>::default()));
Self {
parent,
request: CreateTableRequest::new(
name,
CreateTableData::Empty(TableDefinition::new_from_schema(dummy_schema)),
),
embeddings: Vec::new(),
embedding_registry,
data: CreateTableBuilderInitialData::Stream(data.into_arrow()),
}
}
@@ -125,18 +152,38 @@ impl CreateTableBuilder<true> {
}
fn into_request(self) -> Result<CreateTableRequest> {
let data = if self.embeddings.is_empty() {
self.data.unwrap()?
} else {
let data = self.data.unwrap()?;
Box::new(WithEmbeddings::new(data, self.embeddings))
};
let req = self.request;
if self.embeddings.is_empty() {
match self.data {
CreateTableBuilderInitialData::Iterator(maybe_iter) => {
let data = maybe_iter?;
Ok(CreateTableRequest {
data: CreateTableData::Data(data),
..req
..self.request
})
}
CreateTableBuilderInitialData::None => {
unreachable!("No data provided for CreateTableBuilder<true>")
}
CreateTableBuilderInitialData::Stream(maybe_stream) => {
let data = maybe_stream?;
Ok(CreateTableRequest {
data: CreateTableData::StreamingData(data),
..self.request
})
}
}
} else {
let CreateTableBuilderInitialData::Iterator(maybe_iter) = self.data else {
return Err(Error::NotSupported { message: "Creating a table with embeddings is currently not support when the input is streaming".to_string() });
};
let data = maybe_iter?;
let data = Box::new(WithEmbeddings::new(data, self.embeddings));
Ok(CreateTableRequest {
data: CreateTableData::Data(data),
..self.request
})
}
}
}
// Builder methods that only apply when we do not have initial data
@@ -151,7 +198,7 @@ impl CreateTableBuilder<false> {
Self {
parent,
request: CreateTableRequest::new(name, CreateTableData::Empty(table_definition)),
data: None,
data: CreateTableBuilderInitialData::None,
embeddings: Vec::default(),
embedding_registry,
}
@@ -432,7 +479,7 @@ impl Connection {
TableNamesBuilder::new(self.internal.clone())
}
/// Create a new table from data
/// Create a new table from an iterator of data
///
/// # Parameters
///
@@ -451,6 +498,25 @@ impl Connection {
)
}
/// Create a new table from a stream of data
///
/// # Parameters
///
/// * `name` - The name of the table
/// * `initial_data` - The initial data to write to the table
pub fn create_table_streaming<T: IntoArrowStream>(
&self,
name: impl Into<String>,
initial_data: T,
) -> CreateTableBuilder<true> {
CreateTableBuilder::<true>::new_streaming(
self.internal.clone(),
name.into(),
initial_data,
self.embedding_registry.clone(),
)
}
/// Create an empty table with a given schema
///
/// # Parameters
@@ -788,12 +854,16 @@ mod test_utils {
mod tests {
use std::fs::create_dir_all;
use arrow::compute::concat_batches;
use arrow_array::RecordBatchReader;
use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use futures::{stream, TryStreamExt};
use lance::error::{ArrowResult, DataFusionResult};
use lance_testing::datagen::{BatchGenerator, IncrementingInt32};
use tempfile::tempdir;
use crate::arrow::SimpleRecordBatchStream;
use crate::database::listing::{ListingDatabaseOptions, NewTableConfig};
use crate::query::QueryBase;
use crate::query::{ExecutableQuery, QueryExecutionOptions};
@@ -976,6 +1046,63 @@ mod tests {
assert_eq!(batches.len(), 1);
}
#[tokio::test]
async fn test_create_table_streaming() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let db = connect(uri).execute().await.unwrap();
let batches = make_data().collect::<ArrowResult<Vec<_>>>().unwrap();
let schema = batches.first().unwrap().schema();
let one_batch = concat_batches(&schema, batches.iter()).unwrap();
let ldb_stream = stream::iter(batches.clone().into_iter().map(Result::Ok));
let ldb_stream: SendableRecordBatchStream =
Box::pin(SimpleRecordBatchStream::new(ldb_stream, schema.clone()));
let tbl1 = db
.create_table_streaming("one", ldb_stream)
.execute()
.await
.unwrap();
let df_stream = stream::iter(batches.into_iter().map(DataFusionResult::Ok));
let df_stream: datafusion_physical_plan::SendableRecordBatchStream =
Box::pin(RecordBatchStreamAdapter::new(schema.clone(), df_stream));
let tbl2 = db
.create_table_streaming("two", df_stream)
.execute()
.await
.unwrap();
let tbl1_data = tbl1
.query()
.execute()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let tbl1_data = concat_batches(&schema, tbl1_data.iter()).unwrap();
assert_eq!(tbl1_data, one_batch);
let tbl2_data = tbl2
.query()
.execute()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let tbl2_data = concat_batches(&schema, tbl2_data.iter()).unwrap();
assert_eq!(tbl2_data, one_batch);
}
#[tokio::test]
async fn drop_table() {
let tmp_dir = tempdir().unwrap();

View File

@@ -18,8 +18,13 @@ use std::collections::HashMap;
use std::sync::Arc;
use arrow_array::RecordBatchReader;
use async_trait::async_trait;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream;
use lance::dataset::ReadParams;
use lance_datafusion::utils::StreamingWriteSource;
use crate::arrow::{SendableRecordBatchStream, SendableRecordBatchStreamExt};
use crate::error::Result;
use crate::table::{BaseTable, TableDefinition, WriteOptions};
@@ -81,12 +86,41 @@ impl Default for CreateTableMode {
/// The data to start a table or a schema to create an empty table
pub enum CreateTableData {
/// Creates a table using data, no schema required as it will be obtained from the data
/// Creates a table using an iterator of data, the schema will be obtained from the data
Data(Box<dyn RecordBatchReader + Send>),
/// Creates a table using a stream of data, the schema will be obtained from the data
StreamingData(SendableRecordBatchStream),
/// Creates an empty table, the definition / schema must be provided separately
Empty(TableDefinition),
}
impl CreateTableData {
pub fn schema(&self) -> Arc<arrow_schema::Schema> {
match self {
Self::Data(reader) => reader.schema(),
Self::StreamingData(stream) => stream.schema(),
Self::Empty(definition) => definition.schema.clone(),
}
}
}
#[async_trait]
impl StreamingWriteSource for CreateTableData {
fn arrow_schema(&self) -> Arc<arrow_schema::Schema> {
self.schema()
}
fn into_stream(self) -> datafusion_physical_plan::SendableRecordBatchStream {
match self {
Self::Data(reader) => reader.into_stream(),
Self::StreamingData(stream) => stream.into_df_stream(),
Self::Empty(table_definition) => {
let schema = table_definition.schema.clone();
Box::pin(RecordBatchStreamAdapter::new(schema, stream::empty()))
}
}
}
}
/// A request to create a table
pub struct CreateTableRequest {
/// The name of the new table

View File

@@ -7,9 +7,9 @@ use std::fs::create_dir_all;
use std::path::Path;
use std::{collections::HashMap, sync::Arc};
use arrow_array::RecordBatchIterator;
use lance::dataset::{ReadParams, WriteMode};
use lance::io::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry, WrappingObjectStore};
use lance_datafusion::utils::StreamingWriteSource;
use lance_encoding::version::LanceFileVersion;
use lance_table::io::commit::commit_handler_from_url;
use object_store::local::LocalFileSystem;
@@ -22,8 +22,8 @@ use crate::table::NativeTable;
use crate::utils::validate_table_name;
use super::{
BaseTable, CreateTableData, CreateTableMode, CreateTableRequest, Database, DatabaseOptions,
OpenTableRequest, TableNamesRequest,
BaseTable, CreateTableMode, CreateTableRequest, Database, DatabaseOptions, OpenTableRequest,
TableNamesRequest,
};
/// File extension to indicate a lance table
@@ -322,6 +322,37 @@ impl ListingDatabase {
Ok(uri)
}
async fn drop_tables(&self, names: Vec<String>) -> Result<()> {
let object_store_params = ObjectStoreParams {
storage_options: Some(self.storage_options.clone()),
..Default::default()
};
let mut uri = self.uri.clone();
if let Some(query_string) = &self.query_string {
uri.push_str(&format!("?{}", query_string));
}
let commit_handler = commit_handler_from_url(&uri, &Some(object_store_params)).await?;
for name in names {
let dir_name = format!("{}.{}", name, LANCE_EXTENSION);
let full_path = self.base_path.child(dir_name.clone());
commit_handler.delete(&full_path).await?;
self.object_store
.remove_dir_all(full_path.clone())
.await
.map_err(|err| match err {
// this error is not lance::Error::DatasetNotFound, as the method
// `remove_dir_all` may be used to remove something not be a dataset
lance::Error::NotFound { .. } => Error::TableNotFound {
name: name.to_owned(),
},
_ => Error::from(err),
})?;
}
Ok(())
}
}
#[async_trait::async_trait]
@@ -401,19 +432,12 @@ impl Database for ListingDatabase {
write_params.mode = WriteMode::Overwrite;
}
let data = match request.data {
CreateTableData::Data(data) => data,
CreateTableData::Empty(table_definition) => {
let schema = table_definition.schema.clone();
Box::new(RecordBatchIterator::new(vec![], schema))
}
};
let data_schema = data.schema();
let data_schema = request.data.arrow_schema();
match NativeTable::create(
&table_uri,
&request.name,
data,
request.data,
self.store_wrapper.clone(),
Some(write_params),
self.read_consistency_interval,
@@ -500,40 +524,12 @@ impl Database for ListingDatabase {
}
async fn drop_table(&self, name: &str) -> Result<()> {
let dir_name = format!("{}.{}", name, LANCE_EXTENSION);
let full_path = self.base_path.child(dir_name.clone());
self.object_store
.remove_dir_all(full_path.clone())
.await
.map_err(|err| match err {
// this error is not lance::Error::DatasetNotFound,
// as the method `remove_dir_all` may be used to remove something not be a dataset
lance::Error::NotFound { .. } => Error::TableNotFound {
name: name.to_owned(),
},
_ => Error::from(err),
})?;
let object_store_params = ObjectStoreParams {
storage_options: Some(self.storage_options.clone()),
..Default::default()
};
let mut uri = self.uri.clone();
if let Some(query_string) = &self.query_string {
uri.push_str(&format!("?{}", query_string));
}
let commit_handler = commit_handler_from_url(&uri, &Some(object_store_params))
.await
.unwrap();
commit_handler.delete(&full_path).await.unwrap();
Ok(())
self.drop_tables(vec![name.to_string()]).await
}
async fn drop_all_tables(&self) -> Result<()> {
self.object_store
.remove_dir_all(self.base_path.clone())
.await?;
Ok(())
let tables = self.table_names(TableNamesRequest::default()).await?;
self.drop_tables(tables).await
}
fn as_any(&self) -> &dyn std::any::Any {

View File

@@ -15,6 +15,10 @@ pub enum Error {
InvalidInput { message: String },
#[snafu(display("Table '{name}' was not found"))]
TableNotFound { name: String },
#[snafu(display("Database '{name}' was not found"))]
DatabaseNotFound { name: String },
#[snafu(display("Database '{name}' already exists."))]
DatabaseAlreadyExists { name: String },
#[snafu(display("Index '{name}' was not found"))]
IndexNotFound { name: String },
#[snafu(display("Embedding function '{name}' was not found. : {reason}"))]

View File

@@ -191,6 +191,7 @@
//! ```
pub mod arrow;
pub mod catalog;
pub mod connection;
pub mod data;
pub mod database;

View File

@@ -470,6 +470,9 @@ impl<T: HasQuery> QueryBase for T {
}
fn full_text_search(mut self, query: FullTextSearchQuery) -> Self {
if self.mut_query().limit.is_none() {
self.mut_query().limit = Some(DEFAULT_TOP_K);
}
self.mut_query().full_text_search = Some(query);
self
}
@@ -634,7 +637,7 @@ pub struct QueryRequest {
impl Default for QueryRequest {
fn default() -> Self {
Self {
limit: Some(DEFAULT_TOP_K),
limit: None,
offset: None,
filter: None,
full_text_search: None,
@@ -719,6 +722,11 @@ impl Query {
let mut vector_query = self.into_vector();
let query_vector = vector.to_query_vector(&DataType::Float32, "default")?;
vector_query.request.query_vector.push(query_vector);
if vector_query.request.base.limit.is_none() {
vector_query.request.base.limit = Some(DEFAULT_TOP_K);
}
Ok(vector_query)
}

View File

@@ -19,12 +19,41 @@ use crate::database::{
};
use crate::error::Result;
use crate::table::BaseTable;
use crate::Error;
use super::client::{ClientConfig, HttpSend, RequestResultExt, RestfulLanceDbClient, Sender};
use super::table::RemoteTable;
use super::util::batches_to_ipc_bytes;
use super::util::{batches_to_ipc_bytes, parse_server_version};
use super::ARROW_STREAM_CONTENT_TYPE;
// the versions of the server that we support
// for any new feature that we need to change the SDK behavior, we should bump the server version,
// and add a feature flag as method of `ServerVersion` here.
pub const DEFAULT_SERVER_VERSION: semver::Version = semver::Version::new(0, 1, 0);
#[derive(Debug, Clone)]
pub struct ServerVersion(pub semver::Version);
impl Default for ServerVersion {
fn default() -> Self {
Self(DEFAULT_SERVER_VERSION.clone())
}
}
impl ServerVersion {
pub fn parse(version: &str) -> Result<Self> {
let version = Self(
semver::Version::parse(version).map_err(|e| Error::InvalidInput {
message: e.to_string(),
})?,
);
Ok(version)
}
pub fn support_multivector(&self) -> bool {
self.0 >= semver::Version::new(0, 2, 0)
}
}
#[derive(Deserialize)]
struct ListTablesResponse {
tables: Vec<String>,
@@ -33,7 +62,7 @@ struct ListTablesResponse {
#[derive(Debug)]
pub struct RemoteDatabase<S: HttpSend = Sender> {
client: RestfulLanceDbClient<S>,
table_cache: Cache<String, ()>,
table_cache: Cache<String, Arc<RemoteTable<S>>>,
}
impl RemoteDatabase {
@@ -115,13 +144,19 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
}
let (request_id, rsp) = self.client.send(req, true).await?;
let rsp = self.client.check_response(&request_id, rsp).await?;
let version = parse_server_version(&request_id, &rsp)?;
let tables = rsp
.json::<ListTablesResponse>()
.await
.err_to_http(request_id)?
.tables;
for table in &tables {
self.table_cache.insert(table.clone(), ()).await;
let remote_table = Arc::new(RemoteTable::new(
self.client.clone(),
table.clone(),
version.clone(),
));
self.table_cache.insert(table.clone(), remote_table).await;
}
Ok(tables)
}
@@ -129,6 +164,11 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
async fn create_table(&self, request: CreateTableRequest) -> Result<Arc<dyn BaseTable>> {
let data = match request.data {
CreateTableData::Data(data) => data,
CreateTableData::StreamingData(_) => {
return Err(Error::NotSupported {
message: "Creating a remote table from a streaming source".to_string(),
})
}
CreateTableData::Empty(table_definition) => {
let schema = table_definition.schema.clone();
Box::new(RecordBatchIterator::new(vec![], schema))
@@ -187,34 +227,42 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
return Err(crate::Error::InvalidInput { message: body });
}
}
self.client.check_response(&request_id, rsp).await?;
self.table_cache.insert(request.name.clone(), ()).await;
Ok(Arc::new(RemoteTable::new(
let rsp = self.client.check_response(&request_id, rsp).await?;
let version = parse_server_version(&request_id, &rsp)?;
let table = Arc::new(RemoteTable::new(
self.client.clone(),
request.name,
)))
request.name.clone(),
version,
));
self.table_cache
.insert(request.name.clone(), table.clone())
.await;
Ok(table)
}
async fn open_table(&self, request: OpenTableRequest) -> Result<Arc<dyn BaseTable>> {
// We describe the table to confirm it exists before moving on.
if self.table_cache.get(&request.name).await.is_none() {
if let Some(table) = self.table_cache.get(&request.name).await {
Ok(table.clone())
} else {
let req = self
.client
.post(&format!("/v1/table/{}/describe/", request.name));
let (request_id, resp) = self.client.send(req, true).await?;
if resp.status() == StatusCode::NOT_FOUND {
let (request_id, rsp) = self.client.send(req, true).await?;
if rsp.status() == StatusCode::NOT_FOUND {
return Err(crate::Error::TableNotFound { name: request.name });
}
self.client.check_response(&request_id, resp).await?;
}
Ok(Arc::new(RemoteTable::new(
let rsp = self.client.check_response(&request_id, rsp).await?;
let version = parse_server_version(&request_id, &rsp)?;
let table = Arc::new(RemoteTable::new(
self.client.clone(),
request.name,
)))
request.name.clone(),
version,
));
self.table_cache.insert(request.name, table.clone()).await;
Ok(table)
}
}
async fn rename_table(&self, current_name: &str, new_name: &str) -> Result<()> {
@@ -224,8 +272,10 @@ impl<S: HttpSend> Database for RemoteDatabase<S> {
let req = req.json(&serde_json::json!({ "new_table_name": new_name }));
let (request_id, resp) = self.client.send(req, false).await?;
self.client.check_response(&request_id, resp).await?;
self.table_cache.remove(current_name).await;
self.table_cache.insert(new_name.into(), ()).await;
let table = self.table_cache.remove(current_name).await;
if let Some(table) = table {
self.table_cache.insert(new_name.into(), table).await;
}
Ok(())
}

View File

@@ -10,7 +10,7 @@ use crate::index::IndexStatistics;
use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest};
use crate::table::{AddDataMode, AnyQuery, Filter};
use crate::utils::{supported_btree_data_type, supported_vector_data_type};
use crate::{DistanceType, Error};
use crate::{DistanceType, Error, Table};
use arrow_array::RecordBatchReader;
use arrow_ipc::reader::FileReader;
use arrow_schema::{DataType, SchemaRef};
@@ -24,7 +24,7 @@ use http::StatusCode;
use lance::arrow::json::{JsonDataType, JsonSchema};
use lance::dataset::scanner::DatasetRecordBatchStream;
use lance::dataset::{ColumnAlteration, NewColumnTransform, Version};
use lance_datafusion::exec::OneShotExec;
use lance_datafusion::exec::{execute_plan, OneShotExec};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
@@ -41,6 +41,7 @@ use crate::{
use super::client::RequestResultExt;
use super::client::{HttpSend, RestfulLanceDbClient, Sender};
use super::db::ServerVersion;
use super::ARROW_STREAM_CONTENT_TYPE;
#[derive(Debug)]
@@ -48,15 +49,21 @@ pub struct RemoteTable<S: HttpSend = Sender> {
#[allow(dead_code)]
client: RestfulLanceDbClient<S>,
name: String,
server_version: ServerVersion,
version: RwLock<Option<u64>>,
}
impl<S: HttpSend> RemoteTable<S> {
pub fn new(client: RestfulLanceDbClient<S>, name: String) -> Self {
pub fn new(
client: RestfulLanceDbClient<S>,
name: String,
server_version: ServerVersion,
) -> Self {
Self {
client,
name,
server_version,
version: RwLock::new(None),
}
}
@@ -154,9 +161,9 @@ impl<S: HttpSend> RemoteTable<S> {
body["offset"] = serde_json::Value::Number(serde_json::Number::from(offset));
}
if let Some(limit) = params.limit {
// Server requires k.
let limit = params.limit.unwrap_or(usize::MAX);
body["k"] = serde_json::Value::Number(serde_json::Number::from(limit));
}
if let Some(filter) = &params.filter {
if let QueryFilter::Sql(filter) = filter {
@@ -212,10 +219,11 @@ impl<S: HttpSend> RemoteTable<S> {
}
fn apply_vector_query_params(
body: &mut serde_json::Value,
&self,
mut body: serde_json::Value,
query: &VectorQueryRequest,
) -> Result<()> {
Self::apply_query_params(body, &query.base)?;
) -> Result<Vec<serde_json::Value>> {
Self::apply_query_params(&mut body, &query.base)?;
// Apply general parameters, before we dispatch based on number of query vectors.
body["distance_type"] = serde_json::json!(query.distance_type.unwrap_or_default());
@@ -256,25 +264,40 @@ impl<S: HttpSend> RemoteTable<S> {
}
}
match query.query_vector.len() {
let bodies = match query.query_vector.len() {
0 => {
// Server takes empty vector, not null or undefined.
body["vector"] = serde_json::Value::Array(Vec::new());
vec![body]
}
1 => {
body["vector"] = vector_to_json(&query.query_vector[0])?;
vec![body]
}
_ => {
if self.server_version.support_multivector() {
let vectors = query
.query_vector
.iter()
.map(vector_to_json)
.collect::<Result<Vec<_>>>()?;
body["vector"] = serde_json::Value::Array(vectors);
vec![body]
} else {
// Server does not support multiple vectors in a single query.
// We need to send multiple requests.
let mut bodies = Vec::with_capacity(query.query_vector.len());
for vector in &query.query_vector {
let mut body = body.clone();
body["vector"] = vector_to_json(vector)?;
bodies.push(body);
}
bodies
}
}
};
Ok(())
Ok(bodies)
}
async fn check_mutable(&self) -> Result<()> {
@@ -299,27 +322,34 @@ impl<S: HttpSend> RemoteTable<S> {
&self,
query: &AnyQuery,
_options: QueryExecutionOptions,
) -> Result<Pin<Box<dyn RecordBatchStream + Send>>> {
) -> Result<Vec<Pin<Box<dyn RecordBatchStream + Send>>>> {
let request = self.client.post(&format!("/v1/table/{}/query/", self.name));
let version = self.current_version().await;
let mut body = serde_json::json!({ "version": version });
match query {
let requests = match query {
AnyQuery::Query(query) => {
Self::apply_query_params(&mut body, query)?;
// Empty vector can be passed if no vector search is performed.
body["vector"] = serde_json::Value::Array(Vec::new());
vec![request.json(&body)]
}
AnyQuery::VectorQuery(query) => {
Self::apply_vector_query_params(&mut body, query)?;
}
let bodies = self.apply_vector_query_params(body, query)?;
bodies
.into_iter()
.map(|body| request.try_clone().unwrap().json(&body))
.collect()
}
};
let request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?;
let stream = self.read_arrow_stream(&request_id, response).await?;
Ok(stream)
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?;
self.read_arrow_stream(&request_id, response).await
});
let streams = futures::future::try_join_all(futures).await?;
Ok(streams)
}
}
@@ -342,7 +372,7 @@ mod test_utils {
use crate::remote::client::test_utils::MockSender;
impl RemoteTable<MockSender> {
pub fn new_mock<F, T>(name: String, handler: F) -> Self
pub fn new_mock<F, T>(name: String, handler: F, version: Option<semver::Version>) -> Self
where
F: Fn(reqwest::Request) -> http::Response<T> + Send + Sync + 'static,
T: Into<reqwest::Body>,
@@ -351,6 +381,7 @@ mod test_utils {
Self {
client,
name,
server_version: version.map(ServerVersion).unwrap_or_default(),
version: RwLock::new(None),
}
}
@@ -491,8 +522,17 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
query: &AnyQuery,
options: QueryExecutionOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let stream = self.execute_query(query, options).await?;
let streams = self.execute_query(query, options).await?;
if streams.len() == 1 {
let stream = streams.into_iter().next().unwrap();
Ok(Arc::new(OneShotExec::new(stream)))
} else {
let stream_execs = streams
.into_iter()
.map(|stream| Arc::new(OneShotExec::new(stream)) as Arc<dyn ExecutionPlan>)
.collect();
Table::multi_vector_plan(stream_execs)
}
}
async fn query(
@@ -500,8 +540,24 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
query: &AnyQuery,
_options: QueryExecutionOptions,
) -> Result<DatasetRecordBatchStream> {
let stream = self.execute_query(query, _options).await?;
Ok(DatasetRecordBatchStream::new(stream))
let streams = self.execute_query(query, _options).await?;
if streams.len() == 1 {
Ok(DatasetRecordBatchStream::new(
streams.into_iter().next().unwrap(),
))
} else {
let stream_execs = streams
.into_iter()
.map(|stream| Arc::new(OneShotExec::new(stream)) as Arc<dyn ExecutionPlan>)
.collect();
let plan = Table::multi_vector_plan(stream_execs)?;
Ok(DatasetRecordBatchStream::new(execute_plan(
plan,
Default::default(),
)?))
}
}
async fn update(&self, update: UpdateBuilder) -> Result<u64> {
self.check_mutable().await?;
@@ -884,8 +940,10 @@ mod tests {
use futures::{future::BoxFuture, StreamExt, TryFutureExt};
use lance_index::scalar::FullTextSearchQuery;
use reqwest::Body;
use rstest::rstest;
use crate::index::vector::IvfFlatIndexBuilder;
use crate::remote::db::DEFAULT_SERVER_VERSION;
use crate::remote::JSON_CONTENT_TYPE;
use crate::{
index::{vector::IvfPqIndexBuilder, Index, IndexStatistics, IndexType},
@@ -1293,6 +1351,52 @@ mod tests {
table.delete("id in (1, 2, 3)").await.unwrap();
}
#[tokio::test]
async fn test_query_plain() {
let expected_data = RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();
let expected_data_ref = expected_data.clone();
let table = Table::new_with_handler("my_table", move |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/query/");
assert_eq!(
request.headers().get("Content-Type").unwrap(),
JSON_CONTENT_TYPE
);
let body = request.body().unwrap().as_bytes().unwrap();
let body: serde_json::Value = serde_json::from_slice(body).unwrap();
let expected_body = serde_json::json!({
"k": usize::MAX,
"prefilter": true,
"vector": [], // Empty vector means no vector query.
"version": null,
});
assert_eq!(body, expected_body);
let response_body = write_ipc_file(&expected_data_ref);
http::Response::builder()
.status(200)
.header(CONTENT_TYPE, ARROW_FILE_CONTENT_TYPE)
.body(response_body)
.unwrap()
});
let data = table
.query()
.execute()
.await
.unwrap()
.collect::<Vec<_>>()
.await;
assert_eq!(data.len(), 1);
assert_eq!(data[0].as_ref().unwrap(), &expected_data);
}
#[tokio::test]
async fn test_query_vector_default_values() {
let expected_data = RecordBatch::try_new(
@@ -1508,9 +1612,12 @@ mod tests {
.unwrap();
}
#[rstest]
#[case(DEFAULT_SERVER_VERSION.clone())]
#[case(semver::Version::new(0, 2, 0))]
#[tokio::test]
async fn test_query_multiple_vectors() {
let table = Table::new_with_handler("my_table", |request| {
async fn test_batch_queries(#[case] version: semver::Version) {
let table = Table::new_with_handler_version("my_table", version.clone(), move |request| {
assert_eq!(request.method(), "POST");
assert_eq!(request.url().path(), "/v1/table/my_table/query/");
assert_eq!(
@@ -1520,10 +1627,12 @@ mod tests {
let body: serde_json::Value =
serde_json::from_slice(request.body().unwrap().as_bytes().unwrap()).unwrap();
let query_vectors = body["vector"].as_array().unwrap();
let version = ServerVersion(version.clone());
let data = if version.support_multivector() {
assert_eq!(query_vectors.len(), 2);
assert_eq!(query_vectors[0].as_array().unwrap().len(), 3);
assert_eq!(query_vectors[1].as_array().unwrap().len(), 3);
let data = RecordBatch::try_new(
RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("query_index", DataType::Int32, false),
@@ -1533,7 +1642,17 @@ mod tests {
Arc::new(Int32Array::from(vec![0, 0, 0, 1, 1, 1])),
],
)
.unwrap();
.unwrap()
} else {
// it's single flat vector, so here the length is dim
assert_eq!(query_vectors.len(), 3);
RecordBatch::try_new(
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap()
};
let response_body = write_ipc_file(&data);
http::Response::builder()
.status(200)

View File

@@ -4,9 +4,12 @@
use std::io::Cursor;
use arrow_array::RecordBatchReader;
use reqwest::Response;
use crate::Result;
use super::db::ServerVersion;
pub fn batches_to_ipc_bytes(batches: impl RecordBatchReader) -> Result<Vec<u8>> {
const WRITE_BUF_SIZE: usize = 4096;
let buf = Vec::with_capacity(WRITE_BUF_SIZE);
@@ -22,3 +25,24 @@ pub fn batches_to_ipc_bytes(batches: impl RecordBatchReader) -> Result<Vec<u8>>
}
Ok(buf.into_inner())
}
pub fn parse_server_version(req_id: &str, rsp: &Response) -> Result<ServerVersion> {
let version = rsp
.headers()
.get("phalanx-version")
.map(|v| {
let v = v.to_str().map_err(|e| crate::Error::Http {
source: e.into(),
request_id: req_id.to_string(),
status_code: Some(rsp.status()),
})?;
ServerVersion::parse(v).map_err(|e| crate::Error::Http {
source: e.into(),
request_id: req_id.to_string(),
status_code: Some(rsp.status()),
})
})
.transpose()?
.unwrap_or_default();
Ok(version)
}

View File

@@ -28,13 +28,13 @@ pub use lance::dataset::NewColumnTransform;
pub use lance::dataset::ReadParams;
pub use lance::dataset::Version;
use lance::dataset::{
Dataset, InsertBuilder, UpdateBuilder as LanceUpdateBuilder, WhenMatched, WriteMode,
WriteParams,
InsertBuilder, UpdateBuilder as LanceUpdateBuilder, WhenMatched, WriteMode, WriteParams,
};
use lance::dataset::{MergeInsertBuilder as LanceMergeInsertBuilder, WhenNotMatchedBySource};
use lance::index::vector::utils::infer_vector_dim;
use lance::io::WrappingObjectStore;
use lance_datafusion::exec::execute_plan;
use lance_datafusion::utils::StreamingWriteSource;
use lance_index::vector::hnsw::builder::HnswBuildParams;
use lance_index::vector::ivf::IvfBuildParams;
use lance_index::vector::pq::PQBuildParams;
@@ -509,6 +509,27 @@ mod test_utils {
let inner = Arc::new(crate::remote::table::RemoteTable::new_mock(
name.into(),
handler,
None,
));
Self {
inner,
// Registry is unused.
embedding_registry: Arc::new(MemoryRegistry::new()),
}
}
pub fn new_with_handler_version<T>(
name: impl Into<String>,
version: semver::Version,
handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
) -> Self
where
T: Into<reqwest::Body>,
{
let inner = Arc::new(crate::remote::table::RemoteTable::new_mock(
name.into(),
handler,
Some(version),
));
Self {
inner,
@@ -1243,7 +1264,7 @@ impl NativeTable {
pub async fn create(
uri: &str,
name: &str,
batches: impl RecordBatchReader + Send + 'static,
batches: impl StreamingWriteSource,
write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
params: Option<WriteParams>,
read_consistency_interval: Option<std::time::Duration>,
@@ -1258,7 +1279,9 @@ impl NativeTable {
None => params,
};
let dataset = Dataset::write(batches, uri, Some(params))
let insert_builder = InsertBuilder::new(uri).with_params(&params);
let dataset = insert_builder
.execute_stream(batches)
.await
.map_err(|e| match e {
lance::Error::DatasetAlreadyExists { .. } => Error::TableAlreadyExists {
@@ -1266,6 +1289,7 @@ impl NativeTable {
},
source => Error::Lance { source },
})?;
Ok(Self {
name: name.to_string(),
uri: uri.to_string(),
@@ -2370,8 +2394,9 @@ mod tests {
use arrow_data::ArrayDataBuilder;
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use futures::TryStreamExt;
use lance::dataset::{Dataset, WriteMode};
use lance::dataset::WriteMode;
use lance::io::{ObjectStoreParams, WrappingObjectStore};
use lance::Dataset;
use rand::Rng;
use tempfile::tempdir;
@@ -2421,6 +2446,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let batches = make_test_batches();
let batches = Box::new(batches) as Box<dyn RecordBatchReader + Send>;
let table = NativeTable::create(uri, "test", batches, None, None, None)
.await
.unwrap();

View File

@@ -4,6 +4,7 @@
//! This module contains adapters to allow LanceDB tables to be used as DataFusion table providers.
use std::{collections::HashMap, sync::Arc};
use arrow_array::RecordBatch;
use arrow_schema::Schema as ArrowSchema;
use async_trait::async_trait;
use datafusion_catalog::{Session, TableProvider};
@@ -104,7 +105,9 @@ impl ExecutionPlan for MetadataEraserExec {
) -> DataFusionResult<SendableRecordBatchStream> {
let stream = self.input.execute(partition, context)?;
let schema = self.schema.clone();
let stream = stream.map_ok(move |batch| batch.with_schema(schema.clone()).unwrap());
let stream = stream.map_ok(move |batch| {
RecordBatch::try_new(schema.clone(), batch.columns().to_vec()).unwrap()
});
Ok(
Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream))
as SendableRecordBatchStream,
@@ -148,7 +151,7 @@ impl TableProvider for BaseTableAdapter {
async fn scan(
&self,
_state: &dyn Session,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
@@ -174,9 +177,15 @@ impl TableProvider for BaseTableAdapter {
// Need to override the default of 10
query.limit = None;
}
let options = QueryExecutionOptions {
max_batch_length: state.config().batch_size() as u32,
..Default::default()
};
let plan = self
.table
.create_plan(&AnyQuery::Query(query), QueryExecutionOptions::default())
.create_plan(&AnyQuery::Query(query), options)
.map_err(|err| DataFusionError::External(err.into()))
.await?;
Ok(Arc::new(MetadataEraserExec::new(plan)))
@@ -201,14 +210,18 @@ pub mod tests {
use arrow::array::AsArray;
use arrow_array::{
Int32Array, RecordBatch, RecordBatchIterator, RecordBatchReader, UInt32Array,
BinaryArray, Float64Array, Int32Array, Int64Array, RecordBatch, RecordBatchIterator,
RecordBatchReader, StringArray, UInt32Array,
};
use arrow_schema::{DataType, Field, Schema};
use datafusion::{datasource::provider_as_source, prelude::SessionContext};
use datafusion::{
datasource::provider_as_source,
prelude::{SessionConfig, SessionContext},
};
use datafusion_catalog::TableProvider;
use datafusion_execution::SendableRecordBatchStream;
use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder};
use futures::TryStreamExt;
use futures::{StreamExt, TryStreamExt};
use tempfile::tempdir;
use crate::{
@@ -238,9 +251,49 @@ pub mod tests {
)
}
fn make_tbl_two_test_batches() -> impl RecordBatchReader + Send + Sync + 'static {
let metadata = HashMap::from_iter(vec![("foo".to_string(), "bar".to_string())]);
let schema = Arc::new(
Schema::new(vec![
Field::new("ints", DataType::Int64, true),
Field::new("strings", DataType::Utf8, true),
Field::new("floats", DataType::Float64, true),
Field::new("jsons", DataType::Utf8, true),
Field::new("bins", DataType::Binary, true),
Field::new("nodates", DataType::Utf8, true),
])
.with_metadata(metadata),
);
RecordBatchIterator::new(
vec![RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int64Array::from_iter_values(0..1000)),
Arc::new(StringArray::from_iter_values(
(0..1000).map(|i| i.to_string()),
)),
Arc::new(Float64Array::from_iter_values((0..1000).map(|i| i as f64))),
Arc::new(StringArray::from_iter_values(
(0..1000).map(|i| format!("{{\"i\":{}}}", i)),
)),
Arc::new(BinaryArray::from_iter_values(
(0..1000).map(|i| (i as u32).to_be_bytes().to_vec()),
)),
Arc::new(StringArray::from_iter_values(
(0..1000).map(|i| i.to_string()),
)),
],
)],
schema,
)
}
struct TestFixture {
_tmp_dir: tempfile::TempDir,
// An adapter for a table with make_test_batches batches
adapter: Arc<BaseTableAdapter>,
// an adapter for a table with make_tbl_two_test_batches batches
adapter2: Arc<BaseTableAdapter>,
}
impl TestFixture {
@@ -262,20 +315,40 @@ pub mod tests {
.await
.unwrap();
let tbl2 = db
.create_table("tbl2", make_tbl_two_test_batches())
.execute()
.await
.unwrap();
let adapter = Arc::new(
BaseTableAdapter::try_new(tbl.base_table().clone())
.await
.unwrap(),
);
let adapter2 = Arc::new(
BaseTableAdapter::try_new(tbl2.base_table().clone())
.await
.unwrap(),
);
Self {
_tmp_dir: tmp_dir,
adapter,
adapter2,
}
}
async fn plan_to_stream(plan: LogicalPlan) -> SendableRecordBatchStream {
SessionContext::new()
Self::plan_to_stream_with_config(plan, SessionConfig::default()).await
}
async fn plan_to_stream_with_config(
plan: LogicalPlan,
config: SessionConfig,
) -> SendableRecordBatchStream {
SessionContext::new_with_config(config)
.execute_logical_plan(plan)
.await
.unwrap()
@@ -309,7 +382,7 @@ pub mod tests {
}
async fn check_plan(plan: LogicalPlan, expected: &str) {
let physical_plan = dbg!(Self::plan_to_explain(plan).await);
let physical_plan = Self::plan_to_explain(plan).await;
let mut lines_checked = 0;
for (actual_line, expected_line) in physical_plan.lines().zip(expected.lines()) {
lines_checked += 1;
@@ -325,6 +398,30 @@ pub mod tests {
}
}
#[tokio::test]
async fn test_batch_size() {
let fixture = TestFixture::new().await;
let plan = LogicalPlanBuilder::scan("foo", provider_as_source(fixture.adapter2), None)
.unwrap()
.build()
.unwrap();
let config = SessionConfig::default().with_batch_size(100);
let stream = TestFixture::plan_to_stream_with_config(plan.clone(), config).await;
let batch_count = stream.count().await;
assert_eq!(batch_count, 10);
let config = SessionConfig::default().with_batch_size(250);
let stream = TestFixture::plan_to_stream_with_config(plan, config).await;
let batch_count = stream.count().await;
assert_eq!(batch_count, 4);
}
#[tokio::test]
async fn test_metadata_erased() {
let fixture = TestFixture::new().await;
@@ -343,6 +440,27 @@ pub mod tests {
}
}
#[tokio::test]
async fn test_metadata_erased_with_filter() {
// This is a regression test where the metadata eraser was not properly erasing metadata
let fixture = TestFixture::new().await;
assert!(fixture.adapter.schema().metadata().is_empty());
let plan = LogicalPlanBuilder::scan("foo", provider_as_source(fixture.adapter2), None)
.unwrap()
.filter(col("ints").lt(lit(10)))
.unwrap()
.build()
.unwrap();
let mut stream = TestFixture::plan_to_stream(plan).await;
while let Some(batch) = stream.try_next().await.unwrap() {
assert!(batch.schema().metadata().is_empty());
}
}
#[tokio::test]
async fn test_filter_pushdown() {
let fixture = TestFixture::new().await;